diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 11:57:36 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 11:57:36 +0000 |
| commit | 4941490233a728bc7c64fa80a53d15f772a1219f (patch) | |
| tree | 7fc1bbf6114deb29b5a736b467abf785ea915f02 /src | |
| parent | 6cd7535f2d5f65477ef11b17a4661745ec3a2881 (diff) | |
sync: add sync_base_backoff_secs config for better testing
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 8 | ||||
| -rw-r--r-- | src/http/nip11.rs | 4 | ||||
| -rw-r--r-- | src/sync/health.rs | 58 | ||||
| -rw-r--r-- | src/sync/mod.rs | 6 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 8 |
5 files changed, 65 insertions, 19 deletions
diff --git a/src/config.rs b/src/config.rs index 5e74471..178e840 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -114,6 +114,13 @@ pub struct Config { | |||
| 114 | /// Set to lower value for faster reconnection testing | 114 | /// Set to lower value for faster reconnection testing |
| 115 | #[arg(long, env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", default_value_t = 60)] | 115 | #[arg(long, env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", default_value_t = 60)] |
| 116 | pub sync_disconnect_check_interval_secs: u64, | 116 | pub sync_disconnect_check_interval_secs: u64, |
| 117 | |||
| 118 | /// Base backoff time in seconds for relay reconnection (default: 5) | ||
| 119 | /// Used for exponential backoff: base * 2^(failures-1) | ||
| 120 | /// Set to 1 for faster test cycles | ||
| 121 | /// Note: The connection timeout is set to this value | ||
| 122 | #[arg(long, env = "NGIT_SYNC_BASE_BACKOFF_SECS", default_value_t = 5)] | ||
| 123 | pub sync_base_backoff_secs: u64, | ||
| 117 | } | 124 | } |
| 118 | 125 | ||
| 119 | impl Config { | 126 | impl Config { |
| @@ -176,6 +183,7 @@ impl Config { | |||
| 176 | sync_reconnect_lookback_days: 3, | 183 | sync_reconnect_lookback_days: 3, |
| 177 | sync_startup_jitter_ms: 10_000, | 184 | sync_startup_jitter_ms: 10_000, |
| 178 | sync_disconnect_check_interval_secs: 60, | 185 | sync_disconnect_check_interval_secs: 60, |
| 186 | sync_base_backoff_secs: 5, | ||
| 179 | } | 187 | } |
| 180 | } | 188 | } |
| 181 | } | 189 | } |
diff --git a/src/http/nip11.rs b/src/http/nip11.rs index 80165ee..19d482f 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs | |||
| @@ -111,6 +111,8 @@ mod tests { | |||
| 111 | sync_reconnect_delay_secs: 10, | 111 | sync_reconnect_delay_secs: 10, |
| 112 | sync_reconnect_lookback_days: 3, | 112 | sync_reconnect_lookback_days: 3, |
| 113 | sync_startup_jitter_ms: 10_000, | 113 | sync_startup_jitter_ms: 10_000, |
| 114 | sync_disconnect_check_interval_secs: 60, | ||
| 115 | sync_base_backoff_secs: 5, | ||
| 114 | }; | 116 | }; |
| 115 | 117 | ||
| 116 | let doc = RelayInformationDocument::from_config(&config); | 118 | let doc = RelayInformationDocument::from_config(&config); |
| @@ -151,6 +153,8 @@ mod tests { | |||
| 151 | sync_reconnect_delay_secs: 10, | 153 | sync_reconnect_delay_secs: 10, |
| 152 | sync_reconnect_lookback_days: 3, | 154 | sync_reconnect_lookback_days: 3, |
| 153 | sync_startup_jitter_ms: 10_000, | 155 | sync_startup_jitter_ms: 10_000, |
| 156 | sync_disconnect_check_interval_secs: 60, | ||
| 157 | sync_base_backoff_secs: 5, | ||
| 154 | }; | 158 | }; |
| 155 | 159 | ||
| 156 | let doc = RelayInformationDocument::from_config(&config); | 160 | let doc = RelayInformationDocument::from_config(&config); |
diff --git a/src/sync/health.rs b/src/sync/health.rs index 51bd5ae..f9a5f3a 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs | |||
| @@ -27,8 +27,8 @@ const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; | |||
| 27 | /// Default maximum backoff duration in seconds (1 hour) | 27 | /// Default maximum backoff duration in seconds (1 hour) |
| 28 | const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; | 28 | const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; |
| 29 | 29 | ||
| 30 | /// Base backoff duration in seconds | 30 | /// Default base backoff duration in seconds |
| 31 | const BASE_BACKOFF_SECS: u64 = 5; | 31 | const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; |
| 32 | 32 | ||
| 33 | /// Health state of a relay connection | 33 | /// Health state of a relay connection |
| 34 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 34 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
| @@ -93,6 +93,7 @@ impl RelayHealth { | |||
| 93 | pub struct RelayHealthTracker { | 93 | pub struct RelayHealthTracker { |
| 94 | health: DashMap<String, RelayHealth>, | 94 | health: DashMap<String, RelayHealth>, |
| 95 | max_backoff_secs: u64, | 95 | max_backoff_secs: u64, |
| 96 | base_backoff_secs: u64, | ||
| 96 | } | 97 | } |
| 97 | 98 | ||
| 98 | impl RelayHealthTracker { | 99 | impl RelayHealthTracker { |
| @@ -101,6 +102,7 @@ impl RelayHealthTracker { | |||
| 101 | Self { | 102 | Self { |
| 102 | health: DashMap::new(), | 103 | health: DashMap::new(), |
| 103 | max_backoff_secs: config.sync_max_backoff_secs, | 104 | max_backoff_secs: config.sync_max_backoff_secs, |
| 105 | base_backoff_secs: config.sync_base_backoff_secs, | ||
| 104 | } | 106 | } |
| 105 | } | 107 | } |
| 106 | 108 | ||
| @@ -109,6 +111,7 @@ impl RelayHealthTracker { | |||
| 109 | Self { | 111 | Self { |
| 110 | health: DashMap::new(), | 112 | health: DashMap::new(), |
| 111 | max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, | 113 | max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, |
| 114 | base_backoff_secs: DEFAULT_BASE_BACKOFF_SECS, | ||
| 112 | } | 115 | } |
| 113 | } | 116 | } |
| 114 | 117 | ||
| @@ -117,9 +120,18 @@ impl RelayHealthTracker { | |||
| 117 | Self { | 120 | Self { |
| 118 | health: DashMap::new(), | 121 | health: DashMap::new(), |
| 119 | max_backoff_secs, | 122 | max_backoff_secs, |
| 123 | base_backoff_secs: DEFAULT_BASE_BACKOFF_SECS, | ||
| 120 | } | 124 | } |
| 121 | } | 125 | } |
| 122 | 126 | ||
| 127 | /// Get the base backoff duration in seconds | ||
| 128 | /// | ||
| 129 | /// This is used by SyncManager to set connection timeout | ||
| 130 | /// (connection timeout should not exceed base backoff) | ||
| 131 | pub fn base_backoff_secs(&self) -> u64 { | ||
| 132 | self.base_backoff_secs | ||
| 133 | } | ||
| 134 | |||
| 123 | /// Record a successful connection to a relay | 135 | /// Record a successful connection to a relay |
| 124 | /// | 136 | /// |
| 125 | /// Resets the relay to Healthy state and clears failure counters. | 137 | /// Resets the relay to Healthy state and clears failure counters. |
| @@ -188,6 +200,7 @@ impl RelayHealthTracker { | |||
| 188 | health.state = HealthState::Degraded; | 200 | health.state = HealthState::Degraded; |
| 189 | let backoff = Self::get_backoff_duration( | 201 | let backoff = Self::get_backoff_duration( |
| 190 | health.consecutive_failures, | 202 | health.consecutive_failures, |
| 203 | self.base_backoff_secs, | ||
| 191 | self.max_backoff_secs, | 204 | self.max_backoff_secs, |
| 192 | ); | 205 | ); |
| 193 | health.next_retry_at = Some(now + backoff); | 206 | health.next_retry_at = Some(now + backoff); |
| @@ -277,9 +290,18 @@ impl RelayHealthTracker { | |||
| 277 | 290 | ||
| 278 | /// Calculate the backoff duration based on failure count | 291 | /// Calculate the backoff duration based on failure count |
| 279 | /// | 292 | /// |
| 280 | /// Uses exponential backoff: base * 2^failures, capped at max_backoff | 293 | /// Uses exponential backoff: base * 2^(failures-1), capped at max_backoff |
| 281 | pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { | 294 | /// |
| 282 | let backoff_secs = BASE_BACKOFF_SECS | 295 | /// # Arguments |
| 296 | /// * `consecutive_failures` - Number of consecutive failures (1 = first failure) | ||
| 297 | /// * `base_backoff_secs` - Base backoff time in seconds | ||
| 298 | /// * `max_backoff_secs` - Maximum backoff cap in seconds | ||
| 299 | pub fn get_backoff_duration( | ||
| 300 | consecutive_failures: u32, | ||
| 301 | base_backoff_secs: u64, | ||
| 302 | max_backoff_secs: u64, | ||
| 303 | ) -> Duration { | ||
| 304 | let backoff_secs = base_backoff_secs | ||
| 283 | .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); | 305 | .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); |
| 284 | Duration::from_secs(backoff_secs.min(max_backoff_secs)) | 306 | Duration::from_secs(backoff_secs.min(max_backoff_secs)) |
| 285 | } | 307 | } |
| @@ -345,39 +367,43 @@ mod tests { | |||
| 345 | 367 | ||
| 346 | #[test] | 368 | #[test] |
| 347 | fn test_backoff_increases_exponentially() { | 369 | fn test_backoff_increases_exponentially() { |
| 348 | // failure 1: 5s | 370 | let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds |
| 371 | let max = 3600u64; | ||
| 372 | |||
| 373 | // failure 1: 5s (base * 2^0 = 5) | ||
| 349 | assert_eq!( | 374 | assert_eq!( |
| 350 | RelayHealthTracker::get_backoff_duration(1, 3600), | 375 | RelayHealthTracker::get_backoff_duration(1, base, max), |
| 351 | Duration::from_secs(5) | 376 | Duration::from_secs(5) |
| 352 | ); | 377 | ); |
| 353 | // failure 2: 10s | 378 | // failure 2: 10s (base * 2^1 = 10) |
| 354 | assert_eq!( | 379 | assert_eq!( |
| 355 | RelayHealthTracker::get_backoff_duration(2, 3600), | 380 | RelayHealthTracker::get_backoff_duration(2, base, max), |
| 356 | Duration::from_secs(10) | 381 | Duration::from_secs(10) |
| 357 | ); | 382 | ); |
| 358 | // failure 3: 20s | 383 | // failure 3: 20s (base * 2^2 = 20) |
| 359 | assert_eq!( | 384 | assert_eq!( |
| 360 | RelayHealthTracker::get_backoff_duration(3, 3600), | 385 | RelayHealthTracker::get_backoff_duration(3, base, max), |
| 361 | Duration::from_secs(20) | 386 | Duration::from_secs(20) |
| 362 | ); | 387 | ); |
| 363 | // failure 4: 40s | 388 | // failure 4: 40s (base * 2^3 = 40) |
| 364 | assert_eq!( | 389 | assert_eq!( |
| 365 | RelayHealthTracker::get_backoff_duration(4, 3600), | 390 | RelayHealthTracker::get_backoff_duration(4, base, max), |
| 366 | Duration::from_secs(40) | 391 | Duration::from_secs(40) |
| 367 | ); | 392 | ); |
| 368 | // failure 5: 80s | 393 | // failure 5: 80s (base * 2^4 = 80) |
| 369 | assert_eq!( | 394 | assert_eq!( |
| 370 | RelayHealthTracker::get_backoff_duration(5, 3600), | 395 | RelayHealthTracker::get_backoff_duration(5, base, max), |
| 371 | Duration::from_secs(80) | 396 | Duration::from_secs(80) |
| 372 | ); | 397 | ); |
| 373 | } | 398 | } |
| 374 | 399 | ||
| 375 | #[test] | 400 | #[test] |
| 376 | fn test_backoff_capped_at_max() { | 401 | fn test_backoff_capped_at_max() { |
| 402 | let base = DEFAULT_BASE_BACKOFF_SECS; | ||
| 377 | let max_backoff = 3600u64; | 403 | let max_backoff = 3600u64; |
| 378 | // After many failures, should cap at max_backoff (1 hour) | 404 | // After many failures, should cap at max_backoff (1 hour) |
| 379 | assert_eq!( | 405 | assert_eq!( |
| 380 | RelayHealthTracker::get_backoff_duration(20, max_backoff), | 406 | RelayHealthTracker::get_backoff_duration(20, base, max_backoff), |
| 381 | Duration::from_secs(max_backoff) | 407 | Duration::from_secs(max_backoff) |
| 382 | ); | 408 | ); |
| 383 | } | 409 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 16ad833..21f31df 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1184,8 +1184,12 @@ impl SyncManager { | |||
| 1184 | // Create relay connection | 1184 | // Create relay connection |
| 1185 | let connection = RelayConnection::new(relay_url.clone()); | 1185 | let connection = RelayConnection::new(relay_url.clone()); |
| 1186 | 1186 | ||
| 1187 | // Use the health tracker's base backoff as the connection timeout | ||
| 1188 | // This ensures the connection attempt completes before the next retry would be scheduled | ||
| 1189 | let connection_timeout_secs = self.health_tracker.base_backoff_secs(); | ||
| 1190 | |||
| 1187 | // Connect and subscribe to Layer 1 | 1191 | // Connect and subscribe to Layer 1 |
| 1188 | match connection.connect_and_subscribe(None).await { | 1192 | match connection.connect_and_subscribe(None, connection_timeout_secs).await { |
| 1189 | Ok(_) => { | 1193 | Ok(_) => { |
| 1190 | // Record successful connection attempt | 1194 | // Record successful connection attempt |
| 1191 | if let Some(ref metrics) = self.metrics { | 1195 | if let Some(ref metrics) = self.metrics { |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 4f26779..9a580d2 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -60,6 +60,9 @@ impl RelayConnection { | |||
| 60 | /// | 60 | /// |
| 61 | /// # Arguments | 61 | /// # Arguments |
| 62 | /// * `since` - Optional timestamp for incremental sync on reconnect | 62 | /// * `since` - Optional timestamp for incremental sync on reconnect |
| 63 | /// * `connection_timeout_secs` - Timeout for the connection attempt in seconds. | ||
| 64 | /// Should be no larger than base_backoff_secs to ensure the connection attempt | ||
| 65 | /// completes before the next retry would be scheduled. | ||
| 63 | /// | 66 | /// |
| 64 | /// # Returns | 67 | /// # Returns |
| 65 | /// * `Ok(SubscriptionId)` - The subscription ID on successful connection | 68 | /// * `Ok(SubscriptionId)` - The subscription ID on successful connection |
| @@ -67,6 +70,7 @@ impl RelayConnection { | |||
| 67 | pub async fn connect_and_subscribe( | 70 | pub async fn connect_and_subscribe( |
| 68 | &self, | 71 | &self, |
| 69 | since: Option<Timestamp>, | 72 | since: Option<Timestamp>, |
| 73 | connection_timeout_secs: u64, | ||
| 70 | ) -> Result<SubscriptionId, String> { | 74 | ) -> Result<SubscriptionId, String> { |
| 71 | // Add relay to client | 75 | // Add relay to client |
| 72 | self.client | 76 | self.client |
| @@ -83,13 +87,13 @@ impl RelayConnection { | |||
| 83 | // | 87 | // |
| 84 | // Using try_connect_relay gives us: | 88 | // Using try_connect_relay gives us: |
| 85 | // 1. Immediate error return on connection failure | 89 | // 1. Immediate error return on connection failure |
| 86 | // 2. Configurable timeout (5 seconds default) | 90 | // 2. Configurable timeout (set to base_backoff_secs to ensure retry timing works) |
| 87 | // 3. No conflicting retry logic (we use HealthTracker for backoff) | 91 | // 3. No conflicting retry logic (we use HealthTracker for backoff) |
| 88 | // 4. Cleaner error messages for metrics recording | 92 | // 4. Cleaner error messages for metrics recording |
| 89 | // | 93 | // |
| 90 | // See: nostr-sdk-0.44 Client::try_connect_relay documentation | 94 | // See: nostr-sdk-0.44 Client::try_connect_relay documentation |
| 91 | self.client | 95 | self.client |
| 92 | .try_connect_relay(&self.url, std::time::Duration::from_secs(5)) | 96 | .try_connect_relay(&self.url, std::time::Duration::from_secs(connection_timeout_secs)) |
| 93 | .await | 97 | .await |
| 94 | .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; | 98 | .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; |
| 95 | 99 | ||