From f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:58:31 +0000 Subject: feat(sync): Phase 3 - resilience and health tracking - Add RelayHealthTracker with DashMap - Implement exponential backoff (5s -> 1h max) - Handle dead relays (24h failures -> daily retry) - Add startup jitter to prevent thundering herd - Add NGIT_SYNC_MAX_BACKOFF_SECS config --- src/sync/connection.rs | 80 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 12 deletions(-) (limited to 'src/sync/connection.rs') diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 76cc8e8..319cbbd 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -9,6 +9,12 @@ //! 1. Layer 1: kinds 30617 + 30618 (announcements) //! 2. Layer 2: A/a tags for repository events //! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) +//! +//! ## Phase 3 Features +//! +//! - Health tracking with success/failure reporting +//! - Exponential backoff with health-aware delays +//! 
- Dead relay detection and minimal retry use std::sync::Arc; use std::time::Duration; @@ -17,6 +23,7 @@ use nostr_sdk::prelude::*; use tokio::sync::mpsc; use super::filter::FilterService; +use super::health::RelayHealthTracker; /// Event received from the sync connection #[derive(Debug, Clone)] @@ -169,47 +176,96 @@ impl SyncConnection { } } -/// Reconnect loop with exponential backoff +/// Reconnect loop with health-aware exponential backoff +/// +/// This function manages the connection lifecycle with health tracking: +/// - Checks health state before attempting connections +/// - Reports success/failure to the health tracker +/// - Respects backoff delays from the health tracker +/// - Handles dead relay detection (24h+ failures) /// /// # Arguments /// * `url` - The relay URL to connect to /// * `tx` - Channel sender for synced events /// * `filter_service` - FilterService for building subscriptions /// * `our_domain` - Our relay's domain (used to extract remote domain) +/// * `health_tracker` - Health tracker for managing connection state pub async fn connect_with_retry( url: &str, tx: mpsc::Sender, filter_service: Arc<FilterService>, _our_domain: &str, + health_tracker: Arc<RelayHealthTracker>, ) { - let mut backoff = Duration::from_secs(1); - let max_backoff = Duration::from_secs(60); - // Extract remote domain from URL let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); loop { + // Check if we should attempt connection based on health state + if !health_tracker.should_attempt_connection(url) { + // Wait for remaining backoff + if let Some(remaining) = health_tracker.get_remaining_backoff(url) { + tracing::debug!( + "Relay {} in backoff, waiting {:?} before retry", + url, + remaining + ); + tokio::time::sleep(remaining).await; + continue; + } + } + + // Log current health state for dead relays + if health_tracker.is_dead(url) { + tracing::info!( + "Attempting reconnection to dead relay {} (daily retry)", + url + ); + } + match SyncConnection::new(url, 
filter_service.clone(), &remote_domain).await { Ok(conn) => { - backoff = Duration::from_secs(1); // Reset backoff on successful connection + // Record successful connection + health_tracker.record_success(url); + tracing::info!("Sync connection established to {}", url); + + // Run the connection (this blocks until disconnection) conn.run(tx.clone()).await; + + // Connection ended - record as failure for reconnection backoff + // (The connection ending is considered a failure even if it worked for a while) + health_tracker.record_failure(url); tracing::warn!("Sync connection to {} ended, will reconnect", url); } Err(e) => { + // Record connection failure + health_tracker.record_failure(url); + + let failure_count = health_tracker.get_failure_count(url); + let state = health_tracker.get_state(url); + tracing::error!( - "Failed to connect to sync relay {}: {} (retrying in {:?})", + "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", url, - e, - backoff + failure_count, + state, + e ); } } - // Wait before reconnecting - tokio::time::sleep(backoff).await; + // Get the backoff duration from health tracker + // If the health tracker has no backoff set (shouldn't happen), use a small default + let wait_duration = health_tracker + .get_remaining_backoff(url) + .unwrap_or(Duration::from_secs(5)); - // Exponential backoff - backoff = std::cmp::min(backoff * 2, max_backoff); + tracing::debug!( + "Waiting {:?} before reconnecting to {}", + wait_duration, + url + ); + tokio::time::sleep(wait_duration).await; } } -- cgit v1.2.3