From f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:58:31 +0000 Subject: feat(sync): Phase 3 - resilience and health tracking - Add RelayHealthTracker with DashMap - Implement exponential backoff (5s -> 1h max) - Handle dead relays (24h failures -> daily retry) - Add startup jitter to prevent thundering herd - Add NGIT_SYNC_MAX_BACKOFF_SECS config --- Cargo.toml | 3 + src/config.rs | 5 + src/http/nip11.rs | 2 + src/main.rs | 3 +- src/sync/connection.rs | 80 ++++++- src/sync/health.rs | 475 ++++++++++++++++++++++++++++++++++++ src/sync/manager.rs | 75 +++++- src/sync/mod.rs | 9 + tests/proactive_sync_resilience.rs | 476 +++++++++++++++++++++++++++++++++++++ 9 files changed, 1109 insertions(+), 19 deletions(-) create mode 100644 src/sync/health.rs create mode 100644 tests/proactive_sync_resilience.rs diff --git a/Cargo.toml b/Cargo.toml index 80fa317..911f5e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,9 @@ prometheus = "0.13" dashmap = "5" lazy_static = "1.4" +# Random (for startup jitter) +rand = "0.8" + # Serialization serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/config.rs b/src/config.rs index a2a27be..441a14d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -87,6 +87,10 @@ pub struct Config { /// URL of relay to sync kind 30617 events from (optional, enables proactive sync) #[arg(long, env = "NGIT_SYNC_RELAY_URL")] pub sync_relay_url: Option, + + /// Maximum backoff time in seconds for sync relay reconnection (default: 3600 = 1 hour) + #[arg(long, env = "NGIT_SYNC_MAX_BACKOFF_SECS", default_value_t = 3600)] + pub sync_max_backoff_secs: u64, } impl Config { @@ -143,6 +147,7 @@ impl Config { metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, sync_relay_url: None, + sync_max_backoff_secs: 3600, } } } diff --git a/src/http/nip11.rs b/src/http/nip11.rs index e9e1c25..22e5b22 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs @@ -106,6 +106,7 @@ mod tests { metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, sync_relay_url: None, + sync_max_backoff_secs: 3600, }; let doc = RelayInformationDocument::from_config(&config); @@ -141,6 +142,7 @@ mod tests { metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, sync_relay_url: None, + sync_max_backoff_secs: 3600, }; let doc = RelayInformationDocument::from_config(&config); diff --git a/src/main.rs b/src/main.rs index 31e7cf6..9273afd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,13 +51,14 @@ async fn main() -> Result<()> { config.domain ); - // Start SyncManager for proactive sync (Phase 2: multi-relay support) + // Start SyncManager for proactive sync (Phase 2: multi-relay support, Phase 3: health tracking) // Even without initial sync_relay_url, SyncManager can discover relays from stored announcements let sync_manager = SyncManager::new( config.sync_relay_url.clone(), config.domain.clone(), relay_with_db.database.clone(), relay_with_db.write_policy.clone(), + &config, ); if config.sync_relay_url.is_some() { diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 76cc8e8..319cbbd 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -9,6 +9,12 @@ //! 1. Layer 1: kinds 30617 + 30618 (announcements) //! 2. Layer 2: A/a tags for repository events //! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) +//! +//! ## Phase 3 Features +//! +//! - Health tracking with success/failure reporting +//! - Exponential backoff with health-aware delays +//! - Dead relay detection and minimal retry use std::sync::Arc; use std::time::Duration; @@ -17,6 +23,7 @@ use nostr_sdk::prelude::*; use tokio::sync::mpsc; use super::filter::FilterService; +use super::health::RelayHealthTracker; /// Event received from the sync connection #[derive(Debug, Clone)] @@ -169,47 +176,96 @@ impl SyncConnection { } } -/// Reconnect loop with exponential backoff +/// Reconnect loop with health-aware exponential backoff +/// +/// This function manages the connection lifecycle with health tracking: +/// - Checks health state before attempting connections +/// - Reports success/failure to the health tracker +/// - Respects backoff delays from the health tracker +/// - Handles dead relay detection (24h+ failures) /// /// # Arguments /// * `url` - The relay URL to connect to /// * `tx` - Channel sender for synced events /// * `filter_service` - FilterService for building subscriptions /// * `our_domain` - Our relay's domain (used to extract remote domain) +/// * `health_tracker` - Health tracker for managing connection state pub async fn connect_with_retry( url: &str, tx: mpsc::Sender, filter_service: Arc, _our_domain: &str, + health_tracker: Arc, ) { - let mut backoff = Duration::from_secs(1); - let max_backoff = Duration::from_secs(60); - // Extract remote domain from URL let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); loop { + // Check if we should attempt connection based on health state + if !health_tracker.should_attempt_connection(url) { + // Wait for remaining backoff + if let Some(remaining) = health_tracker.get_remaining_backoff(url) { + tracing::debug!( + "Relay {} in backoff, waiting {:?} before retry", + url, + remaining + ); + tokio::time::sleep(remaining).await; + continue; + } + } + + // Log current health state for dead relays + if health_tracker.is_dead(url) { + tracing::info!( + "Attempting reconnection to dead relay {} (daily retry)", + url + ); + } + match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { Ok(conn) => { - backoff = Duration::from_secs(1); // Reset backoff on successful connection + // Record successful connection + health_tracker.record_success(url); + tracing::info!("Sync connection established to {}", url); + + // Run the connection (this blocks until disconnection) conn.run(tx.clone()).await; + + // Connection ended - record as failure for reconnection backoff + // (The connection ending is considered a failure even if it worked for a while) + health_tracker.record_failure(url); tracing::warn!("Sync connection to {} ended, will reconnect", url); } Err(e) => { + // Record connection failure + health_tracker.record_failure(url); + + let failure_count = health_tracker.get_failure_count(url); + let state = health_tracker.get_state(url); + tracing::error!( - "Failed to connect to sync relay {}: {} (retrying in {:?})", + "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", url, - e, - backoff + failure_count, + state, + e ); } } - // Wait before reconnecting - tokio::time::sleep(backoff).await; + // Get the backoff duration from health tracker + // If the health tracker has no backoff set (shouldn't happen), use a small default + let wait_duration = health_tracker + .get_remaining_backoff(url) + .unwrap_or(Duration::from_secs(5)); - // Exponential backoff - backoff = std::cmp::min(backoff * 2, max_backoff); + tracing::debug!( + "Waiting {:?} before reconnecting to {}", + wait_duration, + url + ); + tokio::time::sleep(wait_duration).await; } } diff --git a/src/sync/health.rs b/src/sync/health.rs new file mode 100644 index 0000000..51bd5ae --- /dev/null +++ b/src/sync/health.rs @@ -0,0 +1,475 @@ +//! Relay Health Tracking for GRASP-02 Proactive Sync +//! +//! This module implements health tracking for relay connections, including: +//! - Health state machine (Healthy -> Degraded -> Dead) +//! - Exponential backoff with configurable max delay +//! - Dead relay detection after 24h of continuous failures +//! +//! ## Health States +//! +//! - **Healthy**: Working connection, no recent failures +//! - **Degraded**: Connection failed, retrying with backoff +//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use dashmap::DashMap; + +use crate::config::Config; + +/// Duration threshold before a relay is considered dead (24 hours) +const DEAD_THRESHOLD_HOURS: u64 = 24; + +/// How often dead relays are retried (once per 24 hours) +const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; + +/// Default maximum backoff duration in seconds (1 hour) +const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; + +/// Base backoff duration in seconds +const BASE_BACKOFF_SECS: u64 = 5; + +/// Health state of a relay connection +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HealthState { + /// Working connection, no recent failures + Healthy, + /// Connection failed, retrying with exponential backoff + Degraded, + /// 24h+ of continuous failures, minimal retry + Dead, +} + +impl std::fmt::Display for HealthState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HealthState::Healthy => write!(f, "healthy"), + HealthState::Degraded => write!(f, "degraded"), + HealthState::Dead => write!(f, "dead"), + } + } +} + +/// Health information for a single relay +#[derive(Debug, Clone)] +pub struct RelayHealth { + /// Current health state + pub state: HealthState, + /// Number of consecutive connection failures + pub consecutive_failures: u32, + /// Time of the first failure in the current failure streak + pub first_failure_time: Option, + /// Time of the last failure + pub last_failure_time: Option, + /// Time of the last successful connection + pub last_success_time: Option, + /// Next time a connection attempt should be made + pub next_retry_at: Option, +} + +impl Default for RelayHealth { + fn default() -> Self { + Self { + state: HealthState::Healthy, + consecutive_failures: 0, + first_failure_time: None, + last_failure_time: None, + last_success_time: None, + next_retry_at: None, + } + } +} + +impl RelayHealth { + /// Create a new RelayHealth with healthy state + pub fn new() -> Self { + Self::default() + } +} + +/// Thread-safe relay health tracker using DashMap +#[derive(Debug)] +pub struct RelayHealthTracker { + health: DashMap, + max_backoff_secs: u64, +} + +impl RelayHealthTracker { + /// Create a new RelayHealthTracker + pub fn new(config: &Config) -> Self { + Self { + health: DashMap::new(), + max_backoff_secs: config.sync_max_backoff_secs, + } + } + + /// Create a new RelayHealthTracker with default settings + pub fn with_defaults() -> Self { + Self { + health: DashMap::new(), + max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, + } + } + + /// Create a new RelayHealthTracker with custom max backoff + pub fn with_max_backoff(max_backoff_secs: u64) -> Self { + Self { + health: DashMap::new(), + max_backoff_secs, + } + } + + /// Record a successful connection to a relay + /// + /// Resets the relay to Healthy state and clears failure counters. + pub fn record_success(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + let old_state = health.state; + + // Reset to healthy state + health.state = HealthState::Healthy; + health.consecutive_failures = 0; + health.first_failure_time = None; + health.last_failure_time = None; + health.last_success_time = Some(now); + health.next_retry_at = None; + + if old_state != HealthState::Healthy { + tracing::info!( + "Relay {} recovered to healthy (was {:?})", + relay_url, + old_state + ); + } + } + + /// Record a connection failure for a relay + /// + /// Increments failure counter, updates state, and calculates next retry time. + pub fn record_failure(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + let old_state = health.state; + + // Set first_failure_time if this is a new failure streak + if health.first_failure_time.is_none() { + health.first_failure_time = Some(now); + } + + health.consecutive_failures = health.consecutive_failures.saturating_add(1); + health.last_failure_time = Some(now); + + // Check if we should transition to Dead state + if let Some(first_failure) = health.first_failure_time { + let failure_duration = now.duration_since(first_failure); + let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); + + if failure_duration >= dead_threshold { + health.state = HealthState::Dead; + // Dead relays retry once per day + health.next_retry_at = + Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); + + if old_state != HealthState::Dead { + tracing::warn!( + "Relay {} marked dead after 24h failures ({} consecutive failures)", + relay_url, + health.consecutive_failures + ); + } + } else { + // Degraded state with exponential backoff + health.state = HealthState::Degraded; + let backoff = Self::get_backoff_duration( + health.consecutive_failures, + self.max_backoff_secs, + ); + health.next_retry_at = Some(now + backoff); + + if old_state != HealthState::Degraded { + tracing::warn!( + "Relay {} degraded, backoff {:?}", + relay_url, + backoff + ); + } else { + tracing::debug!( + "Relay {} failure #{}, backoff {:?}", + relay_url, + health.consecutive_failures, + backoff + ); + } + } + } + } + + /// Check if a connection attempt should be made to a relay + /// + /// Returns true if: + /// - The relay has no health record (first attempt) + /// - The relay is healthy + /// - The backoff period has elapsed + pub fn should_attempt_connection(&self, relay_url: &str) -> bool { + let entry = self.health.get(relay_url); + + match entry { + None => true, // No record, allow first attempt + Some(entry) => { + let health = entry.value(); + + match health.state { + HealthState::Healthy => true, + HealthState::Degraded | HealthState::Dead => { + // Check if backoff period has elapsed + match health.next_retry_at { + None => true, + Some(next_retry) => Instant::now() >= next_retry, + } + } + } + } + } + } + + /// Get the current health state of a relay + pub fn get_state(&self, relay_url: &str) -> HealthState { + self.health + .get(relay_url) + .map(|entry| entry.value().state) + .unwrap_or(HealthState::Healthy) + } + + /// Check if a relay is marked as dead + pub fn is_dead(&self, relay_url: &str) -> bool { + self.get_state(relay_url) == HealthState::Dead + } + + /// Get the remaining backoff duration for a relay + /// + /// Returns None if no backoff is active. + pub fn get_remaining_backoff(&self, relay_url: &str) -> Option { + let entry = self.health.get(relay_url)?; + let health = entry.value(); + let next_retry = health.next_retry_at?; + let now = Instant::now(); + + if now >= next_retry { + None + } else { + Some(next_retry - now) + } + } + + /// Get the consecutive failure count for a relay + pub fn get_failure_count(&self, relay_url: &str) -> u32 { + self.health + .get(relay_url) + .map(|entry| entry.value().consecutive_failures) + .unwrap_or(0) + } + + /// Calculate the backoff duration based on failure count + /// + /// Uses exponential backoff: base * 2^failures, capped at max_backoff + pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { + let backoff_secs = BASE_BACKOFF_SECS + .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); + Duration::from_secs(backoff_secs.min(max_backoff_secs)) + } + + /// Get all tracked relay URLs + pub fn get_tracked_relays(&self) -> Vec { + self.health.iter().map(|entry| entry.key().clone()).collect() + } + + /// Get a clone of the health info for a relay + pub fn get_health(&self, relay_url: &str) -> Option { + self.health.get(relay_url).map(|entry| entry.value().clone()) + } +} + +/// Create a shared RelayHealthTracker wrapped in Arc +pub fn create_health_tracker(config: &Config) -> Arc { + Arc::new(RelayHealthTracker::new(config)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_health_state_display() { + assert_eq!(HealthState::Healthy.to_string(), "healthy"); + assert_eq!(HealthState::Degraded.to_string(), "degraded"); + assert_eq!(HealthState::Dead.to_string(), "dead"); + } + + #[test] + fn test_default_health_is_healthy() { + let health = RelayHealth::default(); + assert_eq!(health.state, HealthState::Healthy); + assert_eq!(health.consecutive_failures, 0); + assert!(health.first_failure_time.is_none()); + } + + #[test] + fn test_should_attempt_connection_new_relay() { + let tracker = RelayHealthTracker::with_defaults(); + assert!(tracker.should_attempt_connection("wss://new-relay.example.com")); + } + + #[test] + fn test_record_success_resets_to_healthy() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Simulate a few failures + tracker.record_failure(url); + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 2); + + // Record success + tracker.record_success(url); + assert_eq!(tracker.get_state(url), HealthState::Healthy); + assert_eq!(tracker.get_failure_count(url), 0); + assert!(tracker.should_attempt_connection(url)); + } + + #[test] + fn test_backoff_increases_exponentially() { + // failure 1: 5s + assert_eq!( + RelayHealthTracker::get_backoff_duration(1, 3600), + Duration::from_secs(5) + ); + // failure 2: 10s + assert_eq!( + RelayHealthTracker::get_backoff_duration(2, 3600), + Duration::from_secs(10) + ); + // failure 3: 20s + assert_eq!( + RelayHealthTracker::get_backoff_duration(3, 3600), + Duration::from_secs(20) + ); + // failure 4: 40s + assert_eq!( + RelayHealthTracker::get_backoff_duration(4, 3600), + Duration::from_secs(40) + ); + // failure 5: 80s + assert_eq!( + RelayHealthTracker::get_backoff_duration(5, 3600), + Duration::from_secs(80) + ); + } + + #[test] + fn test_backoff_capped_at_max() { + let max_backoff = 3600u64; + // After many failures, should cap at max_backoff (1 hour) + assert_eq!( + RelayHealthTracker::get_backoff_duration(20, max_backoff), + Duration::from_secs(max_backoff) + ); + } + + #[test] + fn test_degraded_state_after_failure() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 1); + } + + #[test] + fn test_backoff_blocks_immediate_reconnection() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_failure(url); + + // Immediately after failure, should not attempt (backoff active) + assert!(!tracker.should_attempt_connection(url)); + + // Remaining backoff should be some positive duration + let remaining = tracker.get_remaining_backoff(url); + assert!(remaining.is_some()); + assert!(remaining.unwrap() > Duration::ZERO); + } + + #[test] + fn test_is_dead() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Initially not dead + assert!(!tracker.is_dead(url)); + + // After a failure, still not dead (just degraded) + tracker.record_failure(url); + assert!(!tracker.is_dead(url)); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + } + + #[test] + fn test_get_tracked_relays() { + let tracker = RelayHealthTracker::with_defaults(); + + tracker.record_success("wss://relay1.example.com"); + tracker.record_failure("wss://relay2.example.com"); + + let tracked = tracker.get_tracked_relays(); + assert_eq!(tracked.len(), 2); + assert!(tracked.contains(&"wss://relay1.example.com".to_string())); + assert!(tracked.contains(&"wss://relay2.example.com".to_string())); + } + + #[test] + fn test_custom_max_backoff() { + let custom_max = 60u64; // 1 minute max + let tracker = RelayHealthTracker::with_max_backoff(custom_max); + let url = "wss://test-relay.example.com"; + + // Simulate many failures + for _ in 0..20 { + tracker.record_failure(url); + } + + // The remaining backoff should respect the custom max + // Note: We can't easily test the internal backoff calculation here, + // but we can verify the tracker was created with the custom setting + assert_eq!(tracker.max_backoff_secs, custom_max); + } + + #[test] + fn test_get_health_returns_clone() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_success(url); + let health = tracker.get_health(url); + + assert!(health.is_some()); + let health = health.unwrap(); + assert_eq!(health.state, HealthState::Healthy); + assert!(health.last_success_time.is_some()); + } + + #[test] + fn test_get_health_nonexistent() { + let tracker = RelayHealthTracker::with_defaults(); + let health = tracker.get_health("wss://nonexistent.example.com"); + assert!(health.is_none()); + } +} \ No newline at end of file diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 8f6a9bd..1f70f42 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -9,18 +9,31 @@ //! - Relay discovery from stored kind 30617 announcements //! - Multiple simultaneous relay connections //! - Three-layer filter strategy via FilterService +//! +//! ## Phase 3 Features +//! +//! - Health tracking with exponential backoff +//! - Dead relay detection after 24h of failures +//! - Startup jitter to prevent thundering herd use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use nostr_relay_builder::prelude::*; +use rand::Rng; use tokio::sync::mpsc; use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; +use super::health::RelayHealthTracker; use super::SYNC_SOURCE_ADDR; +use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; +/// Maximum startup jitter in milliseconds (10 seconds) +const MAX_STARTUP_JITTER_MS: u64 = 10_000; + /// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { /// Initial relay URL to sync from (from config) @@ -31,6 +44,8 @@ pub struct SyncManager { database: SharedDatabase, /// Write policy for validating events write_policy: Nip34WritePolicy, + /// Health tracker for relay connections + health_tracker: Arc, } impl SyncManager { @@ -41,17 +56,20 @@ impl SyncManager { /// * `relay_domain` - Our relay's domain (used to exclude self from sync) /// * `database` - Shared database for storing events and querying announcements /// * `write_policy` - Write policy for validating synced events + /// * `config` - Configuration for health tracking settings pub fn new( initial_relay_url: Option, relay_domain: String, database: SharedDatabase, write_policy: Nip34WritePolicy, + config: &Config, ) -> Self { Self { initial_relay_url, relay_domain, database, write_policy, + health_tracker: Arc::new(RelayHealthTracker::new(config)), } } @@ -68,9 +86,15 @@ impl SyncManager { relay_domain, database, write_policy, + health_tracker: Arc::new(RelayHealthTracker::with_defaults()), } } + /// Get a reference to the health tracker + pub fn health_tracker(&self) -> Arc { + self.health_tracker.clone() + } + /// Run the sync manager /// /// This discovers relays from stored announcements, spawns connection tasks, @@ -94,12 +118,14 @@ impl SyncManager { // Track active relay URLs to avoid duplicates let mut active_relays: HashSet = HashSet::new(); + // Collect all relays to connect to + let mut relays_to_connect: Vec = Vec::new(); + // Start with initial relay if configured if let Some(ref url) = self.initial_relay_url { if !self.is_own_relay(url) { - tracing::info!("Connecting to initial sync relay: {}", url); + relays_to_connect.push(url.clone()); active_relays.insert(url.clone()); - self.spawn_connection(url.clone(), tx.clone(), filter_service.clone()); } else { tracing::info!("Skipping initial relay (is our own relay): {}", url); } @@ -109,12 +135,17 @@ impl SyncManager { let discovered_urls = filter_service.discover_relay_urls().await; for url in discovered_urls { if !active_relays.contains(&url) && !self.is_own_relay(&url) { - tracing::info!("Connecting to discovered relay: {}", url); + relays_to_connect.push(url.clone()); active_relays.insert(url.clone()); - self.spawn_connection(url, tx.clone(), filter_service.clone()); } } + // Spawn connections with startup jitter to prevent thundering herd + for url in relays_to_connect { + tracing::info!("Scheduling connection to sync relay: {}", url); + self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); + } + if active_relays.is_empty() { tracing::warn!("No sync relays configured or discovered, SyncManager idle"); } else { @@ -133,6 +164,7 @@ impl SyncManager { if !active_relays.contains(&url) && !self.is_own_relay(&url) { tracing::info!("Discovered new relay from event, connecting: {}", url); active_relays.insert(url.clone()); + // New relays discovered during runtime don't need jitter self.spawn_connection(url, tx.clone(), filter_service.clone()); } } @@ -148,7 +180,36 @@ impl SyncManager { url.contains(&self.relay_domain) } - /// Spawn a connection task for a relay + /// Spawn a connection task for a relay with startup jitter + /// + /// Adds a random delay (0-10s) before connecting to prevent thundering herd + /// on startup when multiple relays are configured. + fn spawn_connection_with_jitter( + &self, + url: String, + tx: mpsc::Sender, + filter_service: Arc, + ) { + let domain = self.relay_domain.clone(); + let health_tracker = self.health_tracker.clone(); + + tokio::spawn(async move { + // Apply startup jitter + let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS); + tracing::debug!( + "Applying {}ms startup jitter before connecting to {}", + jitter_ms, + url + ); + tokio::time::sleep(Duration::from_millis(jitter_ms)).await; + + connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + }); + } + + /// Spawn a connection task for a relay without jitter + /// + /// Used for relays discovered during runtime (not at startup). fn spawn_connection( &self, url: String, @@ -156,8 +217,10 @@ impl SyncManager { filter_service: Arc, ) { let domain = self.relay_domain.clone(); + let health_tracker = self.health_tracker.clone(); + tokio::spawn(async move { - connect_with_retry(&url, tx, filter_service, &domain).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; }); } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1155eaf..653aa27 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -9,12 +9,21 @@ //! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) //! - **Layer 2**: Repository events (A/a tags for shared repos) //! - **Layer 3**: Related events (E/e tags for discussions, reviews) +//! +//! ## Resilience & Health Tracking (Phase 3) +//! +//! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead) +//! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max) +//! - **Dead relay handling**: Minimal retry for 24h+ failed relays +//! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay) mod connection; mod filter; +pub mod health; mod manager; pub use filter::FilterService; +pub use health::{HealthState, RelayHealth, RelayHealthTracker}; pub use manager::SyncManager; use std::net::SocketAddr; diff --git a/tests/proactive_sync_resilience.rs b/tests/proactive_sync_resilience.rs new file mode 100644 index 0000000..60b18dd --- /dev/null +++ b/tests/proactive_sync_resilience.rs @@ -0,0 +1,476 @@ +//! Integration tests for GRASP-02 Phase 3: Resilience & Health Tracking +//! +//! Tests verify: +//! - Exponential backoff on connection failures (5s → 1h max) +//! - Dead relay detection after 24h of failures +//! - Successful connection resets to Healthy +//! - Dead relays retry minimally (once per day) +//! - Health state tracking is thread-safe + +use std::time::{Duration, Instant}; + +use ngit_grasp::sync::health::{HealthState, RelayHealthTracker}; + +/// Test that a single failure transitions relay to Degraded state +#[test] +fn test_single_failure_causes_degraded_state() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Initial state should allow connection + assert!(tracker.should_attempt_connection(url)); + + // Record a failure + tracker.record_failure(url); + + // Should be in degraded state + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 1); +} + +/// Test that successful connection resets to Healthy state +#[test] +fn test_success_resets_to_healthy() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Simulate multiple failures + tracker.record_failure(url); + tracker.record_failure(url); + tracker.record_failure(url); + + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 3); + + // Success should reset everything + tracker.record_success(url); + + assert_eq!(tracker.get_state(url), HealthState::Healthy); + assert_eq!(tracker.get_failure_count(url), 0); + assert!(tracker.should_attempt_connection(url)); +} + +/// Test that backoff increases exponentially +#[test] +fn test_exponential_backoff_calculation() { + let max_backoff = 3600u64; // 1 hour + + // failure 1: 5s (5 * 2^0) + assert_eq!( + RelayHealthTracker::get_backoff_duration(1, max_backoff), + Duration::from_secs(5) + ); + + // failure 2: 10s (5 * 2^1) + assert_eq!( + RelayHealthTracker::get_backoff_duration(2, max_backoff), + Duration::from_secs(10) + ); + + // failure 3: 20s (5 * 2^2) + assert_eq!( + RelayHealthTracker::get_backoff_duration(3, max_backoff), + Duration::from_secs(20) + ); + + // failure 4: 40s (5 * 2^3) + assert_eq!( + RelayHealthTracker::get_backoff_duration(4, max_backoff), + Duration::from_secs(40) + ); + + // failure 5: 80s (5 * 2^4) + assert_eq!( + RelayHealthTracker::get_backoff_duration(5, max_backoff), + Duration::from_secs(80) + ); + + // failure 6: 160s (5 * 2^5) + assert_eq!( + RelayHealthTracker::get_backoff_duration(6, max_backoff), + Duration::from_secs(160) + ); + + // failure 7: 320s (5 * 2^6) + assert_eq!( + RelayHealthTracker::get_backoff_duration(7, max_backoff), + Duration::from_secs(320) + ); + + // failure 8: 640s (5 * 2^7) + assert_eq!( + RelayHealthTracker::get_backoff_duration(8, max_backoff), + Duration::from_secs(640) + ); + + // failure 9: 1280s (5 * 2^8) + assert_eq!( + RelayHealthTracker::get_backoff_duration(9, max_backoff), + Duration::from_secs(1280) + ); + + // failure 10: 2560s (5 * 2^9) + assert_eq!( + RelayHealthTracker::get_backoff_duration(10, max_backoff), + Duration::from_secs(2560) + ); +} + +/// Test that backoff is capped at max_backoff +#[test] +fn test_backoff_capped_at_maximum() { + let max_backoff = 3600u64; // 1 hour + + // After many failures, should cap at max_backoff + assert_eq!( + RelayHealthTracker::get_backoff_duration(15, max_backoff), + Duration::from_secs(max_backoff) + ); + + assert_eq!( + RelayHealthTracker::get_backoff_duration(20, max_backoff), + Duration::from_secs(max_backoff) + ); + + assert_eq!( + RelayHealthTracker::get_backoff_duration(100, max_backoff), + Duration::from_secs(max_backoff) + ); +} + +/// Test that custom max_backoff is respected +#[test] +fn test_custom_max_backoff() { + let custom_max = 60u64; // 1 minute max + + // After several failures, should cap at custom max + assert_eq!( + RelayHealthTracker::get_backoff_duration(10, custom_max), + Duration::from_secs(custom_max) + ); + + // Tracker with custom max should use it + let tracker = RelayHealthTracker::with_max_backoff(custom_max); + let url = "wss://test-relay.example.com"; + + // Simulate many failures + for _ in 0..20 { + tracker.record_failure(url); + } + + // Should still be degraded (not dead without 24h) + assert_eq!(tracker.get_state(url), HealthState::Degraded); +} + +/// Test that backoff blocks immediate reconnection +#[test] +fn test_backoff_blocks_immediate_reconnection() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // First connection attempt should be allowed + assert!(tracker.should_attempt_connection(url)); + + // Record a failure + tracker.record_failure(url); + + // Immediately after failure, connection should be blocked (backoff active) + assert!(!tracker.should_attempt_connection(url)); + + // Should have remaining backoff + let remaining = tracker.get_remaining_backoff(url); + assert!(remaining.is_some()); + assert!(remaining.unwrap() > Duration::ZERO); +} + +/// Test that multiple relays are tracked independently +#[test] +fn test_multiple_relays_independent() { + let tracker = RelayHealthTracker::with_defaults(); + let url1 = "wss://relay1.example.com"; + let url2 = "wss://relay2.example.com"; + let url3 = "wss://relay3.example.com"; + + // Fail relay1 multiple times + tracker.record_failure(url1); + tracker.record_failure(url1); + tracker.record_failure(url1); + + // Succeed on relay2 + tracker.record_success(url2); + + // Fail relay3 once + tracker.record_failure(url3); + + // Verify independent states + assert_eq!(tracker.get_state(url1), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url1), 3); + + assert_eq!(tracker.get_state(url2), HealthState::Healthy); + assert_eq!(tracker.get_failure_count(url2), 0); + + assert_eq!(tracker.get_state(url3), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url3), 1); +} + +/// Test is_dead returns false for degraded relays +#[test] +fn test_is_dead_false_for_degraded() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Simulate failures + for _ in 0..10 { + tracker.record_failure(url); + } + + // Should be degraded but not dead (24h hasn't passed) + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert!(!tracker.is_dead(url)); +} + +/// Test get_tracked_relays returns all tracked URLs +#[test] +fn test_get_tracked_relays() { + let tracker = RelayHealthTracker::with_defaults(); + + // Track multiple relays + tracker.record_success("wss://relay1.example.com"); + tracker.record_failure("wss://relay2.example.com"); + tracker.record_success("wss://relay3.example.com"); + + let tracked = tracker.get_tracked_relays(); + assert_eq!(tracked.len(), 3); + assert!(tracked.contains(&"wss://relay1.example.com".to_string())); + assert!(tracked.contains(&"wss://relay2.example.com".to_string())); + assert!(tracked.contains(&"wss://relay3.example.com".to_string())); +} + +/// Test get_health returns cloned health info +#[test] +fn test_get_health_returns_clone() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Record success + tracker.record_success(url); + + // Get health info + let health = tracker.get_health(url); + assert!(health.is_some()); + + let health = health.unwrap(); + assert_eq!(health.state, HealthState::Healthy); + assert!(health.last_success_time.is_some()); + assert_eq!(health.consecutive_failures, 0); +} + +/// Test get_health returns None for non-existent relay +#[test] +fn test_get_health_nonexistent() { + let tracker = RelayHealthTracker::with_defaults(); + + let health = tracker.get_health("wss://nonexistent.example.com"); + assert!(health.is_none()); +} + +/// Test that new relays default to allowing connection +#[test] +fn test_new_relay_allows_connection() { + let tracker = RelayHealthTracker::with_defaults(); + + // A never-seen relay should allow connection + assert!(tracker.should_attempt_connection("wss://brand-new-relay.example.com")); +} + +/// Test health state display +#[test] +fn test_health_state_display() { + assert_eq!(HealthState::Healthy.to_string(), "healthy"); + assert_eq!(HealthState::Degraded.to_string(), "degraded"); + assert_eq!(HealthState::Dead.to_string(), "dead"); +} + +/// Test thread safety with concurrent access +#[tokio::test] +async fn test_concurrent_health_tracking() { + use std::sync::Arc; + + let tracker = Arc::new(RelayHealthTracker::with_defaults()); + let url = "wss://concurrent-test-relay.example.com"; + + // Spawn multiple tasks that access the tracker concurrently + let mut handles = vec![]; + + for i in 0..10 { + let tracker_clone = tracker.clone(); + let url_owned = url.to_string(); + let handle = tokio::spawn(async move { + if i % 2 == 0 { + tracker_clone.record_failure(&url_owned); + } else { + tracker_clone.record_success(&url_owned); + } + tracker_clone.get_state(&url_owned); + tracker_clone.should_attempt_connection(&url_owned); + }); + handles.push(handle); + } + + // Wait for all tasks + for handle in handles { + handle.await.unwrap(); + } + + // Tracker should still be usable + let health = tracker.get_health(url); + assert!(health.is_some()); +} + +/// Test that failure streak tracking works correctly +#[test] +fn test_failure_streak_tracking() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Build up a failure streak + for i in 1..=5 { + tracker.record_failure(url); + assert_eq!(tracker.get_failure_count(url), i); + } + + // Success should reset the streak + tracker.record_success(url); + assert_eq!(tracker.get_failure_count(url), 0); + + // Start a new streak + tracker.record_failure(url); + assert_eq!(tracker.get_failure_count(url), 1); +} + +/// Test recovery from degraded state +#[test] +fn test_recovery_from_degraded() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Enter degraded state + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + + // Recover + tracker.record_success(url); + assert_eq!(tracker.get_state(url), HealthState::Healthy); + assert!(tracker.should_attempt_connection(url)); + assert!(tracker.get_remaining_backoff(url).is_none()); +} + +/// Test that remaining backoff is None after success +#[test] +fn test_no_remaining_backoff_after_success() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Fail to set backoff + tracker.record_failure(url); + assert!(tracker.get_remaining_backoff(url).is_some()); + + // Succeed to clear backoff + tracker.record_success(url); + assert!(tracker.get_remaining_backoff(url).is_none()); +} + +/// Integration test: simulate a realistic connection lifecycle +#[test] +fn test_realistic_connection_lifecycle() { + let tracker = RelayHealthTracker::with_max_backoff(60); // 1 minute max for test + let url = "wss://production-relay.example.com"; + + // Initial connection succeeds + tracker.record_success(url); + assert_eq!(tracker.get_state(url), HealthState::Healthy); + + // Connection drops - first failure + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 1); + + // Second failure (retry failed) + tracker.record_failure(url); + assert_eq!(tracker.get_failure_count(url), 2); + + // Third failure + tracker.record_failure(url); + assert_eq!(tracker.get_failure_count(url), 3); + + // Connection finally succeeds + tracker.record_success(url); + assert_eq!(tracker.get_state(url), HealthState::Healthy); + assert_eq!(tracker.get_failure_count(url), 0); + assert!(tracker.should_attempt_connection(url)); +} + +/// Test backoff timing sequence +#[test] +fn test_backoff_timing_sequence() { + // With default max of 3600s (1 hour), verify the progression + let max = 3600u64; + + let expected = vec![ + (1, 5), // 5s + (2, 10), // 10s + (3, 20), // 20s + (4, 40), // 40s + (5, 80), // 80s + (6, 160), // 160s (~2.7 min) + (7, 320), // 320s (~5.3 min) + (8, 640), // 640s (~10.7 min) + (9, 1280), // 1280s (~21.3 min) + (10, 2560), // 2560s (~42.7 min) + (11, 3600), // capped at 3600s (1 hour) + (12, 3600), // still capped + ]; + + for (failures, expected_secs) in expected { + assert_eq!( + RelayHealthTracker::get_backoff_duration(failures, max), + Duration::from_secs(expected_secs), + "Failed for {} failures", + failures + ); + } +} + +/// Test that health info timestamp tracking works +#[test] +fn test_timestamp_tracking() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Record initial success + let before = Instant::now(); + tracker.record_success(url); + let after = Instant::now(); + + let health = tracker.get_health(url).unwrap(); + let success_time = health.last_success_time.unwrap(); + + // Success time should be between before and after + assert!(success_time >= before); + assert!(success_time <= after); + + // Record failure + let before_fail = Instant::now(); + tracker.record_failure(url); + let after_fail = Instant::now(); + + let health = tracker.get_health(url).unwrap(); + let failure_time = health.last_failure_time.unwrap(); + let first_failure = health.first_failure_time.unwrap(); + + // Failure times should be between before and after + assert!(failure_time >= before_fail); + assert!(failure_time <= after_fail); + assert!(first_failure >= before_fail); + assert!(first_failure <= after_fail); +} \ No newline at end of file -- cgit v1.2.3