From f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:58:31 +0000 Subject: feat(sync): Phase 3 - resilience and health tracking - Add RelayHealthTracker with DashMap - Implement exponential backoff (5s -> 1h max) - Handle dead relays (24h failures -> daily retry) - Add startup jitter to prevent thundering herd - Add NGIT_SYNC_MAX_BACKOFF_SECS config --- src/sync/manager.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 6 deletions(-) (limited to 'src/sync/manager.rs') diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 8f6a9bd..1f70f42 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -9,18 +9,31 @@ //! - Relay discovery from stored kind 30617 announcements //! - Multiple simultaneous relay connections //! - Three-layer filter strategy via FilterService +//! +//! ## Phase 3 Features +//! +//! - Health tracking with exponential backoff +//! - Dead relay detection after 24h of failures +//! - Startup jitter to prevent thundering herd use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use nostr_relay_builder::prelude::*; +use rand::Rng; use tokio::sync::mpsc; use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; +use super::health::RelayHealthTracker; use super::SYNC_SOURCE_ADDR; +use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; +/// Maximum startup jitter in milliseconds (10 seconds) +const MAX_STARTUP_JITTER_MS: u64 = 10_000; + /// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { /// Initial relay URL to sync from (from config) @@ -31,6 +44,8 @@ pub struct SyncManager { database: SharedDatabase, /// Write policy for validating events write_policy: Nip34WritePolicy, + /// Health tracker for relay connections + health_tracker: Arc, } impl SyncManager { @@ -41,17 +56,20 @@ impl SyncManager { /// * `relay_domain` - Our relay's domain (used to exclude self from sync) /// * `database` - Shared database for storing events and querying announcements /// * `write_policy` - Write policy for validating synced events + /// * `config` - Configuration for health tracking settings pub fn new( initial_relay_url: Option, relay_domain: String, database: SharedDatabase, write_policy: Nip34WritePolicy, + config: &Config, ) -> Self { Self { initial_relay_url, relay_domain, database, write_policy, + health_tracker: Arc::new(RelayHealthTracker::new(config)), } } @@ -68,9 +86,15 @@ impl SyncManager { relay_domain, database, write_policy, + health_tracker: Arc::new(RelayHealthTracker::with_defaults()), } } + /// Get a reference to the health tracker + pub fn health_tracker(&self) -> Arc { + self.health_tracker.clone() + } + /// Run the sync manager /// /// This discovers relays from stored announcements, spawns connection tasks, @@ -94,12 +118,14 @@ impl SyncManager { // Track active relay URLs to avoid duplicates let mut active_relays: HashSet = HashSet::new(); + // Collect all relays to connect to + let mut relays_to_connect: Vec = Vec::new(); + // Start with initial relay if configured if let Some(ref url) = self.initial_relay_url { if !self.is_own_relay(url) { - tracing::info!("Connecting to initial sync relay: {}", url); + relays_to_connect.push(url.clone()); active_relays.insert(url.clone()); - self.spawn_connection(url.clone(), tx.clone(), filter_service.clone()); } else { tracing::info!("Skipping initial relay (is our own relay): {}", url); } @@ -109,12 +135,17 @@ impl SyncManager { let discovered_urls = filter_service.discover_relay_urls().await; for url in discovered_urls { if !active_relays.contains(&url) && !self.is_own_relay(&url) { - tracing::info!("Connecting to discovered relay: {}", url); + relays_to_connect.push(url.clone()); active_relays.insert(url.clone()); - self.spawn_connection(url, tx.clone(), filter_service.clone()); } } + // Spawn connections with startup jitter to prevent thundering herd + for url in relays_to_connect { + tracing::info!("Scheduling connection to sync relay: {}", url); + self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); + } + if active_relays.is_empty() { tracing::warn!("No sync relays configured or discovered, SyncManager idle"); } else { @@ -133,6 +164,7 @@ impl SyncManager { if !active_relays.contains(&url) && !self.is_own_relay(&url) { tracing::info!("Discovered new relay from event, connecting: {}", url); active_relays.insert(url.clone()); + // New relays discovered during runtime don't need jitter self.spawn_connection(url, tx.clone(), filter_service.clone()); } } @@ -148,7 +180,36 @@ impl SyncManager { url.contains(&self.relay_domain) } - /// Spawn a connection task for a relay + /// Spawn a connection task for a relay with startup jitter + /// + /// Adds a random delay (0-10s) before connecting to prevent thundering herd + /// on startup when multiple relays are configured. + fn spawn_connection_with_jitter( + &self, + url: String, + tx: mpsc::Sender, + filter_service: Arc, + ) { + let domain = self.relay_domain.clone(); + let health_tracker = self.health_tracker.clone(); + + tokio::spawn(async move { + // Apply startup jitter + let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS); + tracing::debug!( + "Applying {}ms startup jitter before connecting to {}", + jitter_ms, + url + ); + tokio::time::sleep(Duration::from_millis(jitter_ms)).await; + + connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + }); + } + + /// Spawn a connection task for a relay without jitter + /// + /// Used for relays discovered during runtime (not at startup). fn spawn_connection( &self, url: String, @@ -156,8 +217,10 @@ impl SyncManager { filter_service: Arc, ) { let domain = self.relay_domain.clone(); + let health_tracker = self.health_tracker.clone(); + tokio::spawn(async move { - connect_with_retry(&url, tx, filter_service, &domain).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; }); } -- cgit v1.2.3