upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/connection.rs 80
-rw-r--r-- src/sync/health.rs 475
-rw-r--r-- src/sync/manager.rs 75
-rw-r--r-- src/sync/mod.rs 9
4 files changed, 621 insertions, 18 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 76cc8e8..319cbbd 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -9,6 +9,12 @@
9//! 1. Layer 1: kinds 30617 + 30618 (announcements) 9//! 1. Layer 1: kinds 30617 + 30618 (announcements)
10//! 2. Layer 2: A/a tags for repository events 10//! 2. Layer 2: A/a tags for repository events
11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) 11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.)
12//!
13//! ## Phase 3 Features
14//!
15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry
12 18
13use std::sync::Arc; 19use std::sync::Arc;
14use std::time::Duration; 20use std::time::Duration;
@@ -17,6 +23,7 @@ use nostr_sdk::prelude::*;
17use tokio::sync::mpsc; 23use tokio::sync::mpsc;
18 24
19use super::filter::FilterService; 25use super::filter::FilterService;
26use super::health::RelayHealthTracker;
20 27
21/// Event received from the sync connection 28/// Event received from the sync connection
22#[derive(Debug, Clone)] 29#[derive(Debug, Clone)]
@@ -169,47 +176,96 @@ impl SyncConnection {
169 } 176 }
170} 177}
171 178
172/// Reconnect loop with exponential backoff 179/// Reconnect loop with health-aware exponential backoff
180///
181/// This function manages the connection lifecycle with health tracking:
182/// - Checks health state before attempting connections
183/// - Reports success/failure to the health tracker
184/// - Respects backoff delays from the health tracker
185/// - Handles dead relay detection (24h+ failures)
173/// 186///
174/// # Arguments 187/// # Arguments
175/// * `url` - The relay URL to connect to 188/// * `url` - The relay URL to connect to
176/// * `tx` - Channel sender for synced events 189/// * `tx` - Channel sender for synced events
177/// * `filter_service` - FilterService for building subscriptions 190/// * `filter_service` - FilterService for building subscriptions
178/// * `our_domain` - Our relay's domain (used to extract remote domain) 191/// * `our_domain` - Our relay's domain (used to extract remote domain)
192/// * `health_tracker` - Health tracker for managing connection state
179pub async fn connect_with_retry( 193pub async fn connect_with_retry(
180 url: &str, 194 url: &str,
181 tx: mpsc::Sender<SyncedEvent>, 195 tx: mpsc::Sender<SyncedEvent>,
182 filter_service: Arc<FilterService>, 196 filter_service: Arc<FilterService>,
183 _our_domain: &str, 197 _our_domain: &str,
198 health_tracker: Arc<RelayHealthTracker>,
184) { 199) {
185 let mut backoff = Duration::from_secs(1);
186 let max_backoff = Duration::from_secs(60);
187
188 // Extract remote domain from URL 200 // Extract remote domain from URL
189 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); 201 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
190 202
191 loop { 203 loop {
204 // Check if we should attempt connection based on health state
205 if !health_tracker.should_attempt_connection(url) {
206 // Wait for remaining backoff
207 if let Some(remaining) = health_tracker.get_remaining_backoff(url) {
208 tracing::debug!(
209 "Relay {} in backoff, waiting {:?} before retry",
210 url,
211 remaining
212 );
213 tokio::time::sleep(remaining).await;
214 continue;
215 }
216 }
217
218 // Log current health state for dead relays
219 if health_tracker.is_dead(url) {
220 tracing::info!(
221 "Attempting reconnection to dead relay {} (daily retry)",
222 url
223 );
224 }
225
192 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { 226 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await {
193 Ok(conn) => { 227 Ok(conn) => {
194 backoff = Duration::from_secs(1); // Reset backoff on successful connection 228 // Record successful connection
229 health_tracker.record_success(url);
230 tracing::info!("Sync connection established to {}", url);
231
232 // Run the connection (this blocks until disconnection)
195 conn.run(tx.clone()).await; 233 conn.run(tx.clone()).await;
234
235 // Connection ended - record as failure for reconnection backoff
236 // (The connection ending is considered a failure even if it worked for a while)
237 health_tracker.record_failure(url);
196 tracing::warn!("Sync connection to {} ended, will reconnect", url); 238 tracing::warn!("Sync connection to {} ended, will reconnect", url);
197 } 239 }
198 Err(e) => { 240 Err(e) => {
241 // Record connection failure
242 health_tracker.record_failure(url);
243
244 let failure_count = health_tracker.get_failure_count(url);
245 let state = health_tracker.get_state(url);
246
199 tracing::error!( 247 tracing::error!(
200 "Failed to connect to sync relay {}: {} (retrying in {:?})", 248 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}",
201 url, 249 url,
202 e, 250 failure_count,
203 backoff 251 state,
252 e
204 ); 253 );
205 } 254 }
206 } 255 }
207 256
208 // Wait before reconnecting 257 // Get the backoff duration from health tracker
209 tokio::time::sleep(backoff).await; 258 // If the health tracker has no backoff set (shouldn't happen), use a small default
259 let wait_duration = health_tracker
260 .get_remaining_backoff(url)
261 .unwrap_or(Duration::from_secs(5));
210 262
211 // Exponential backoff 263 tracing::debug!(
212 backoff = std::cmp::min(backoff * 2, max_backoff); 264 "Waiting {:?} before reconnecting to {}",
265 wait_duration,
266 url
267 );
268 tokio::time::sleep(wait_duration).await;
213 } 269 }
214} 270}
215 271
diff --git a/src/sync/health.rs b/src/sync/health.rs
new file mode 100644
index 0000000..51bd5ae
--- /dev/null
+++ b/src/sync/health.rs
@@ -0,0 +1,475 @@
1//! Relay Health Tracking for GRASP-02 Proactive Sync
2//!
3//! This module implements health tracking for relay connections, including:
4//! - Health state machine (Healthy -> Degraded -> Dead)
5//! - Exponential backoff with configurable max delay
6//! - Dead relay detection after 24h of continuous failures
7//!
8//! ## Health States
9//!
10//! - **Healthy**: Working connection, no recent failures
11//! - **Degraded**: Connection failed, retrying with backoff
12//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day)
13
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18
19use crate::config::Config;
20
/// How long a failure streak must last before the relay is classed as dead, in hours.
const DEAD_THRESHOLD_HOURS: u64 = 24;

