From f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:58:31 +0000 Subject: feat(sync): Phase 3 - resilience and health tracking - Add RelayHealthTracker with DashMap - Implement exponential backoff (5s -> 1h max) - Handle dead relays (24h failures -> daily retry) - Add startup jitter to prevent thundering herd - Add NGIT_SYNC_MAX_BACKOFF_SECS config --- src/config.rs | 5 + src/http/nip11.rs | 2 + src/main.rs | 3 +- src/sync/connection.rs | 80 +++++++-- src/sync/health.rs | 475 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/manager.rs | 75 +++++++- src/sync/mod.rs | 9 + 7 files changed, 630 insertions(+), 19 deletions(-) create mode 100644 src/sync/health.rs (limited to 'src') diff --git a/src/config.rs b/src/config.rs index a2a27be..441a14d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -87,6 +87,10 @@ pub struct Config { /// URL of relay to sync kind 30617 events from (optional, enables proactive sync) #[arg(long, env = "NGIT_SYNC_RELAY_URL")] pub sync_relay_url: Option, + + /// Maximum backoff time in seconds for sync relay reconnection (default: 3600 = 1 hour) + #[arg(long, env = "NGIT_SYNC_MAX_BACKOFF_SECS", default_value_t = 3600)] + pub sync_max_backoff_secs: u64, } impl Config { @@ -143,6 +147,7 @@ impl Config { metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, sync_relay_url: None, + sync_max_backoff_secs: 3600, } } } diff --git a/src/http/nip11.rs b/src/http/nip11.rs index e9e1c25..22e5b22 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs @@ -106,6 +106,7 @@ mod tests { metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, sync_relay_url: None, + sync_max_backoff_secs: 3600, }; let doc = RelayInformationDocument::from_config(&config); @@ -141,6 +142,7 @@ mod tests { metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, sync_relay_url: None, + sync_max_backoff_secs: 3600, }; let doc = RelayInformationDocument::from_config(&config); diff --git a/src/main.rs b/src/main.rs index 31e7cf6..9273afd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,13 +51,14 @@ async fn main() -> Result<()> { config.domain ); - // Start SyncManager for proactive sync (Phase 2: multi-relay support) + // Start SyncManager for proactive sync (Phase 2: multi-relay support, Phase 3: health tracking) // Even without initial sync_relay_url, SyncManager can discover relays from stored announcements let sync_manager = SyncManager::new( config.sync_relay_url.clone(), config.domain.clone(), relay_with_db.database.clone(), relay_with_db.write_policy.clone(), + &config, ); if config.sync_relay_url.is_some() { diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 76cc8e8..319cbbd 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -9,6 +9,12 @@ //! 1. Layer 1: kinds 30617 + 30618 (announcements) //! 2. Layer 2: A/a tags for repository events //! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) +//! +//! ## Phase 3 Features +//! +//! - Health tracking with success/failure reporting +//! - Exponential backoff with health-aware delays +//! - Dead relay detection and minimal retry use std::sync::Arc; use std::time::Duration; @@ -17,6 +23,7 @@ use nostr_sdk::prelude::*; use tokio::sync::mpsc; use super::filter::FilterService; +use super::health::RelayHealthTracker; /// Event received from the sync connection #[derive(Debug, Clone)] @@ -169,47 +176,96 @@ impl SyncConnection { } } -/// Reconnect loop with exponential backoff +/// Reconnect loop with health-aware exponential backoff +/// +/// This function manages the connection lifecycle with health tracking: +/// - Checks health state before attempting connections +/// - Reports success/failure to the health tracker +/// - Respects backoff delays from the health tracker +/// - Handles dead relay detection (24h+ failures) /// /// # Arguments /// * `url` - The relay URL to connect to /// * `tx` - Channel sender for synced events /// * `filter_service` - FilterService for building subscriptions /// * `our_domain` - Our relay's domain (used to extract remote domain) +/// * `health_tracker` - Health tracker for managing connection state pub async fn connect_with_retry( url: &str, tx: mpsc::Sender, filter_service: Arc, _our_domain: &str, + health_tracker: Arc, ) { - let mut backoff = Duration::from_secs(1); - let max_backoff = Duration::from_secs(60); - // Extract remote domain from URL let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); loop { + // Check if we should attempt connection based on health state + if !health_tracker.should_attempt_connection(url) { + // Wait for remaining backoff + if let Some(remaining) = health_tracker.get_remaining_backoff(url) { + tracing::debug!( + "Relay {} in backoff, waiting {:?} before retry", + url, + remaining + ); + tokio::time::sleep(remaining).await; + continue; + } + } + + // Log current health state for dead relays + if health_tracker.is_dead(url) { + tracing::info!( + "Attempting reconnection to dead relay {} (daily retry)", + url + ); + } + match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { Ok(conn) => { - backoff = Duration::from_secs(1); // Reset backoff on successful connection + // Record successful connection + health_tracker.record_success(url); + tracing::info!("Sync connection established to {}", url); + + // Run the connection (this blocks until disconnection) conn.run(tx.clone()).await; + + // Connection ended - record as failure for reconnection backoff + // (The connection ending is considered a failure even if it worked for a while) + health_tracker.record_failure(url); tracing::warn!("Sync connection to {} ended, will reconnect", url); } Err(e) => { + // Record connection failure + health_tracker.record_failure(url); + + let failure_count = health_tracker.get_failure_count(url); + let state = health_tracker.get_state(url); + tracing::error!( - "Failed to connect to sync relay {}: {} (retrying in {:?})", + "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", url, - e, - backoff + failure_count, + state, + e ); } } - // Wait before reconnecting - tokio::time::sleep(backoff).await; + // Get the backoff duration from health tracker + // If the health tracker has no backoff set (shouldn't happen), use a small default + let wait_duration = health_tracker + .get_remaining_backoff(url) + .unwrap_or(Duration::from_secs(5)); - // Exponential backoff - backoff = std::cmp::min(backoff * 2, max_backoff); + tracing::debug!( + "Waiting {:?} before reconnecting to {}", + wait_duration, + url + ); + tokio::time::sleep(wait_duration).await; } } diff --git a/src/sync/health.rs b/src/sync/health.rs new file mode 100644 index 0000000..51bd5ae --- /dev/null +++ b/src/sync/health.rs @@ -0,0 +1,475 @@ +//! Relay Health Tracking for GRASP-02 Proactive Sync +//! +//! This module implements health tracking for relay connections, including: +//! - Health state machine (Healthy -> Degraded -> Dead) +//! - Exponential backoff with configurable max delay +//! - Dead relay detection after 24h of continuous failures +//! +//! ## Health States +//! +//! - **Healthy**: Working connection, no recent failures +//! - **Degraded**: Connection failed, retrying with backoff +//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use dashmap::DashMap; + +use crate::config::Config; + +/// Duration threshold before a relay is considered dead (24 hours) +const DEAD_THRESHOLD_HOURS: u64 = 24; + +/// How often dead relays are retried (once per 24 hours) +const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; + +/// Default maximum backoff duration in seconds (1 hour) +const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; + +/// Base backoff duration in seconds +const BASE_BACKOFF_SECS: u64 = 5; + +/// Health state of a relay connection +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HealthState { + /// Working connection, no recent failures + Healthy, + /// Connection failed, retrying with exponential backoff + Degraded, + /// 24h+ of continuous failures, minimal retry + Dead, +} + +impl std::fmt::Display for HealthState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HealthState::Healthy => write!(f, "healthy"), + HealthState::Degraded => write!(f, "degraded"), + HealthState::Dead => write!(f, "dead"), + } + } +} + +/// Health information for a single relay +#[derive(Debug, Clone)] +pub struct RelayHealth { + /// Current health state + pub state: HealthState, + /// Number of consecutive connection failures + pub consecutive_failures: u32, + /// Time of the first failure in the current failure streak + pub first_failure_time: Option, + /// Time of the last failure + pub last_failure_time: Option, + /// Time of the last successful connection + pub last_success_time: Option, + /// Next time a connection attempt should be made + pub next_retry_at: Option, +} + +impl Default for RelayHealth { + fn default() -> Self { + Self { + state: HealthState::Healthy, + consecutive_failures: 0, + first_failure_time: None, + last_failure_time: None, + last_success_time: None, + next_retry_at: None, + } + } +} + +impl RelayHealth { + /// Create a new RelayHealth with healthy state + pub fn new() -> Self { + Self::default() + } +} + +/// Thread-safe relay health tracker using DashMap +#[derive(Debug)] +pub struct RelayHealthTracker { + health: DashMap, + max_backoff_secs: u64, +} + +impl RelayHealthTracker { + /// Create a new RelayHealthTracker + pub fn new(config: &Config) -> Self { + Self { + health: DashMap::new(), + max_backoff_secs: config.sync_max_backoff_secs, + } + } + + /// Create a new RelayHealthTracker with default settings + pub fn with_defaults() -> Self { + Self { + health: DashMap::new(), + max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, + } + } + + /// Create a new RelayHealthTracker with custom max backoff + pub fn with_max_backoff(max_backoff_secs: u64) -> Self { + Self { + health: DashMap::new(), + max_backoff_secs, + } + } + + /// Record a successful connection to a relay + /// + /// Resets the relay to Healthy state and clears failure counters. + pub fn record_success(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + let old_state = health.state; + + // Reset to healthy state + health.state = HealthState::Healthy; + health.consecutive_failures = 0; + health.first_failure_time = None; + health.last_failure_time = None; + health.last_success_time = Some(now); + health.next_retry_at = None; + + if old_state != HealthState::Healthy { + tracing::info!( + "Relay {} recovered to healthy (was {:?})", + relay_url, + old_state + ); + } + } + + /// Record a connection failure for a relay + /// + /// Increments failure counter, updates state, and calculates next retry time. + pub fn record_failure(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + let old_state = health.state; + + // Set first_failure_time if this is a new failure streak + if health.first_failure_time.is_none() { + health.first_failure_time = Some(now); + } + + health.consecutive_failures = health.consecutive_failures.saturating_add(1); + health.last_failure_time = Some(now); + + // Check if we should transition to Dead state + if let Some(first_failure) = health.first_failure_time { + let failure_duration = now.duration_since(first_failure); + let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); + + if failure_duration >= dead_threshold { + health.state = HealthState::Dead; + // Dead relays retry once per day + health.next_retry_at = + Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); + + if old_state != HealthState::Dead { + tracing::warn!( + "Relay {} marked dead after 24h failures ({} consecutive failures)", + relay_url, + health.consecutive_failures + ); + } + } else { + // Degraded state with exponential backoff + health.state = HealthState::Degraded; + let backoff = Self::get_backoff_duration( + health.consecutive_failures, + self.max_backoff_secs, + ); + health.next_retry_at = Some(now + backoff); + + if old_state != HealthState::Degraded { + tracing::warn!( + "Relay {} degraded, backoff {:?}", + relay_url, + backoff + ); + } else { + tracing::debug!( + "Relay {} failure #{}, backoff {:?}", + relay_url, + health.consecutive_failures, + backoff + ); + } + } + } + } + + /// Check if a connection attempt should be made to a relay + /// + /// Returns true if: + /// - The relay has no health record (first attempt) + /// - The relay is healthy + /// - The backoff period has elapsed + pub fn should_attempt_connection(&self, relay_url: &str) -> bool { + let entry = self.health.get(relay_url); + + match entry { + None => true, // No record, allow first attempt + Some(entry) => { + let health = entry.value(); + + match health.state { + HealthState::Healthy => true, + HealthState::Degraded | HealthState::Dead => { + // Check if backoff period has elapsed + match health.next_retry_at { + None => true, + Some(next_retry) => Instant::now() >= next_retry, + } + } + } + } + } + } + + /// Get the current health state of a relay + pub fn get_state(&self, relay_url: &str) -> HealthState { + self.health + .get(relay_url) + .map(|entry| entry.value().state) + .unwrap_or(HealthState::Healthy) + } + + /// Check if a relay is marked as dead + pub fn is_dead(&self, relay_url: &str) -> bool { + self.get_state(relay_url) == HealthState::Dead + } + + /// Get the remaining backoff duration for a relay + /// + /// Returns None if no backoff is active. + pub fn get_remaining_backoff(&self, relay_url: &str) -> Option { + let entry = self.health.get(relay_url)?; + let health = entry.value(); + let next_retry = health.next_retry_at?; + let now = Instant::now(); + + if now >= next_retry { + None + } else { + Some(next_retry - now) + } + } + + /// Get the consecutive failure count for a relay + pub fn get_failure_count(&self, relay_url: &str) -> u32 { + self.health + .get(relay_url) + .map(|entry| entry.value().consecutive_failures) + .unwrap_or(0) + } + + /// Calculate the backoff duration based on failure count + /// + /// Uses exponential backoff: base * 2^failures, capped at max_backoff + pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { + let backoff_secs = BASE_BACKOFF_SECS + .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); + Duration::from_secs(backoff_secs.min(max_backoff_secs)) + } + + /// Get all tracked relay URLs + pub fn get_tracked_relays(&self) -> Vec { + self.health.iter().map(|entry| entry.key().clone()).collect() + } + + /// Get a clone of the health info for a relay + pub fn get_health(&self, relay_url: &str) -> Option { + self.health.get(relay_url).map(|entry| entry.value().clone()) + } +} + +/// Create a shared RelayHealthTracker wrapped in Arc +pub fn create_health_tracker(config: &Config) -> Arc { + Arc::new(RelayHealthTracker::new(config)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_health_state_display() { + assert_eq!(HealthState::Healthy.to_string(), "healthy"); + assert_eq!(HealthState::Degraded.to_string(), "degraded"); + assert_eq!(HealthState::Dead.to_string(), "dead"); + } + + #[test] + fn test_default_health_is_healthy() { + let health = RelayHealth::default(); + assert_eq!(health.state, HealthState::Healthy); + assert_eq!(health.consecutive_failures, 0); + assert!(health.first_failure_time.is_none()); + } + + #[test] + fn test_should_attempt_connection_new_relay() { + let tracker = RelayHealthTracker::with_defaults(); + assert!(tracker.should_attempt_connection("wss://new-relay.example.com")); + } + + #[test] + fn test_record_success_resets_to_healthy() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Simulate a few failures + tracker.record_failure(url); + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 2); + + // Record success + tracker.record_success(url); + assert_eq!(tracker.get_state(url), HealthState::Healthy); + assert_eq!(tracker.get_failure_count(url), 0); + assert!(tracker.should_attempt_connection(url)); + } + + #[test] + fn test_backoff_increases_exponentially() { + // failure 1: 5s + assert_eq!( + RelayHealthTracker::get_backoff_duration(1, 3600), + Duration::from_secs(5) + ); + // failure 2: 10s + assert_eq!( + RelayHealthTracker::get_backoff_duration(2, 3600), + Duration::from_secs(10) + ); + // failure 3: 20s + assert_eq!( + RelayHealthTracker::get_backoff_duration(3, 3600), + Duration::from_secs(20) + ); + // failure 4: 40s + assert_eq!( + RelayHealthTracker::get_backoff_duration(4, 3600), + Duration::from_secs(40) + ); + // failure 5: 80s + assert_eq!( + RelayHealthTracker::get_backoff_duration(5, 3600), + Duration::from_secs(80) + ); + } + + #[test] + fn test_backoff_capped_at_max() { + let max_backoff = 3600u64; + // After many failures, should cap at max_backoff (1 hour) + assert_eq!( + RelayHealthTracker::get_backoff_duration(20, max_backoff), + Duration::from_secs(max_backoff) + ); + } + + #[test] + fn test_degraded_state_after_failure() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 1); + } + + #[test] + fn test_backoff_blocks_immediate_reconnection() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_failure(url); + + // Immediately after failure, should not attempt (backoff active) + assert!(!tracker.should_attempt_connection(url)); + + // Remaining backoff should be some positive duration + let remaining = tracker.get_remaining_backoff(url); + assert!(remaining.is_some()); + assert!(remaining.unwrap() > Duration::ZERO); + } + + #[test] + fn test_is_dead() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Initially not dead + assert!(!tracker.is_dead(url)); + + // After a failure, still not dead (just degraded) + tracker.record_failure(url); + assert!(!tracker.is_dead(url)); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + } + + #[test] + fn test_get_tracked_relays() { + let tracker = RelayHealthTracker::with_defaults(); + + tracker.record_success("wss://relay1.example.com"); + tracker.record_failure("wss://relay2.example.com"); + + let tracked = tracker.get_tracked_relays(); + assert_eq!(tracked.len(), 2); + assert!(tracked.contains(&"wss://relay1.example.com".to_string())); + assert!(tracked.contains(&"wss://relay2.example.com".to_string())); + } + + #[test] + fn test_custom_max_backoff() { + let custom_max = 60u64; // 1 minute max + let tracker = RelayHealthTracker::with_max_backoff(custom_max); + let url = "wss://test-relay.example.com"; + + // Simulate many failures + for _ in 0..20 { + tracker.record_failure(url); + } + + // The remaining backoff should respect the custom max + // Note: We can't easily test the internal backoff calculation here, + // but we can verify the tracker was created with the custom setting + assert_eq!(tracker.max_backoff_secs, custom_max); + } + + #[test] + fn test_get_health_returns_clone() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_success(url); + let health = tracker.get_health(url); + + assert!(health.is_some()); + let health = health.unwrap(); + assert_eq!(health.state, HealthState::Healthy); + assert!(health.last_success_time.is_some()); + } + + #[test] + fn test_get_health_nonexistent() { + let tracker = RelayHealthTracker::with_defaults(); + let health = tracker.get_health("wss://nonexistent.example.com"); + assert!(health.is_none()); + } +} \ No newline at end of file diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 8f6a9bd..1f70f42 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -9,18 +9,31 @@ //! - Relay discovery from stored kind 30617 announcements //! - Multiple simultaneous relay connections //! - Three-layer filter strategy via FilterService +//! +//! ## Phase 3 Features +//! +//! - Health tracking with exponential backoff +//! - Dead relay detection after 24h of failures +//! - Startup jitter to prevent thundering herd use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use nostr_relay_builder::prelude::*; +use rand::Rng; use tokio::sync::mpsc; use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; +use super::health::RelayHealthTracker; use super::SYNC_SOURCE_ADDR; +use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; +/// Maximum startup jitter in milliseconds (10 seconds) +const MAX_STARTUP_JITTER_MS: u64 = 10_000; + /// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { /// Initial relay URL to sync from (from config) @@ -31,6 +44,8 @@ pub struct SyncManager { database: SharedDatabase, /// Write policy for validating events write_policy: Nip34WritePolicy, + /// Health tracker for relay connections + health_tracker: Arc, } impl SyncManager { @@ -41,17 +56,20 @@ impl SyncManager { /// * `relay_domain` - Our relay's domain (used to exclude self from sync) /// * `database` - Shared database for storing events and querying announcements /// * `write_policy` - Write policy for validating synced events + /// * `config` - Configuration for health tracking settings pub fn new( initial_relay_url: Option, relay_domain: String, database: SharedDatabase, write_policy: Nip34WritePolicy, + config: &Config, ) -> Self { Self { initial_relay_url, relay_domain, database, write_policy, + health_tracker: Arc::new(RelayHealthTracker::new(config)), } } @@ -68,9 +86,15 @@ impl SyncManager { relay_domain, database, write_policy, + health_tracker: Arc::new(RelayHealthTracker::with_defaults()), } } + /// Get a reference to the health tracker + pub fn health_tracker(&self) -> Arc { + self.health_tracker.clone() + } + /// Run the sync manager /// /// This discovers relays from stored announcements, spawns connection tasks, @@ -94,12 +118,14 @@ impl SyncManager { // Track active relay URLs to avoid duplicates let mut active_relays: HashSet = HashSet::new(); + // Collect all relays to connect to + let mut relays_to_connect: Vec = Vec::new(); + // Start with initial relay if configured if let Some(ref url) = self.initial_relay_url { if !self.is_own_relay(url) { - tracing::info!("Connecting to initial sync relay: {}", url); + relays_to_connect.push(url.clone()); active_relays.insert(url.clone()); - self.spawn_connection(url.clone(), tx.clone(), filter_service.clone()); } else { tracing::info!("Skipping initial relay (is our own relay): {}", url); } @@ -109,12 +135,17 @@ impl SyncManager { let discovered_urls = filter_service.discover_relay_urls().await; for url in discovered_urls { if !active_relays.contains(&url) && !self.is_own_relay(&url) { - tracing::info!("Connecting to discovered relay: {}", url); + relays_to_connect.push(url.clone()); active_relays.insert(url.clone()); - self.spawn_connection(url, tx.clone(), filter_service.clone()); } } + // Spawn connections with startup jitter to prevent thundering herd + for url in relays_to_connect { + tracing::info!("Scheduling connection to sync relay: {}", url); + self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); + } + if active_relays.is_empty() { tracing::warn!("No sync relays configured or discovered, SyncManager idle"); } else { @@ -133,6 +164,7 @@ impl SyncManager { if !active_relays.contains(&url) && !self.is_own_relay(&url) { tracing::info!("Discovered new relay from event, connecting: {}", url); active_relays.insert(url.clone()); + // New relays discovered during runtime don't need jitter self.spawn_connection(url, tx.clone(), filter_service.clone()); } } @@ -148,7 +180,36 @@ impl SyncManager { url.contains(&self.relay_domain) } - /// Spawn a connection task for a relay + /// Spawn a connection task for a relay with startup jitter + /// + /// Adds a random delay (0-10s) before connecting to prevent thundering herd + /// on startup when multiple relays are configured. + fn spawn_connection_with_jitter( + &self, + url: String, + tx: mpsc::Sender, + filter_service: Arc, + ) { + let domain = self.relay_domain.clone(); + let health_tracker = self.health_tracker.clone(); + + tokio::spawn(async move { + // Apply startup jitter + let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS); + tracing::debug!( + "Applying {}ms startup jitter before connecting to {}", + jitter_ms, + url + ); + tokio::time::sleep(Duration::from_millis(jitter_ms)).await; + + connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + }); + } + + /// Spawn a connection task for a relay without jitter + /// + /// Used for relays discovered during runtime (not at startup). fn spawn_connection( &self, url: String, @@ -156,8 +217,10 @@ impl SyncManager { filter_service: Arc, ) { let domain = self.relay_domain.clone(); + let health_tracker = self.health_tracker.clone(); + tokio::spawn(async move { - connect_with_retry(&url, tx, filter_service, &domain).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; }); } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1155eaf..653aa27 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -9,12 +9,21 @@ //! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) //! - **Layer 2**: Repository events (A/a tags for shared repos) //! - **Layer 3**: Related events (E/e tags for discussions, reviews) +//! +//! ## Resilience & Health Tracking (Phase 3) +//! +//! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead) +//! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max) +//! - **Dead relay handling**: Minimal retry for 24h+ failed relays +//! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay) mod connection; mod filter; +pub mod health; mod manager; pub use filter::FilterService; +pub use health::{HealthState, RelayHealth, RelayHealthTracker}; pub use manager::SyncManager; use std::net::SocketAddr; -- cgit v1.2.3