From ea561062c0f08d608f48b6ccd6f8a4b8743b6e3b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 11:36:08 +0000 Subject: sync: integrate health tracking and connection storage --- src/sync/health.rs | 475 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 13 ++ 2 files changed, 488 insertions(+) create mode 100644 src/sync/health.rs (limited to 'src/sync') diff --git a/src/sync/health.rs b/src/sync/health.rs new file mode 100644 index 0000000..51bd5ae --- /dev/null +++ b/src/sync/health.rs @@ -0,0 +1,475 @@ +//! Relay Health Tracking for GRASP-02 Proactive Sync +//! +//! This module implements health tracking for relay connections, including: +//! - Health state machine (Healthy -> Degraded -> Dead) +//! - Exponential backoff with configurable max delay +//! - Dead relay detection after 24h of continuous failures +//! +//! ## Health States +//! +//! - **Healthy**: Working connection, no recent failures +//! - **Degraded**: Connection failed, retrying with backoff +//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use dashmap::DashMap; + +use crate::config::Config; + +/// Duration threshold before a relay is considered dead (24 hours) +const DEAD_THRESHOLD_HOURS: u64 = 24; + +/// How often dead relays are retried (once per 24 hours) +const DEAD_RETRY_INTERVAL_HOURS: u64 = 24; + +/// Default maximum backoff duration in seconds (1 hour) +const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; + +/// Base backoff duration in seconds +const BASE_BACKOFF_SECS: u64 = 5; + +/// Health state of a relay connection +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HealthState { + /// Working connection, no recent failures + Healthy, + /// Connection failed, retrying with exponential backoff + Degraded, + /// 24h+ of continuous failures, minimal retry + Dead, +} + +impl std::fmt::Display for HealthState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HealthState::Healthy => write!(f, "healthy"), + HealthState::Degraded => write!(f, "degraded"), + HealthState::Dead => write!(f, "dead"), + } + } +} + +/// Health information for a single relay +#[derive(Debug, Clone)] +pub struct RelayHealth { + /// Current health state + pub state: HealthState, + /// Number of consecutive connection failures + pub consecutive_failures: u32, + /// Time of the first failure in the current failure streak + pub first_failure_time: Option, + /// Time of the last failure + pub last_failure_time: Option, + /// Time of the last successful connection + pub last_success_time: Option, + /// Next time a connection attempt should be made + pub next_retry_at: Option, +} + +impl Default for RelayHealth { + fn default() -> Self { + Self { + state: HealthState::Healthy, + consecutive_failures: 0, + first_failure_time: None, + last_failure_time: None, + last_success_time: None, + next_retry_at: None, + } + } +} + +impl RelayHealth { + /// Create a new RelayHealth with healthy state + pub fn new() -> Self { + Self::default() + } +} + +/// Thread-safe relay health tracker using DashMap +#[derive(Debug)] +pub struct RelayHealthTracker { + health: DashMap, + max_backoff_secs: u64, +} + +impl RelayHealthTracker { + /// Create a new RelayHealthTracker + pub fn new(config: &Config) -> Self { + Self { + health: DashMap::new(), + max_backoff_secs: config.sync_max_backoff_secs, + } + } + + /// Create a new RelayHealthTracker with default settings + pub fn with_defaults() -> Self { + Self { + health: DashMap::new(), + max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS, + } + } + + /// Create a new RelayHealthTracker with custom max backoff + pub fn with_max_backoff(max_backoff_secs: u64) -> Self { + Self { + health: DashMap::new(), + max_backoff_secs, + } + } + + /// Record a successful connection to a relay + /// + /// Resets the relay to Healthy state and clears failure counters. + pub fn record_success(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + let old_state = health.state; + + // Reset to healthy state + health.state = HealthState::Healthy; + health.consecutive_failures = 0; + health.first_failure_time = None; + health.last_failure_time = None; + health.last_success_time = Some(now); + health.next_retry_at = None; + + if old_state != HealthState::Healthy { + tracing::info!( + "Relay {} recovered to healthy (was {:?})", + relay_url, + old_state + ); + } + } + + /// Record a connection failure for a relay + /// + /// Increments failure counter, updates state, and calculates next retry time. + pub fn record_failure(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + let old_state = health.state; + + // Set first_failure_time if this is a new failure streak + if health.first_failure_time.is_none() { + health.first_failure_time = Some(now); + } + + health.consecutive_failures = health.consecutive_failures.saturating_add(1); + health.last_failure_time = Some(now); + + // Check if we should transition to Dead state + if let Some(first_failure) = health.first_failure_time { + let failure_duration = now.duration_since(first_failure); + let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); + + if failure_duration >= dead_threshold { + health.state = HealthState::Dead; + // Dead relays retry once per day + health.next_retry_at = + Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); + + if old_state != HealthState::Dead { + tracing::warn!( + "Relay {} marked dead after 24h failures ({} consecutive failures)", + relay_url, + health.consecutive_failures + ); + } + } else { + // Degraded state with exponential backoff + health.state = HealthState::Degraded; + let backoff = Self::get_backoff_duration( + health.consecutive_failures, + self.max_backoff_secs, + ); + health.next_retry_at = Some(now + backoff); + + if old_state != HealthState::Degraded { + tracing::warn!( + "Relay {} degraded, backoff {:?}", + relay_url, + backoff + ); + } else { + tracing::debug!( + "Relay {} failure #{}, backoff {:?}", + relay_url, + health.consecutive_failures, + backoff + ); + } + } + } + } + + /// Check if a connection attempt should be made to a relay + /// + /// Returns true if: + /// - The relay has no health record (first attempt) + /// - The relay is healthy + /// - The backoff period has elapsed + pub fn should_attempt_connection(&self, relay_url: &str) -> bool { + let entry = self.health.get(relay_url); + + match entry { + None => true, // No record, allow first attempt + Some(entry) => { + let health = entry.value(); + + match health.state { + HealthState::Healthy => true, + HealthState::Degraded | HealthState::Dead => { + // Check if backoff period has elapsed + match health.next_retry_at { + None => true, + Some(next_retry) => Instant::now() >= next_retry, + } + } + } + } + } + } + + /// Get the current health state of a relay + pub fn get_state(&self, relay_url: &str) -> HealthState { + self.health + .get(relay_url) + .map(|entry| entry.value().state) + .unwrap_or(HealthState::Healthy) + } + + /// Check if a relay is marked as dead + pub fn is_dead(&self, relay_url: &str) -> bool { + self.get_state(relay_url) == HealthState::Dead + } + + /// Get the remaining backoff duration for a relay + /// + /// Returns None if no backoff is active. + pub fn get_remaining_backoff(&self, relay_url: &str) -> Option { + let entry = self.health.get(relay_url)?; + let health = entry.value(); + let next_retry = health.next_retry_at?; + let now = Instant::now(); + + if now >= next_retry { + None + } else { + Some(next_retry - now) + } + } + + /// Get the consecutive failure count for a relay + pub fn get_failure_count(&self, relay_url: &str) -> u32 { + self.health + .get(relay_url) + .map(|entry| entry.value().consecutive_failures) + .unwrap_or(0) + } + + /// Calculate the backoff duration based on failure count + /// + /// Uses exponential backoff: base * 2^failures, capped at max_backoff + pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration { + let backoff_secs = BASE_BACKOFF_SECS + .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1))); + Duration::from_secs(backoff_secs.min(max_backoff_secs)) + } + + /// Get all tracked relay URLs + pub fn get_tracked_relays(&self) -> Vec { + self.health.iter().map(|entry| entry.key().clone()).collect() + } + + /// Get a clone of the health info for a relay + pub fn get_health(&self, relay_url: &str) -> Option { + self.health.get(relay_url).map(|entry| entry.value().clone()) + } +} + +/// Create a shared RelayHealthTracker wrapped in Arc +pub fn create_health_tracker(config: &Config) -> Arc { + Arc::new(RelayHealthTracker::new(config)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_health_state_display() { + assert_eq!(HealthState::Healthy.to_string(), "healthy"); + assert_eq!(HealthState::Degraded.to_string(), "degraded"); + assert_eq!(HealthState::Dead.to_string(), "dead"); + } + + #[test] + fn test_default_health_is_healthy() { + let health = RelayHealth::default(); + assert_eq!(health.state, HealthState::Healthy); + assert_eq!(health.consecutive_failures, 0); + assert!(health.first_failure_time.is_none()); + } + + #[test] + fn test_should_attempt_connection_new_relay() { + let tracker = RelayHealthTracker::with_defaults(); + assert!(tracker.should_attempt_connection("wss://new-relay.example.com")); + } + + #[test] + fn test_record_success_resets_to_healthy() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Simulate a few failures + tracker.record_failure(url); + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 2); + + // Record success + tracker.record_success(url); + assert_eq!(tracker.get_state(url), HealthState::Healthy); + assert_eq!(tracker.get_failure_count(url), 0); + assert!(tracker.should_attempt_connection(url)); + } + + #[test] + fn test_backoff_increases_exponentially() { + // failure 1: 5s + assert_eq!( + RelayHealthTracker::get_backoff_duration(1, 3600), + Duration::from_secs(5) + ); + // failure 2: 10s + assert_eq!( + RelayHealthTracker::get_backoff_duration(2, 3600), + Duration::from_secs(10) + ); + // failure 3: 20s + assert_eq!( + RelayHealthTracker::get_backoff_duration(3, 3600), + Duration::from_secs(20) + ); + // failure 4: 40s + assert_eq!( + RelayHealthTracker::get_backoff_duration(4, 3600), + Duration::from_secs(40) + ); + // failure 5: 80s + assert_eq!( + RelayHealthTracker::get_backoff_duration(5, 3600), + Duration::from_secs(80) + ); + } + + #[test] + fn test_backoff_capped_at_max() { + let max_backoff = 3600u64; + // After many failures, should cap at max_backoff (1 hour) + assert_eq!( + RelayHealthTracker::get_backoff_duration(20, max_backoff), + Duration::from_secs(max_backoff) + ); + } + + #[test] + fn test_degraded_state_after_failure() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_failure(url); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + assert_eq!(tracker.get_failure_count(url), 1); + } + + #[test] + fn test_backoff_blocks_immediate_reconnection() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_failure(url); + + // Immediately after failure, should not attempt (backoff active) + assert!(!tracker.should_attempt_connection(url)); + + // Remaining backoff should be some positive duration + let remaining = tracker.get_remaining_backoff(url); + assert!(remaining.is_some()); + assert!(remaining.unwrap() > Duration::ZERO); + } + + #[test] + fn test_is_dead() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + // Initially not dead + assert!(!tracker.is_dead(url)); + + // After a failure, still not dead (just degraded) + tracker.record_failure(url); + assert!(!tracker.is_dead(url)); + assert_eq!(tracker.get_state(url), HealthState::Degraded); + } + + #[test] + fn test_get_tracked_relays() { + let tracker = RelayHealthTracker::with_defaults(); + + tracker.record_success("wss://relay1.example.com"); + tracker.record_failure("wss://relay2.example.com"); + + let tracked = tracker.get_tracked_relays(); + assert_eq!(tracked.len(), 2); + assert!(tracked.contains(&"wss://relay1.example.com".to_string())); + assert!(tracked.contains(&"wss://relay2.example.com".to_string())); + } + + #[test] + fn test_custom_max_backoff() { + let custom_max = 60u64; // 1 minute max + let tracker = RelayHealthTracker::with_max_backoff(custom_max); + let url = "wss://test-relay.example.com"; + + // Simulate many failures + for _ in 0..20 { + tracker.record_failure(url); + } + + // The remaining backoff should respect the custom max + // Note: We can't easily test the internal backoff calculation here, + // but we can verify the tracker was created with the custom setting + assert_eq!(tracker.max_backoff_secs, custom_max); + } + + #[test] + fn test_get_health_returns_clone() { + let tracker = RelayHealthTracker::with_defaults(); + let url = "wss://test-relay.example.com"; + + tracker.record_success(url); + let health = tracker.get_health(url); + + assert!(health.is_some()); + let health = health.unwrap(); + assert_eq!(health.state, HealthState::Healthy); + assert!(health.last_success_time.is_some()); + } + + #[test] + fn test_get_health_nonexistent() { + let tracker = RelayHealthTracker::with_defaults(); + let health = tracker.get_health("wss://nonexistent.example.com"); + assert!(health.is_none()); + } +} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fb09896..9ac62ed 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -14,6 +14,7 @@ pub mod algorithms; pub mod filters; +pub mod health; pub mod relay_connection; pub mod self_subscriber; @@ -26,6 +27,9 @@ pub use relay_connection::{RelayConnection, RelayEvent}; // Re-export self-subscriber types pub use self_subscriber::{RelayAction, SelfSubscriber}; +// Re-export health tracking types +pub use health::RelayHealthTracker; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -339,6 +343,12 @@ pub struct SyncManager { relay_sync_index: RelaySyncIndex, /// In-flight subscription batches pending_sync_index: PendingSyncIndex, + /// Active relay connections - keyed by relay URL + connections: HashMap, + /// Health tracker for relay connection state + health_tracker: Arc, + /// Counter for generating unique batch IDs + next_batch_id: u64, } impl SyncManager { @@ -366,6 +376,9 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), + connections: HashMap::new(), + health_tracker: Arc::new(RelayHealthTracker::new(config)), + next_batch_id: 0, } } -- cgit v1.2.3