/// Interval between reconnection attempts to a dead relay, in hours.
const DEAD_RETRY_INTERVAL_HOURS: u64 = 24;

/// Backoff ceiling, in seconds, used when no other value is supplied (one hour).
const DEFAULT_MAX_BACKOFF_SECS: u64 = 60 * 60;

/// Backoff after the first failure, in seconds; doubled for each further failure.
const BASE_BACKOFF_SECS: u64 = 5;
32
/// Connection health of a single relay, as tracked by the health tracker.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthState {
    /// The connection works and there are no recent failures.
    Healthy,
    /// A connection attempt failed; retries use exponential backoff.
    Degraded,
    /// Failures have continued for 24 hours or more; retries are minimal.
    Dead,
}
43
44impl std::fmt::Display for HealthState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 HealthState::Healthy => write!(f, "healthy"),
48 HealthState::Degraded => write!(f, "degraded"),
49 HealthState::Dead => write!(f, "dead"),
50 }
51 }
52}
53
54/// Health information for a single relay
55#[derive(Debug, Clone)]
56pub struct RelayHealth {
57 /// Current health state
58 pub state: HealthState,
59 /// Number of consecutive connection failures
60 pub consecutive_failures: u32,
61 /// Time of the first failure in the current failure streak
62 pub first_failure_time: Option<Instant>,
63 /// Time of the last failure
64 pub last_failure_time: Option<Instant>,
65 /// Time of the last successful connection
66 pub last_success_time: Option<Instant>,
67 /// Next time a connection attempt should be made
68 pub next_retry_at: Option<Instant>,
69}
70
71impl Default for RelayHealth {
72 fn default() -> Self {
73 Self {
74 state: HealthState::Healthy,
75 consecutive_failures: 0,
76 first_failure_time: None,
77 last_failure_time: None,
78 last_success_time: None,
79 next_retry_at: None,
80 }
81 }
82}
83
84impl RelayHealth {
85 /// Create a new RelayHealth with healthy state
86 pub fn new() -> Self {
87 Self::default()
88 }
89}
90
91/// Thread-safe relay health tracker using DashMap
92#[derive(Debug)]
93pub struct RelayHealthTracker {
94 health: DashMap<String, RelayHealth>,
95 max_backoff_secs: u64,
96}
97
98impl RelayHealthTracker {
99 /// Create a new RelayHealthTracker
100 pub fn new(config: &Config) -> Self {
101 Self {
102 health: DashMap::new(),
103 max_backoff_secs: config.sync_max_backoff_secs,
104 }
105 }
106
107 /// Create a new RelayHealthTracker with default settings
108 pub fn with_defaults() -> Self {
109 Self {
110 health: DashMap::new(),
111 max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS,
112 }
113 }
114
115 /// Create a new RelayHealthTracker with custom max backoff
116 pub fn with_max_backoff(max_backoff_secs: u64) -> Self {
117 Self {
118 health: DashMap::new(),
119 max_backoff_secs,
120 }
121 }
122
123 /// Record a successful connection to a relay
124 ///
125 /// Resets the relay to Healthy state and clears failure counters.
126 pub fn record_success(&self, relay_url: &str) {
127 let now = Instant::now();
128 let mut entry = self.health.entry(relay_url.to_string()).or_default();
129 let health = entry.value_mut();
130
131 let old_state = health.state;
132
133 // Reset to healthy state
134 health.state = HealthState::Healthy;
135 health.consecutive_failures = 0;
136 health.first_failure_time = None;
137 health.last_failure_time = None;
138 health.last_success_time = Some(now);
139 health.next_retry_at = None;
140
141 if old_state != HealthState::Healthy {
142 tracing::info!(
143 "Relay {} recovered to healthy (was {:?})",
144 relay_url,
145 old_state
146 );
147 }
148 }
149
150 /// Record a connection failure for a relay
151 ///
152 /// Increments failure counter, updates state, and calculates next retry time.
153 pub fn record_failure(&self, relay_url: &str) {
154 let now = Instant::now();
155 let mut entry = self.health.entry(relay_url.to_string()).or_default();
156 let health = entry.value_mut();
157
158 let old_state = health.state;
159
160 // Set first_failure_time if this is a new failure streak
161 if health.first_failure_time.is_none() {
162 health.first_failure_time = Some(now);
163 }
164
165 health.consecutive_failures = health.consecutive_failures.saturating_add(1);
166 health.last_failure_time = Some(now);
167
168 // Check if we should transition to Dead state
169 if let Some(first_failure) = health.first_failure_time {
170 let failure_duration = now.duration_since(first_failure);
171 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
172
173 if failure_duration >= dead_threshold {
174 health.state = HealthState::Dead;
175 // Dead relays retry once per day
176 health.next_retry_at =
177 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600));
178
179 if old_state != HealthState::Dead {
180 tracing::warn!(
181 "Relay {} marked dead after 24h failures ({} consecutive failures)",
182 relay_url,
183 health.consecutive_failures
184 );
185 }
186 } else {
187 // Degraded state with exponential backoff
188 health.state = HealthState::Degraded;
189 let backoff = Self::get_backoff_duration(
190 health.consecutive_failures,
191 self.max_backoff_secs,
192 );
193 health.next_retry_at = Some(now + backoff);
194
195 if old_state != HealthState::Degraded {
196 tracing::warn!(
197 "Relay {} degraded, backoff {:?}",
198 relay_url,
199 backoff
200 );
201 } else {
202 tracing::debug!(
203 "Relay {} failure #{}, backoff {:?}",
204 relay_url,
205 health.consecutive_failures,
206 backoff
207 );
208 }
209 }
210 }
211 }
212
213 /// Check if a connection attempt should be made to a relay
214 ///
215 /// Returns true if:
216 /// - The relay has no health record (first attempt)
217 /// - The relay is healthy
218 /// - The backoff period has elapsed
219 pub fn should_attempt_connection(&self, relay_url: &str) -> bool {
220 let entry = self.health.get(relay_url);
221
222 match entry {
223 None => true, // No record, allow first attempt
224 Some(entry) => {
225 let health = entry.value();
226
227 match health.state {
228 HealthState::Healthy => true,
229 HealthState::Degraded | HealthState::Dead => {
230 // Check if backoff period has elapsed
231 match health.next_retry_at {
232 None => true,
233 Some(next_retry) => Instant::now() >= next_retry,
234 }
235 }
236 }
237 }
238 }
239 }
240
241 /// Get the current health state of a relay
242 pub fn get_state(&self, relay_url: &str) -> HealthState {
243 self.health
244 .get(relay_url)
245 .map(|entry| entry.value().state)
246 .unwrap_or(HealthState::Healthy)
247 }
248
249 /// Check if a relay is marked as dead
250 pub fn is_dead(&self, relay_url: &str) -> bool {
251 self.get_state(relay_url) == HealthState::Dead
252 }
253
254 /// Get the remaining backoff duration for a relay
255 ///
256 /// Returns None if no backoff is active.
257 pub fn get_remaining_backoff(&self, relay_url: &str) -> Option<Duration> {
258 let entry = self.health.get(relay_url)?;
259 let health = entry.value();
260 let next_retry = health.next_retry_at?;
261 let now = Instant::now();
262
263 if now >= next_retry {
264 None
265 } else {
266 Some(next_retry - now)
267 }
268 }
269
270 /// Get the consecutive failure count for a relay
271 pub fn get_failure_count(&self, relay_url: &str) -> u32 {
272 self.health
273 .get(relay_url)
274 .map(|entry| entry.value().consecutive_failures)
275 .unwrap_or(0)
276 }
277
278 /// Calculate the backoff duration based on failure count
279 ///
280 /// Uses exponential backoff: base * 2^failures, capped at max_backoff
281 pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration {
282 let backoff_secs = BASE_BACKOFF_SECS
283 .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1)));
284 Duration::from_secs(backoff_secs.min(max_backoff_secs))
285 }
286
287 /// Get all tracked relay URLs
288 pub fn get_tracked_relays(&self) -> Vec<String> {
289 self.health.iter().map(|entry| entry.key().clone()).collect()
290 }
291
292 /// Get a clone of the health info for a relay
293 pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> {
294 self.health.get(relay_url).map(|entry| entry.value().clone())
295 }
296}
297
298/// Create a shared RelayHealthTracker wrapped in Arc
299pub fn create_health_tracker(config: &Config) -> Arc<RelayHealthTracker> {
300 Arc::new(RelayHealthTracker::new(config))
301}
302
#[cfg(test)]
mod tests {
    //! Unit tests for the health state machine and backoff arithmetic.
    //!
    //! The transition to `Dead` depends on 24 hours of elapsed `Instant` time
    //! and is therefore not exercised here.

