From 4941490233a728bc7c64fa80a53d15f772a1219f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 11:57:36 +0000 Subject: sync: add sync_base_backoff_secs config for better testing --- src/sync/health.rs | 58 ++++++++++++++++++++++++++++++++------------ src/sync/mod.rs | 6 ++++- src/sync/relay_connection.rs | 8 ++++-- 3 files changed, 53 insertions(+), 19 deletions(-) (limited to 'src/sync') diff --git a/src/sync/health.rs b/src/sync/health.rs index 51bd5ae..f9a5f3a 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs @@ -27,8 +27,8 @@ const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; /// Default maximum backoff duration in seconds (1 hour) const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; -/// Base backoff duration in seconds -const BASE_BACKOFF_SECS: u64 = 5; +/// Default base backoff duration in seconds +const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; /// Health state of a relay connection #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -93,6 +93,7 @@ impl RelayHealth { pub struct RelayHealthTracker { health: DashMap, max_backoff_secs: u64, + base_backoff_secs: u64, } impl RelayHealthTracker { @@ -101,6 +102,7 @@ impl RelayHealthTracker { Self { health: DashMap::new(), max_backoff_secs: config.sync_max_backoff_secs, + base_backoff_secs: config.sync_base_backoff_secs, } } @@ -109,6 +111,7 @@ impl RelayHealthTracker { Self { health: DashMap::new(), max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, + base_backoff_secs: DEFAULT_BASE_BACKOFF_SECS, } } @@ -117,9 +120,18 @@ impl RelayHealthTracker { Self { health: DashMap::new(), max_backoff_secs, + base_backoff_secs: DEFAULT_BASE_BACKOFF_SECS, } } + /// Get the base backoff duration in seconds + /// + /// This is used by SyncManager to set connection timeout + /// (connection timeout should not exceed base backoff) + pub fn base_backoff_secs(&self) -> u64 { + self.base_backoff_secs + } + /// Record a successful connection to a relay /// /// Resets the relay to Healthy state and clears failure counters. @@ -188,6 +200,7 @@ impl RelayHealthTracker { health.state = HealthState::Degraded; let backoff = Self::get_backoff_duration( health.consecutive_failures, + self.base_backoff_secs, self.max_backoff_secs, ); health.next_retry_at = Some(now + backoff); @@ -277,9 +290,18 @@ impl RelayHealthTracker { /// Calculate the backoff duration based on failure count /// - /// Uses exponential backoff: base * 2^failures, capped at max_backoff - pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { - let backoff_secs = BASE_BACKOFF_SECS + /// Uses exponential backoff: base * 2^(failures-1), capped at max_backoff + /// + /// # Arguments + /// * `consecutive_failures` - Number of consecutive failures (1 = first failure) + /// * `base_backoff_secs` - Base backoff time in seconds + /// * `max_backoff_secs` - Maximum backoff cap in seconds + pub fn get_backoff_duration( + consecutive_failures: u32, + base_backoff_secs: u64, + max_backoff_secs: u64, + ) -> Duration { + let backoff_secs = base_backoff_secs .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); Duration::from_secs(backoff_secs.min(max_backoff_secs)) } @@ -345,39 +367,43 @@ mod tests { #[test] fn test_backoff_increases_exponentially() { - // failure 1: 5s + let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds + let max = 3600u64; + + // failure 1: 5s (base * 2^0 = 5) assert_eq!( - RelayHealthTracker::get_backoff_duration(1, 3600), + RelayHealthTracker::get_backoff_duration(1, base, max), Duration::from_secs(5) ); - // failure 2: 10s + // failure 2: 10s (base * 2^1 = 10) assert_eq!( - RelayHealthTracker::get_backoff_duration(2, 3600), + RelayHealthTracker::get_backoff_duration(2, base, max), Duration::from_secs(10) ); - // failure 3: 20s + // failure 3: 20s (base * 2^2 = 20) assert_eq!( - RelayHealthTracker::get_backoff_duration(3, 3600), + RelayHealthTracker::get_backoff_duration(3, base, max), Duration::from_secs(20) ); - // failure 4: 40s + // failure 4: 40s (base * 2^3 = 40) assert_eq!( - RelayHealthTracker::get_backoff_duration(4, 3600), + RelayHealthTracker::get_backoff_duration(4, base, max), Duration::from_secs(40) ); - // failure 5: 80s + // failure 5: 80s (base * 2^4 = 80) assert_eq!( - RelayHealthTracker::get_backoff_duration(5, 3600), + RelayHealthTracker::get_backoff_duration(5, base, max), Duration::from_secs(80) ); } #[test] fn test_backoff_capped_at_max() { + let base = DEFAULT_BASE_BACKOFF_SECS; let max_backoff = 3600u64; // After many failures, should cap at max_backoff (1 hour) assert_eq!( - RelayHealthTracker::get_backoff_duration(20, max_backoff), + RelayHealthTracker::get_backoff_duration(20, base, max_backoff), Duration::from_secs(max_backoff) ); } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 16ad833..21f31df 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1184,8 +1184,12 @@ impl SyncManager { // Create relay connection let connection = RelayConnection::new(relay_url.clone()); + // Get connection timeout from health tracker (capped at base backoff) + // This ensures the connection attempt completes before the next retry would be scheduled + let connection_timeout_secs = self.health_tracker.base_backoff_secs(); + // Connect and subscribe to Layer 1 - match connection.connect_and_subscribe(None).await { + match connection.connect_and_subscribe(None, connection_timeout_secs).await { Ok(_) => { // Record successful connection attempt if let Some(ref metrics) = self.metrics { diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 4f26779..9a580d2 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -60,6 +60,9 @@ impl RelayConnection { /// /// # Arguments /// * `since` - Optional timestamp for incremental sync on reconnect + /// * `connection_timeout_secs` - Timeout for the connection attempt in seconds. + /// Should be no larger than base_backoff_secs to ensure the connection attempt + /// completes before the next retry would be scheduled. /// /// # Returns /// * `Ok(SubscriptionId)` - The subscription ID on successful connection @@ -67,6 +70,7 @@ impl RelayConnection { pub async fn connect_and_subscribe( &self, since: Option, + connection_timeout_secs: u64, ) -> Result { // Add relay to client self.client @@ -83,13 +87,13 @@ impl RelayConnection { // // Using try_connect_relay gives us: // 1. Immediate error return on connection failure - // 2. Configurable timeout (5 seconds default) + // 2. Configurable timeout (set to base_backoff_secs to ensure retry timing works) // 3. No conflicting retry logic (we use HealthTracker for backoff) // 4. Cleaner error messages for metrics recording // // See: nostr-sdk-0.44 Client::try_connect_relay documentation self.client - .try_connect_relay(&self.url, std::time::Duration::from_secs(5)) + .try_connect_relay(&self.url, std::time::Duration::from_secs(connection_timeout_secs)) .await .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; -- cgit v1.2.3