upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/manager.rs
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2025-12-04 17:58:31 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2025-12-04 17:58:31 +0000
commit: f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (patch)
tree: cfcbf16a937a59048930ccaf8557f78ed5576bde /src/sync/manager.rs
parent: bf558b0dc17e14f96eea624ea5591315a2909154 (diff)
feat(sync): Phase 3 - resilience and health tracking
- Add RelayHealthTracker with DashMap
- Implement exponential backoff (5s -> 1h max)
- Handle dead relays (24h failures -> daily retry)
- Add startup jitter to prevent thundering herd
- Add NGIT_SYNC_MAX_BACKOFF_SECS config
Diffstat (limited to 'src/sync/manager.rs')
-rw-r--r-- src/sync/manager.rs 75
1 files changed, 69 insertions, 6 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 8f6a9bd..1f70f42 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -9,18 +9,31 @@
9//! - Relay discovery from stored kind 30617 announcements 9//! - Relay discovery from stored kind 30617 announcements
10//! - Multiple simultaneous relay connections 10//! - Multiple simultaneous relay connections
11//! - Three-layer filter strategy via FilterService 11//! - Three-layer filter strategy via FilterService
12//!
13//! ## Phase 3 Features
14//!
15//! - Health tracking with exponential backoff
16//! - Dead relay detection after 24h of failures
17//! - Startup jitter to prevent thundering herd
12 18
13use std::collections::HashSet; 19use std::collections::HashSet;
14use std::sync::Arc; 20use std::sync::Arc;
21use std::time::Duration;
15 22
16use nostr_relay_builder::prelude::*; 23use nostr_relay_builder::prelude::*;
24use rand::Rng;
17use tokio::sync::mpsc; 25use tokio::sync::mpsc;
18 26
19use super::connection::{connect_with_retry, SyncedEvent}; 27use super::connection::{connect_with_retry, SyncedEvent};
20use super::filter::FilterService; 28use super::filter::FilterService;
29use super::health::RelayHealthTracker;
21use super::SYNC_SOURCE_ADDR; 30use super::SYNC_SOURCE_ADDR;
31use crate::config::Config;
22use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 32use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
23 33
34/// Maximum startup jitter in milliseconds (10 seconds)
35const MAX_STARTUP_JITTER_MS: u64 = 10_000;
36
24/// Coordinates proactive sync from configured and discovered relays 37/// Coordinates proactive sync from configured and discovered relays
25pub struct SyncManager { 38pub struct SyncManager {
26 /// Initial relay URL to sync from (from config) 39 /// Initial relay URL to sync from (from config)
@@ -31,6 +44,8 @@ pub struct SyncManager {
31 database: SharedDatabase, 44 database: SharedDatabase,
32 /// Write policy for validating events 45 /// Write policy for validating events
33 write_policy: Nip34WritePolicy, 46 write_policy: Nip34WritePolicy,
47 /// Health tracker for relay connections
48 health_tracker: Arc<RelayHealthTracker>,
34} 49}
35 50
36impl SyncManager { 51impl SyncManager {
@@ -41,17 +56,20 @@ impl SyncManager {
41 /// * `relay_domain` - Our relay's domain (used to exclude self from sync) 56 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
42 /// * `database` - Shared database for storing events and querying announcements 57 /// * `database` - Shared database for storing events and querying announcements
43 /// * `write_policy` - Write policy for validating synced events 58 /// * `write_policy` - Write policy for validating synced events
59 /// * `config` - Configuration for health tracking settings
44 pub fn new( 60 pub fn new(
45 initial_relay_url: Option<String>, 61 initial_relay_url: Option<String>,
46 relay_domain: String, 62 relay_domain: String,
47 database: SharedDatabase, 63 database: SharedDatabase,
48 write_policy: Nip34WritePolicy, 64 write_policy: Nip34WritePolicy,
65 config: &Config,
49 ) -> Self { 66 ) -> Self {
50 Self { 67 Self {
51 initial_relay_url, 68 initial_relay_url,
52 relay_domain, 69 relay_domain,
53 database, 70 database,
54 write_policy, 71 write_policy,
72 health_tracker: Arc::new(RelayHealthTracker::new(config)),
55 } 73 }
56 } 74 }
57 75
@@ -68,9 +86,15 @@ impl SyncManager {
68 relay_domain, 86 relay_domain,
69 database, 87 database,
70 write_policy, 88 write_policy,
89 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
71 } 90 }
72 } 91 }
73 92
93 /// Get a reference to the health tracker
94 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
95 self.health_tracker.clone()
96 }
97
74 /// Run the sync manager 98 /// Run the sync manager
75 /// 99 ///
76 /// This discovers relays from stored announcements, spawns connection tasks, 100 /// This discovers relays from stored announcements, spawns connection tasks,
@@ -94,12 +118,14 @@ impl SyncManager {
94 // Track active relay URLs to avoid duplicates 118 // Track active relay URLs to avoid duplicates
95 let mut active_relays: HashSet<String> = HashSet::new(); 119 let mut active_relays: HashSet<String> = HashSet::new();
96 120
121 // Collect all relays to connect to
122 let mut relays_to_connect: Vec<String> = Vec::new();
123
97 // Start with initial relay if configured 124 // Start with initial relay if configured
98 if let Some(ref url) = self.initial_relay_url { 125 if let Some(ref url) = self.initial_relay_url {
99 if !self.is_own_relay(url) { 126 if !self.is_own_relay(url) {
100 tracing::info!("Connecting to initial sync relay: {}", url); 127 relays_to_connect.push(url.clone());
101 active_relays.insert(url.clone()); 128 active_relays.insert(url.clone());
102 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone());
103 } else { 129 } else {
104 tracing::info!("Skipping initial relay (is our own relay): {}", url); 130 tracing::info!("Skipping initial relay (is our own relay): {}", url);
105 } 131 }
@@ -109,12 +135,17 @@ impl SyncManager {
109 let discovered_urls = filter_service.discover_relay_urls().await; 135 let discovered_urls = filter_service.discover_relay_urls().await;
110 for url in discovered_urls { 136 for url in discovered_urls {
111 if !active_relays.contains(&url) && !self.is_own_relay(&url) { 137 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
112 tracing::info!("Connecting to discovered relay: {}", url); 138 relays_to_connect.push(url.clone());
113 active_relays.insert(url.clone()); 139 active_relays.insert(url.clone());
114 self.spawn_connection(url, tx.clone(), filter_service.clone());
115 } 140 }
116 } 141 }
117 142
143 // Spawn connections with startup jitter to prevent thundering herd
144 for url in relays_to_connect {
145 tracing::info!("Scheduling connection to sync relay: {}", url);
146 self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone());
147 }
148
118 if active_relays.is_empty() { 149 if active_relays.is_empty() {
119 tracing::warn!("No sync relays configured or discovered, SyncManager idle"); 150 tracing::warn!("No sync relays configured or discovered, SyncManager idle");
120 } else { 151 } else {
@@ -133,6 +164,7 @@ impl SyncManager {
133 if !active_relays.contains(&url) && !self.is_own_relay(&url) { 164 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
134 tracing::info!("Discovered new relay from event, connecting: {}", url); 165 tracing::info!("Discovered new relay from event, connecting: {}", url);
135 active_relays.insert(url.clone()); 166 active_relays.insert(url.clone());
167 // New relays discovered during runtime don't need jitter
136 self.spawn_connection(url, tx.clone(), filter_service.clone()); 168 self.spawn_connection(url, tx.clone(), filter_service.clone());
137 } 169 }
138 } 170 }
@@ -148,7 +180,36 @@ impl SyncManager {
148 url.contains(&self.relay_domain) 180 url.contains(&self.relay_domain)
149 } 181 }
150 182
151 /// Spawn a connection task for a relay 183 /// Spawn a connection task for a relay with startup jitter
184 ///
185 /// Adds a random delay (0-10s) before connecting to prevent thundering herd
186 /// on startup when multiple relays are configured.
187 fn spawn_connection_with_jitter(
188 &self,
189 url: String,
190 tx: mpsc::Sender<SyncedEvent>,
191 filter_service: Arc<FilterService>,
192 ) {
193 let domain = self.relay_domain.clone();
194 let health_tracker = self.health_tracker.clone();
195
196 tokio::spawn(async move {
197 // Apply startup jitter
198 let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS);
199 tracing::debug!(
200 "Applying {}ms startup jitter before connecting to {}",
201 jitter_ms,
202 url
203 );
204 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
205
206 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
207 });
208 }
209
210 /// Spawn a connection task for a relay without jitter
211 ///
212 /// Used for relays discovered during runtime (not at startup).
152 fn spawn_connection( 213 fn spawn_connection(
153 &self, 214 &self,
154 url: String, 215 url: String,
@@ -156,8 +217,10 @@ impl SyncManager {
156 filter_service: Arc<FilterService>, 217 filter_service: Arc<FilterService>,
157 ) { 218 ) {
158 let domain = self.relay_domain.clone(); 219 let domain = self.relay_domain.clone();
220 let health_tracker = self.health_tracker.clone();
221
159 tokio::spawn(async move { 222 tokio::spawn(async move {
160 connect_with_retry(&url, tx, filter_service, &domain).await; 223 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
161 }); 224 });
162 } 225 }
163 226