    use super::*;

    #[test]
    fn test_health_state_display() {
        assert_eq!(HealthState::Healthy.to_string(), "healthy");
        assert_eq!(HealthState::Degraded.to_string(), "degraded");
        assert_eq!(HealthState::Dead.to_string(), "dead");
    }

    #[test]
    fn test_default_health_is_healthy() {
        let health = RelayHealth::default();
        assert_eq!(health.state, HealthState::Healthy);
        assert_eq!(health.consecutive_failures, 0);
        assert!(health.first_failure_time.is_none());
    }

    #[test]
    fn test_should_attempt_connection_new_relay() {
        let tracker = RelayHealthTracker::with_defaults();
        assert!(tracker.should_attempt_connection("wss://new-relay.example.com"));
    }

    #[test]
    fn test_record_success_resets_to_healthy() {
        let tracker = RelayHealthTracker::with_defaults();
        let url = "wss://test-relay.example.com";

        // Simulate a few failures
        tracker.record_failure(url);
        tracker.record_failure(url);
        assert_eq!(tracker.get_state(url), HealthState::Degraded);
        assert_eq!(tracker.get_failure_count(url), 2);

        // Record success
        tracker.record_success(url);
        assert_eq!(tracker.get_state(url), HealthState::Healthy);
        assert_eq!(tracker.get_failure_count(url), 0);
        assert!(tracker.should_attempt_connection(url));
    }

