diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:58:31 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:58:31 +0000 |
| commit | f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (patch) | |
| tree | cfcbf16a937a59048930ccaf8557f78ed5576bde /src/sync/manager.rs | |
| parent | bf558b0dc17e14f96eea624ea5591315a2909154 (diff) | |
feat(sync): Phase 3 - resilience and health tracking
- Add RelayHealthTracker with DashMap
- Implement exponential backoff (5s -> 1h max)
- Handle dead relays (24h failures -> daily retry)
- Add startup jitter to prevent thundering herd
- Add NGIT_SYNC_MAX_BACKOFF_SECS config
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 75 |
1 files changed, 69 insertions, 6 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 8f6a9bd..1f70f42 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -9,18 +9,31 @@ | |||
| 9 | //! - Relay discovery from stored kind 30617 announcements | 9 | //! - Relay discovery from stored kind 30617 announcements |
| 10 | //! - Multiple simultaneous relay connections | 10 | //! - Multiple simultaneous relay connections |
| 11 | //! - Three-layer filter strategy via FilterService | 11 | //! - Three-layer filter strategy via FilterService |
| 12 | //! | ||
| 13 | //! ## Phase 3 Features | ||
| 14 | //! | ||
| 15 | //! - Health tracking with exponential backoff | ||
| 16 | //! - Dead relay detection after 24h of failures | ||
| 17 | //! - Startup jitter to prevent thundering herd | ||
| 12 | 18 | ||
| 13 | use std::collections::HashSet; | 19 | use std::collections::HashSet; |
| 14 | use std::sync::Arc; | 20 | use std::sync::Arc; |
| 21 | use std::time::Duration; | ||
| 15 | 22 | ||
| 16 | use nostr_relay_builder::prelude::*; | 23 | use nostr_relay_builder::prelude::*; |
| 24 | use rand::Rng; | ||
| 17 | use tokio::sync::mpsc; | 25 | use tokio::sync::mpsc; |
| 18 | 26 | ||
| 19 | use super::connection::{connect_with_retry, SyncedEvent}; | 27 | use super::connection::{connect_with_retry, SyncedEvent}; |
| 20 | use super::filter::FilterService; | 28 | use super::filter::FilterService; |
| 29 | use super::health::RelayHealthTracker; | ||
| 21 | use super::SYNC_SOURCE_ADDR; | 30 | use super::SYNC_SOURCE_ADDR; |
| 31 | use crate::config::Config; | ||
| 22 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 32 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 23 | 33 | ||
| 34 | /// Maximum startup jitter in milliseconds (10 seconds) | ||
| 35 | const MAX_STARTUP_JITTER_MS: u64 = 10_000; | ||
| 36 | |||
| 24 | /// Coordinates proactive sync from configured and discovered relays | 37 | /// Coordinates proactive sync from configured and discovered relays |
| 25 | pub struct SyncManager { | 38 | pub struct SyncManager { |
| 26 | /// Initial relay URL to sync from (from config) | 39 | /// Initial relay URL to sync from (from config) |
| @@ -31,6 +44,8 @@ pub struct SyncManager { | |||
| 31 | database: SharedDatabase, | 44 | database: SharedDatabase, |
| 32 | /// Write policy for validating events | 45 | /// Write policy for validating events |
| 33 | write_policy: Nip34WritePolicy, | 46 | write_policy: Nip34WritePolicy, |
| 47 | /// Health tracker for relay connections | ||
| 48 | health_tracker: Arc<RelayHealthTracker>, | ||
| 34 | } | 49 | } |
| 35 | 50 | ||
| 36 | impl SyncManager { | 51 | impl SyncManager { |
| @@ -41,17 +56,20 @@ impl SyncManager { | |||
| 41 | /// * `relay_domain` - Our relay's domain (used to exclude self from sync) | 56 | /// * `relay_domain` - Our relay's domain (used to exclude self from sync) |
| 42 | /// * `database` - Shared database for storing events and querying announcements | 57 | /// * `database` - Shared database for storing events and querying announcements |
| 43 | /// * `write_policy` - Write policy for validating synced events | 58 | /// * `write_policy` - Write policy for validating synced events |
| 59 | /// * `config` - Configuration for health tracking settings | ||
| 44 | pub fn new( | 60 | pub fn new( |
| 45 | initial_relay_url: Option<String>, | 61 | initial_relay_url: Option<String>, |
| 46 | relay_domain: String, | 62 | relay_domain: String, |
| 47 | database: SharedDatabase, | 63 | database: SharedDatabase, |
| 48 | write_policy: Nip34WritePolicy, | 64 | write_policy: Nip34WritePolicy, |
| 65 | config: &Config, | ||
| 49 | ) -> Self { | 66 | ) -> Self { |
| 50 | Self { | 67 | Self { |
| 51 | initial_relay_url, | 68 | initial_relay_url, |
| 52 | relay_domain, | 69 | relay_domain, |
| 53 | database, | 70 | database, |
| 54 | write_policy, | 71 | write_policy, |
| 72 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | ||
| 55 | } | 73 | } |
| 56 | } | 74 | } |
| 57 | 75 | ||
| @@ -68,9 +86,15 @@ impl SyncManager { | |||
| 68 | relay_domain, | 86 | relay_domain, |
| 69 | database, | 87 | database, |
| 70 | write_policy, | 88 | write_policy, |
| 89 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), | ||
| 71 | } | 90 | } |
| 72 | } | 91 | } |
| 73 | 92 | ||
| 93 | /// Get a reference to the health tracker | ||
| 94 | pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { | ||
| 95 | self.health_tracker.clone() | ||
| 96 | } | ||
| 97 | |||
| 74 | /// Run the sync manager | 98 | /// Run the sync manager |
| 75 | /// | 99 | /// |
| 76 | /// This discovers relays from stored announcements, spawns connection tasks, | 100 | /// This discovers relays from stored announcements, spawns connection tasks, |
| @@ -94,12 +118,14 @@ impl SyncManager { | |||
| 94 | // Track active relay URLs to avoid duplicates | 118 | // Track active relay URLs to avoid duplicates |
| 95 | let mut active_relays: HashSet<String> = HashSet::new(); | 119 | let mut active_relays: HashSet<String> = HashSet::new(); |
| 96 | 120 | ||
| 121 | // Collect all relays to connect to | ||
| 122 | let mut relays_to_connect: Vec<String> = Vec::new(); | ||
| 123 | |||
| 97 | // Start with initial relay if configured | 124 | // Start with initial relay if configured |
| 98 | if let Some(ref url) = self.initial_relay_url { | 125 | if let Some(ref url) = self.initial_relay_url { |
| 99 | if !self.is_own_relay(url) { | 126 | if !self.is_own_relay(url) { |
| 100 | tracing::info!("Connecting to initial sync relay: {}", url); | 127 | relays_to_connect.push(url.clone()); |
| 101 | active_relays.insert(url.clone()); | 128 | active_relays.insert(url.clone()); |
| 102 | self.spawn_connection(url.clone(), tx.clone(), filter_service.clone()); | ||
| 103 | } else { | 129 | } else { |
| 104 | tracing::info!("Skipping initial relay (is our own relay): {}", url); | 130 | tracing::info!("Skipping initial relay (is our own relay): {}", url); |
| 105 | } | 131 | } |
| @@ -109,12 +135,17 @@ impl SyncManager { | |||
| 109 | let discovered_urls = filter_service.discover_relay_urls().await; | 135 | let discovered_urls = filter_service.discover_relay_urls().await; |
| 110 | for url in discovered_urls { | 136 | for url in discovered_urls { |
| 111 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | 137 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { |
| 112 | tracing::info!("Connecting to discovered relay: {}", url); | 138 | relays_to_connect.push(url.clone()); |
| 113 | active_relays.insert(url.clone()); | 139 | active_relays.insert(url.clone()); |
| 114 | self.spawn_connection(url, tx.clone(), filter_service.clone()); | ||
| 115 | } | 140 | } |
| 116 | } | 141 | } |
| 117 | 142 | ||
| 143 | // Spawn connections with startup jitter to prevent thundering herd | ||
| 144 | for url in relays_to_connect { | ||
| 145 | tracing::info!("Scheduling connection to sync relay: {}", url); | ||
| 146 | self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); | ||
| 147 | } | ||
| 148 | |||
| 118 | if active_relays.is_empty() { | 149 | if active_relays.is_empty() { |
| 119 | tracing::warn!("No sync relays configured or discovered, SyncManager idle"); | 150 | tracing::warn!("No sync relays configured or discovered, SyncManager idle"); |
| 120 | } else { | 151 | } else { |
| @@ -133,6 +164,7 @@ impl SyncManager { | |||
| 133 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | 164 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { |
| 134 | tracing::info!("Discovered new relay from event, connecting: {}", url); | 165 | tracing::info!("Discovered new relay from event, connecting: {}", url); |
| 135 | active_relays.insert(url.clone()); | 166 | active_relays.insert(url.clone()); |
| 167 | // New relays discovered during runtime don't need jitter | ||
| 136 | self.spawn_connection(url, tx.clone(), filter_service.clone()); | 168 | self.spawn_connection(url, tx.clone(), filter_service.clone()); |
| 137 | } | 169 | } |
| 138 | } | 170 | } |
| @@ -148,7 +180,36 @@ impl SyncManager { | |||
| 148 | url.contains(&self.relay_domain) | 180 | url.contains(&self.relay_domain) |
| 149 | } | 181 | } |
| 150 | 182 | ||
| 151 | /// Spawn a connection task for a relay | 183 | /// Spawn a connection task for a relay with startup jitter |
| 184 | /// | ||
| 185 | /// Adds a random delay (0-10s) before connecting to prevent thundering herd | ||
| 186 | /// on startup when multiple relays are configured. | ||
| 187 | fn spawn_connection_with_jitter( | ||
| 188 | &self, | ||
| 189 | url: String, | ||
| 190 | tx: mpsc::Sender<SyncedEvent>, | ||
| 191 | filter_service: Arc<FilterService>, | ||
| 192 | ) { | ||
| 193 | let domain = self.relay_domain.clone(); | ||
| 194 | let health_tracker = self.health_tracker.clone(); | ||
| 195 | |||
| 196 | tokio::spawn(async move { | ||
| 197 | // Apply startup jitter | ||
| 198 | let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS); | ||
| 199 | tracing::debug!( | ||
| 200 | "Applying {}ms startup jitter before connecting to {}", | ||
| 201 | jitter_ms, | ||
| 202 | url | ||
| 203 | ); | ||
| 204 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | ||
| 205 | |||
| 206 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; | ||
| 207 | }); | ||
| 208 | } | ||
| 209 | |||
| 210 | /// Spawn a connection task for a relay without jitter | ||
| 211 | /// | ||
| 212 | /// Used for relays discovered during runtime (not at startup). | ||
| 152 | fn spawn_connection( | 213 | fn spawn_connection( |
| 153 | &self, | 214 | &self, |
| 154 | url: String, | 215 | url: String, |
| @@ -156,8 +217,10 @@ impl SyncManager { | |||
| 156 | filter_service: Arc<FilterService>, | 217 | filter_service: Arc<FilterService>, |
| 157 | ) { | 218 | ) { |
| 158 | let domain = self.relay_domain.clone(); | 219 | let domain = self.relay_domain.clone(); |
| 220 | let health_tracker = self.health_tracker.clone(); | ||
| 221 | |||
| 159 | tokio::spawn(async move { | 222 | tokio::spawn(async move { |
| 160 | connect_with_retry(&url, tx, filter_service, &domain).await; | 223 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; |
| 161 | }); | 224 | }); |
| 162 | } | 225 | } |
| 163 | 226 | ||