upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:36:08 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:36:08 +0000
commitea561062c0f08d608f48b6ccd6f8a4b8743b6e3b (patch)
tree1014aa98783cf1bc6d910f0e4db19b7d5d5d2e62 /src/sync
parent39e782b12fce1776f2ad0b0f5430749533cb80ea (diff)
sync: integrate health tracking and connection storage
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/health.rs475
-rw-r--r--src/sync/mod.rs13
2 files changed, 488 insertions, 0 deletions
diff --git a/src/sync/health.rs b/src/sync/health.rs
new file mode 100644
index 0000000..51bd5ae
--- /dev/null
+++ b/src/sync/health.rs
@@ -0,0 +1,475 @@
1//! Relay Health Tracking for GRASP-02 Proactive Sync
2//!
3//! This module implements health tracking for relay connections, including:
4//! - Health state machine (Healthy -> Degraded -> Dead)
5//! - Exponential backoff with configurable max delay
6//! - Dead relay detection after 24h of continuous failures
7//!
8//! ## Health States
9//!
10//! - **Healthy**: Working connection, no recent failures
11//! - **Degraded**: Connection failed, retrying with backoff
12//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day)
13
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18
19use crate::config::Config;
20
21/// Duration threshold before a relay is considered dead (24 hours)
22const DEAD_THRESHOLD_HOURS: u64 = 24;
23
24/// How often dead relays are retried (once per 24 hours)
25const DEAD_RETRY_INTERVAL_HOURS: u64 = 24;
26
27/// Default maximum backoff duration in seconds (1 hour)
28const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600;
29
30/// Base backoff duration in seconds
31const BASE_BACKOFF_SECS: u64 = 5;
32
33/// Health state of a relay connection
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum HealthState {
36 /// Working connection, no recent failures
37 Healthy,
38 /// Connection failed, retrying with exponential backoff
39 Degraded,
40 /// 24h+ of continuous failures, minimal retry
41 Dead,
42}
43
44impl std::fmt::Display for HealthState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 HealthState::Healthy => write!(f, "healthy"),
48 HealthState::Degraded => write!(f, "degraded"),
49 HealthState::Dead => write!(f, "dead"),
50 }
51 }
52}
53
54/// Health information for a single relay
55#[derive(Debug, Clone)]
56pub struct RelayHealth {
57 /// Current health state
58 pub state: HealthState,
59 /// Number of consecutive connection failures
60 pub consecutive_failures: u32,
61 /// Time of the first failure in the current failure streak
62 pub first_failure_time: Option<Instant>,
63 /// Time of the last failure
64 pub last_failure_time: Option<Instant>,
65 /// Time of the last successful connection
66 pub last_success_time: Option<Instant>,
67 /// Next time a connection attempt should be made
68 pub next_retry_at: Option<Instant>,
69}
70
71impl Default for RelayHealth {
72 fn default() -> Self {
73 Self {
74 state: HealthState::Healthy,
75 consecutive_failures: 0,
76 first_failure_time: None,
77 last_failure_time: None,
78 last_success_time: None,
79 next_retry_at: None,
80 }
81 }
82}
83
84impl RelayHealth {
85 /// Create a new RelayHealth with healthy state
86 pub fn new() -> Self {
87 Self::default()
88 }
89}
90
91/// Thread-safe relay health tracker using DashMap
92#[derive(Debug)]
93pub struct RelayHealthTracker {
94 health: DashMap<String, RelayHealth>,
95 max_backoff_secs: u64,
96}
97
98impl RelayHealthTracker {
99 /// Create a new RelayHealthTracker
100 pub fn new(config: &Config) -> Self {
101 Self {
102 health: DashMap::new(),
103 max_backoff_secs: config.sync_max_backoff_secs,
104 }
105 }
106
107 /// Create a new RelayHealthTracker with default settings
108 pub fn with_defaults() -> Self {
109 Self {
110 health: DashMap::new(),
111 max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS,
112 }
113 }
114
115 /// Create a new RelayHealthTracker with custom max backoff
116 pub fn with_max_backoff(max_backoff_secs: u64) -> Self {
117 Self {
118 health: DashMap::new(),
119 max_backoff_secs,
120 }
121 }
122
123 /// Record a successful connection to a relay
124 ///
125 /// Resets the relay to Healthy state and clears failure counters.
126 pub fn record_success(&self, relay_url: &str) {
127 let now = Instant::now();
128 let mut entry = self.health.entry(relay_url.to_string()).or_default();
129 let health = entry.value_mut();
130
131 let old_state = health.state;
132
133 // Reset to healthy state
134 health.state = HealthState::Healthy;
135 health.consecutive_failures = 0;
136 health.first_failure_time = None;
137 health.last_failure_time = None;
138 health.last_success_time = Some(now);
139 health.next_retry_at = None;
140
141 if old_state != HealthState::Healthy {
142 tracing::info!(
143 "Relay {} recovered to healthy (was {:?})",
144 relay_url,
145 old_state
146 );
147 }
148 }
149
150 /// Record a connection failure for a relay
151 ///
152 /// Increments failure counter, updates state, and calculates next retry time.
153 pub fn record_failure(&self, relay_url: &str) {
154 let now = Instant::now();
155 let mut entry = self.health.entry(relay_url.to_string()).or_default();
156 let health = entry.value_mut();
157
158 let old_state = health.state;
159
160 // Set first_failure_time if this is a new failure streak
161 if health.first_failure_time.is_none() {
162 health.first_failure_time = Some(now);
163 }
164
165 health.consecutive_failures = health.consecutive_failures.saturating_add(1);
166 health.last_failure_time = Some(now);
167
168 // Check if we should transition to Dead state
169 if let Some(first_failure) = health.first_failure_time {
170 let failure_duration = now.duration_since(first_failure);
171 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
172
173 if failure_duration >= dead_threshold {
174 health.state = HealthState::Dead;
175 // Dead relays retry once per day
176 health.next_retry_at =
177 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600));
178
179 if old_state != HealthState::Dead {
180 tracing::warn!(
181 "Relay {} marked dead after 24h failures ({} consecutive failures)",
182 relay_url,
183 health.consecutive_failures
184 );
185 }
186 } else {
187 // Degraded state with exponential backoff
188 health.state = HealthState::Degraded;
189 let backoff = Self::get_backoff_duration(
190 health.consecutive_failures,
191 self.max_backoff_secs,
192 );
193 health.next_retry_at = Some(now + backoff);
194
195 if old_state != HealthState::Degraded {
196 tracing::warn!(
197 "Relay {} degraded, backoff {:?}",
198 relay_url,
199 backoff
200 );
201 } else {
202 tracing::debug!(
203 "Relay {} failure #{}, backoff {:?}",
204 relay_url,
205 health.consecutive_failures,
206 backoff
207 );
208 }
209 }
210 }
211 }
212
213 /// Check if a connection attempt should be made to a relay
214 ///
215 /// Returns true if:
216 /// - The relay has no health record (first attempt)
217 /// - The relay is healthy
218 /// - The backoff period has elapsed
219 pub fn should_attempt_connection(&self, relay_url: &str) -> bool {
220 let entry = self.health.get(relay_url);
221
222 match entry {
223 None => true, // No record, allow first attempt
224 Some(entry) => {
225 let health = entry.value();
226
227 match health.state {
228 HealthState::Healthy => true,
229 HealthState::Degraded | HealthState::Dead => {
230 // Check if backoff period has elapsed
231 match health.next_retry_at {
232 None => true,
233 Some(next_retry) => Instant::now() >= next_retry,
234 }
235 }
236 }
237 }
238 }
239 }
240
241 /// Get the current health state of a relay
242 pub fn get_state(&self, relay_url: &str) -> HealthState {
243 self.health
244 .get(relay_url)
245 .map(|entry| entry.value().state)
246 .unwrap_or(HealthState::Healthy)
247 }
248
249 /// Check if a relay is marked as dead
250 pub fn is_dead(&self, relay_url: &str) -> bool {
251 self.get_state(relay_url) == HealthState::Dead
252 }
253
254 /// Get the remaining backoff duration for a relay
255 ///
256 /// Returns None if no backoff is active.
257 pub fn get_remaining_backoff(&self, relay_url: &str) -> Option<Duration> {
258 let entry = self.health.get(relay_url)?;
259 let health = entry.value();
260 let next_retry = health.next_retry_at?;
261 let now = Instant::now();
262
263 if now >= next_retry {
264 None
265 } else {
266 Some(next_retry - now)
267 }
268 }
269
270 /// Get the consecutive failure count for a relay
271 pub fn get_failure_count(&self, relay_url: &str) -> u32 {
272 self.health
273 .get(relay_url)
274 .map(|entry| entry.value().consecutive_failures)
275 .unwrap_or(0)
276 }
277
278 /// Calculate the backoff duration based on failure count
279 ///
280 /// Uses exponential backoff: base * 2^failures, capped at max_backoff
281 pub fn get_backoff_duration(consecutive_failures: u32, max_backoff_secs: u64) -> Duration {
282 let backoff_secs = BASE_BACKOFF_SECS
283 .saturating_mul(2u64.saturating_pow(consecutive_failures.saturating_sub(1)));
284 Duration::from_secs(backoff_secs.min(max_backoff_secs))
285 }
286
287 /// Get all tracked relay URLs
288 pub fn get_tracked_relays(&self) -> Vec<String> {
289 self.health.iter().map(|entry| entry.key().clone()).collect()
290 }
291
292 /// Get a clone of the health info for a relay
293 pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> {
294 self.health.get(relay_url).map(|entry| entry.value().clone())
295 }
296}
297
298/// Create a shared RelayHealthTracker wrapped in Arc
299pub fn create_health_tracker(config: &Config) -> Arc<RelayHealthTracker> {
300 Arc::new(RelayHealthTracker::new(config))
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306
307 #[test]
308 fn test_health_state_display() {
309 assert_eq!(HealthState::Healthy.to_string(), "healthy");
310 assert_eq!(HealthState::Degraded.to_string(), "degraded");
311 assert_eq!(HealthState::Dead.to_string(), "dead");
312 }
313
314 #[test]
315 fn test_default_health_is_healthy() {
316 let health = RelayHealth::default();
317 assert_eq!(health.state, HealthState::Healthy);
318 assert_eq!(health.consecutive_failures, 0);
319 assert!(health.first_failure_time.is_none());
320 }
321
322 #[test]
323 fn test_should_attempt_connection_new_relay() {
324 let tracker = RelayHealthTracker::with_defaults();
325 assert!(tracker.should_attempt_connection("wss://new-relay.example.com"));
326 }
327
328 #[test]
329 fn test_record_success_resets_to_healthy() {
330 let tracker = RelayHealthTracker::with_defaults();
331 let url = "wss://test-relay.example.com";
332
333 // Simulate a few failures
334 tracker.record_failure(url);
335 tracker.record_failure(url);
336 assert_eq!(tracker.get_state(url), HealthState::Degraded);
337 assert_eq!(tracker.get_failure_count(url), 2);
338
339 // Record success
340 tracker.record_success(url);
341 assert_eq!(tracker.get_state(url), HealthState::Healthy);
342 assert_eq!(tracker.get_failure_count(url), 0);
343 assert!(tracker.should_attempt_connection(url));
344 }
345
346 #[test]
347 fn test_backoff_increases_exponentially() {
348 // failure 1: 5s
349 assert_eq!(
350 RelayHealthTracker::get_backoff_duration(1, 3600),
351 Duration::from_secs(5)
352 );
353 // failure 2: 10s
354 assert_eq!(
355 RelayHealthTracker::get_backoff_duration(2, 3600),
356 Duration::from_secs(10)
357 );
358 // failure 3: 20s
359 assert_eq!(
360 RelayHealthTracker::get_backoff_duration(3, 3600),
361 Duration::from_secs(20)
362 );
363 // failure 4: 40s
364 assert_eq!(
365 RelayHealthTracker::get_backoff_duration(4, 3600),
366 Duration::from_secs(40)
367 );
368 // failure 5: 80s
369 assert_eq!(
370 RelayHealthTracker::get_backoff_duration(5, 3600),
371 Duration::from_secs(80)
372 );
373 }
374
375 #[test]
376 fn test_backoff_capped_at_max() {
377 let max_backoff = 3600u64;
378 // After many failures, should cap at max_backoff (1 hour)
379 assert_eq!(
380 RelayHealthTracker::get_backoff_duration(20, max_backoff),
381 Duration::from_secs(max_backoff)
382 );
383 }
384
385 #[test]
386 fn test_degraded_state_after_failure() {
387 let tracker = RelayHealthTracker::with_defaults();
388 let url = "wss://test-relay.example.com";
389
390 tracker.record_failure(url);
391 assert_eq!(tracker.get_state(url), HealthState::Degraded);
392 assert_eq!(tracker.get_failure_count(url), 1);
393 }
394
395 #[test]
396 fn test_backoff_blocks_immediate_reconnection() {
397 let tracker = RelayHealthTracker::with_defaults();
398 let url = "wss://test-relay.example.com";
399
400 tracker.record_failure(url);
401
402 // Immediately after failure, should not attempt (backoff active)
403 assert!(!tracker.should_attempt_connection(url));
404
405 // Remaining backoff should be some positive duration
406 let remaining = tracker.get_remaining_backoff(url);
407 assert!(remaining.is_some());
408 assert!(remaining.unwrap() > Duration::ZERO);
409 }
410
411 #[test]
412 fn test_is_dead() {
413 let tracker = RelayHealthTracker::with_defaults();
414 let url = "wss://test-relay.example.com";
415
416 // Initially not dead
417 assert!(!tracker.is_dead(url));
418
419 // After a failure, still not dead (just degraded)
420 tracker.record_failure(url);
421 assert!(!tracker.is_dead(url));
422 assert_eq!(tracker.get_state(url), HealthState::Degraded);
423 }
424
425 #[test]
426 fn test_get_tracked_relays() {
427 let tracker = RelayHealthTracker::with_defaults();
428
429 tracker.record_success("wss://relay1.example.com");
430 tracker.record_failure("wss://relay2.example.com");
431
432 let tracked = tracker.get_tracked_relays();
433 assert_eq!(tracked.len(), 2);
434 assert!(tracked.contains(&"wss://relay1.example.com".to_string()));
435 assert!(tracked.contains(&"wss://relay2.example.com".to_string()));
436 }
437
438 #[test]
439 fn test_custom_max_backoff() {
440 let custom_max = 60u64; // 1 minute max
441 let tracker = RelayHealthTracker::with_max_backoff(custom_max);
442 let url = "wss://test-relay.example.com";
443
444 // Simulate many failures
445 for _ in 0..20 {
446 tracker.record_failure(url);
447 }
448
449 // The remaining backoff should respect the custom max
450 // Note: We can't easily test the internal backoff calculation here,
451 // but we can verify the tracker was created with the custom setting
452 assert_eq!(tracker.max_backoff_secs, custom_max);
453 }
454
455 #[test]
456 fn test_get_health_returns_clone() {
457 let tracker = RelayHealthTracker::with_defaults();
458 let url = "wss://test-relay.example.com";
459
460 tracker.record_success(url);
461 let health = tracker.get_health(url);
462
463 assert!(health.is_some());
464 let health = health.unwrap();
465 assert_eq!(health.state, HealthState::Healthy);
466 assert!(health.last_success_time.is_some());
467 }
468
469 #[test]
470 fn test_get_health_nonexistent() {
471 let tracker = RelayHealthTracker::with_defaults();
472 let health = tracker.get_health("wss://nonexistent.example.com");
473 assert!(health.is_none());
474 }
475} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index fb09896..9ac62ed 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -14,6 +14,7 @@
14 14
15pub mod algorithms; 15pub mod algorithms;
16pub mod filters; 16pub mod filters;
17pub mod health;
17pub mod relay_connection; 18pub mod relay_connection;
18pub mod self_subscriber; 19pub mod self_subscriber;
19 20
@@ -26,6 +27,9 @@ pub use relay_connection::{RelayConnection, RelayEvent};
26// Re-export self-subscriber types 27// Re-export self-subscriber types
27pub use self_subscriber::{RelayAction, SelfSubscriber}; 28pub use self_subscriber::{RelayAction, SelfSubscriber};
28 29
30// Re-export health tracking types
31pub use health::RelayHealthTracker;
32
29use std::collections::{HashMap, HashSet}; 33use std::collections::{HashMap, HashSet};
30use std::sync::Arc; 34use std::sync::Arc;
31 35
@@ -339,6 +343,12 @@ pub struct SyncManager {
339 relay_sync_index: RelaySyncIndex, 343 relay_sync_index: RelaySyncIndex,
340 /// In-flight subscription batches 344 /// In-flight subscription batches
341 pending_sync_index: PendingSyncIndex, 345 pending_sync_index: PendingSyncIndex,
346 /// Active relay connections - keyed by relay URL
347 connections: HashMap<String, RelayConnection>,
348 /// Health tracker for relay connection state
349 health_tracker: Arc<RelayHealthTracker>,
350 /// Counter for generating unique batch IDs
351 next_batch_id: u64,
342} 352}
343 353
344impl SyncManager { 354impl SyncManager {
@@ -366,6 +376,9 @@ impl SyncManager {
366 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 376 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
367 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 377 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
368 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 378 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
379 connections: HashMap::new(),
380 health_tracker: Arc::new(RelayHealthTracker::new(config)),
381 next_batch_id: 0,
369 } 382 }
370 } 383 }
371 384