    #[test]
    fn test_backoff_increases_exponentially() {
        // failure N waits 5 * 2^(N-1) seconds: 5s, 10s, 20s, 40s, 80s
        let expected_secs = [5u64, 10, 20, 40, 80];
        for (failures, secs) in (1u32..).zip(expected_secs) {
            assert_eq!(
                RelayHealthTracker::get_backoff_duration(failures, 3600),
                Duration::from_secs(secs)
            );
        }
    }

    #[test]
    fn test_backoff_capped_at_max() {
        let max_backoff = 3600u64;
        // After many failures, should cap at max_backoff (1 hour)
        assert_eq!(
            RelayHealthTracker::get_backoff_duration(20, max_backoff),
            Duration::from_secs(max_backoff)
        );
    }

    #[test]
    fn test_backoff_saturates_instead_of_overflowing() {
        // 5 * 2^(u32::MAX - 1) does not fit in u64; the result must saturate.
        assert_eq!(
            RelayHealthTracker::get_backoff_duration(u32::MAX, u64::MAX),
            Duration::from_secs(u64::MAX)
        );
        // A count of zero is treated like the first failure.
        assert_eq!(
            RelayHealthTracker::get_backoff_duration(0, 3600),
            Duration::from_secs(5)
        );
    }

    #[test]
    fn test_degraded_state_after_failure() {
        let tracker = RelayHealthTracker::with_defaults();
        let url = "wss://test-relay.example.com";

        tracker.record_failure(url);
        assert_eq!(tracker.get_state(url), HealthState::Degraded);
        assert_eq!(tracker.get_failure_count(url), 1);
    }

    #[test]
    fn test_backoff_blocks_immediate_reconnection() {
        let tracker = RelayHealthTracker::with_defaults();
        let url = "wss://test-relay.example.com";

        tracker.record_failure(url);

        // Immediately after failure, should not attempt (backoff active)
        assert!(!tracker.should_attempt_connection(url));

        // Remaining backoff should be some positive duration
        let remaining = tracker.get_remaining_backoff(url);
        assert!(remaining.is_some());
        assert!(remaining.unwrap() > Duration::ZERO);
    }

    #[test]
    fn test_is_dead() {
        let tracker = RelayHealthTracker::with_defaults();
        let url = "wss://test-relay.example.com";

        // Initially not dead
        assert!(!tracker.is_dead(url));

        // After a failure, still not dead (just degraded)
        tracker.record_failure(url);
        assert!(!tracker.is_dead(url));
        assert_eq!(tracker.get_state(url), HealthState::Degraded);
    }

    #[test]
    fn test_get_tracked_relays() {
        let tracker = RelayHealthTracker::with_defaults();

        tracker.record_success("wss://relay1.example.com");
        tracker.record_failure("wss://relay2.example.com");

        let tracked = tracker.get_tracked_relays();
        assert_eq!(tracked.len(), 2);
        assert!(tracked.contains(&"wss://relay1.example.com".to_string()));
        assert!(tracked.contains(&"wss://relay2.example.com".to_string()));
    }

    #[test]
    fn test_custom_max_backoff() {
        let custom_max = 60u64; // 1 minute max
        let tracker = RelayHealthTracker::with_max_backoff(custom_max);
        let url = "wss://test-relay.example.com";

        // Simulate many failures; uncapped, failure 20 would wait far longer than a minute.
        for _ in 0..20 {
            tracker.record_failure(url);
        }

        assert_eq!(tracker.max_backoff_secs, custom_max);
        assert_eq!(tracker.get_failure_count(url), 20);

        // The scheduled retry must respect the custom ceiling.
        let remaining = tracker
            .get_remaining_backoff(url)
            .expect("a backoff should be pending right after a failure");
        assert!(remaining <= Duration::from_secs(custom_max));
        assert_eq!(
            RelayHealthTracker::get_backoff_duration(20, custom_max),
            Duration::from_secs(custom_max)
        );
    }

    #[test]
    fn test_get_health_returns_clone() {
        let tracker = RelayHealthTracker::with_defaults();
        let url = "wss://test-relay.example.com";

        tracker.record_success(url);
        let health = tracker.get_health(url);

        assert!(health.is_some());
        let health = health.unwrap();
        assert_eq!(health.state, HealthState::Healthy);
        assert!(health.last_success_time.is_some());
    }

    #[test]
    fn test_get_health_nonexistent() {
        let tracker = RelayHealthTracker::with_defaults();
        let health = tracker.get_health("wss://nonexistent.example.com");
        assert!(health.is_none());
    }
}
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 8f6a9bd..1f70f42 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -9,18 +9,31 @@
9//! - Relay discovery from stored kind 30617 announcements 9//! - Relay discovery from stored kind 30617 announcements
10//! - Multiple simultaneous relay connections 10//! - Multiple simultaneous relay connections
11//! - Three-layer filter strategy via FilterService 11//! - Three-layer filter strategy via FilterService
12//!
13//! ## Phase 3 Features
14//!
15//! - Health tracking with exponential backoff
16//! - Dead relay detection after 24h of failures
17//! - Startup jitter to prevent thundering herd
12 18
13use std::collections::HashSet; 19use std::collections::HashSet;
14use std::sync::Arc; 20use std::sync::Arc;
21use std::time::Duration;
15 22
16use nostr_relay_builder::prelude::*; 23use nostr_relay_builder::prelude::*;
24use rand::Rng;
17use tokio::sync::mpsc; 25use tokio::sync::mpsc;
18 26
19use super::connection::{connect_with_retry, SyncedEvent}; 27use super::connection::{connect_with_retry, SyncedEvent};
20use super::filter::FilterService; 28use super::filter::FilterService;
29use super::health::RelayHealthTracker;
21use super::SYNC_SOURCE_ADDR; 30use super::SYNC_SOURCE_ADDR;
31use crate::config::Config;
22use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 32use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
23 33
34/// Maximum startup jitter in milliseconds (10 seconds)
35const MAX_STARTUP_JITTER_MS: u64 = 10_000;
36
24/// Coordinates proactive sync from configured and discovered relays 37/// Coordinates proactive sync from configured and discovered relays
25pub struct SyncManager { 38pub struct SyncManager {
26 /// Initial relay URL to sync from (from config) 39 /// Initial relay URL to sync from (from config)
@@ -31,6 +44,8 @@ pub struct SyncManager {
31 database: SharedDatabase, 44 database: SharedDatabase,
32 /// Write policy for validating events 45 /// Write policy for validating events
33 write_policy: Nip34WritePolicy, 46 write_policy: Nip34WritePolicy,
47 /// Health tracker for relay connections
48 health_tracker: Arc<RelayHealthTracker>,
34} 49}
35 50
36impl SyncManager { 51impl SyncManager {
@@ -41,17 +56,20 @@ impl SyncManager {
41 /// * `relay_domain` - Our relay's domain (used to exclude self from sync) 56 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
42 /// * `database` - Shared database for storing events and querying announcements 57 /// * `database` - Shared database for storing events and querying announcements
43 /// * `write_policy` - Write policy for validating synced events 58 /// * `write_policy` - Write policy for validating synced events
59 /// * `config` - Configuration for health tracking settings
44 pub fn new( 60 pub fn new(
45 initial_relay_url: Option<String>, 61 initial_relay_url: Option<String>,
46 relay_domain: String, 62 relay_domain: String,
47 database: SharedDatabase, 63 database: SharedDatabase,
48 write_policy: Nip34WritePolicy, 64 write_policy: Nip34WritePolicy,
65 config: &Config,
49 ) -> Self { 66 ) -> Self {
50 Self { 67 Self {
51 initial_relay_url, 68 initial_relay_url,
52 relay_domain, 69 relay_domain,
53 database, 70 database,
54 write_policy, 71 write_policy,
72 health_tracker: Arc::new(RelayHealthTracker::new(config)),
55 } 73 }
56 } 74 }
57 75
@@ -68,9 +86,15 @@ impl SyncManager {
68 relay_domain, 86 relay_domain,
69 database, 87 database,
70 write_policy, 88 write_policy,
89 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
71 } 90 }
72 } 91 }
73 92
93 /// Get a reference to the health tracker
94 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
95 self.health_tracker.clone()
96 }
97
74 /// Run the sync manager 98 /// Run the sync manager
75 /// 99 ///
76 /// This discovers relays from stored announcements, spawns connection tasks, 100 /// This discovers relays from stored announcements, spawns connection tasks,
@@ -94,12 +118,14 @@ impl SyncManager {
94 // Track active relay URLs to avoid duplicates 118 // Track active relay URLs to avoid duplicates
95 let mut active_relays: HashSet<String> = HashSet::new(); 119 let mut active_relays: HashSet<String> = HashSet::new();
96 120
121 // Collect all relays to connect to
122 let mut relays_to_connect: Vec<String> = Vec::new();
123
97 // Start with initial relay if configured 124 // Start with initial relay if configured
98 if let Some(ref url) = self.initial_relay_url { 125 if let Some(ref url) = self.initial_relay_url {
99 if !self.is_own_relay(url) { 126 if !self.is_own_relay(url) {
100 tracing::info!("Connecting to initial sync relay: {}", url); 127 relays_to_connect.push(url.clone());
101 active_relays.insert(url.clone()); 128 active_relays.insert(url.clone());
102 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone());
103 } else { 129 } else {
104 tracing::info!("Skipping initial relay (is our own relay): {}", url); 130 tracing::info!("Skipping initial relay (is our own relay): {}", url);
105 } 131 }
@@ -109,12 +135,17 @@ impl SyncManager {
109 let discovered_urls = filter_service.discover_relay_urls().await; 135 let discovered_urls = filter_service.discover_relay_urls().await;
110 for url in discovered_urls { 136 for url in discovered_urls {
111 if !active_relays.contains(&url) && !self.is_own_relay(&url) { 137 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
112 tracing::info!("Connecting to discovered relay: {}", url); 138 relays_to_connect.push(url.clone());
113 active_relays.insert(url.clone()); 139 active_relays.insert(url.clone());
114 self.spawn_connection(url, tx.clone(), filter_service.clone());
115 } 140 }
116 } 141 }
117 142
143 // Spawn connections with startup jitter to prevent thundering herd
144 for url in relays_to_connect {
145 tracing::info!("Scheduling connection to sync relay: {}", url);
146 self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone());
147 }
148
118 if active_relays.is_empty() { 149 if active_relays.is_empty() {
119 tracing::warn!("No sync relays configured or discovered, SyncManager idle"); 150 tracing::warn!("No sync relays configured or discovered, SyncManager idle");
120 } else { 151 } else {
@@ -133,6 +164,7 @@ impl SyncManager {
133 if !active_relays.contains(&url) && !self.is_own_relay(&url) { 164 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
134 tracing::info!("Discovered new relay from event, connecting: {}", url); 165 tracing::info!("Discovered new relay from event, connecting: {}", url);
135 active_relays.insert(url.clone()); 166 active_relays.insert(url.clone());
167 // New relays discovered during runtime don't need jitter
136 self.spawn_connection(url, tx.clone(), filter_service.clone()); 168 self.spawn_connection(url, tx.clone(), filter_service.clone());
137 } 169 }
138 } 170 }
@@ -148,7 +180,36 @@ impl SyncManager {
148 url.contains(&self.relay_domain) 180 url.contains(&self.relay_domain)
149 } 181 }
150 182
151 /// Spawn a connection task for a relay 183 /// Spawn a connection task for a relay with startup jitter
184 ///
185 /// Adds a random delay (0-10s) before connecting to prevent thundering herd
186 /// on startup when multiple relays are configured.
187 fn spawn_connection_with_jitter(
188 &self,
189 url: String,
190 tx: mpsc::Sender<SyncedEvent>,
191 filter_service: Arc<FilterService>,
192 ) {
193 let domain = self.relay_domain.clone();
194 let health_tracker = self.health_tracker.clone();
195
196 tokio::spawn(async move {
197 // Apply startup jitter
198 let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS);
199 tracing::debug!(
200 "Applying {}ms startup jitter before connecting to {}",
201 jitter_ms,
202 url
203 );
204 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
205
206 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
207 });
208 }
209
210 /// Spawn a connection task for a relay without jitter
211 ///
212 /// Used for relays discovered during runtime (not at startup).
152 fn spawn_connection( 213 fn spawn_connection(
153 &self, 214 &self,
154 url: String, 215 url: String,
@@ -156,8 +217,10 @@ impl SyncManager {
156 filter_service: Arc<FilterService>, 217 filter_service: Arc<FilterService>,
157 ) { 218 ) {
158 let domain = self.relay_domain.clone(); 219 let domain = self.relay_domain.clone();
220 let health_tracker = self.health_tracker.clone();
221
159 tokio::spawn(async move { 222 tokio::spawn(async move {
160 connect_with_retry(&url, tx, filter_service, &domain).await; 223 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
161 }); 224 });
162 } 225 }
163 226
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 1155eaf..653aa27 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -9,12 +9,21 @@
9//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) 9//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618)
10//! - **Layer 2**: Repository events (A/a tags for shared repos) 10//! - **Layer 2**: Repository events (A/a tags for shared repos)
11//! - **Layer 3**: Related events (E/e tags for discussions, reviews) 11//! - **Layer 3**: Related events (E/e tags for discussions, reviews)
12//!
13//! ## Resilience & Health Tracking (Phase 3)
14//!
15//! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead)
16//! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max)
17//! - **Dead relay handling**: Minimal retry for 24h+ failed relays
18//! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay)
12 19
13mod connection; 20mod connection;
14mod filter; 21mod filter;
22pub mod health;
15mod manager; 23mod manager;
16 24
17pub use filter::FilterService; 25pub use filter::FilterService;
26pub use health::{HealthState, RelayHealth, RelayHealthTracker};
18pub use manager::SyncManager; 27pub use manager::SyncManager;
19 28
20use std::net::SocketAddr; 29use std::net::SocketAddr;