upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/health.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-22 14:23:46 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-22 14:23:46 +0000
commit541f34a207047b26547154e7d631005d456f12fd (patch)
tree446cffc4b3bbc32bf61933b5ab41a044a35d6f3b /src/sync/health.rs
parentb10a6cc91dab4c3d83d62fe8cb357c78f2cd4d1e (diff)
sync: add req rate-limit detection and cooldown
Diffstat (limited to 'src/sync/health.rs')
-rw-r--r--src/sync/health.rs300
1 files changed, 259 insertions, 41 deletions
diff --git a/src/sync/health.rs b/src/sync/health.rs
index d919a80..a10427f 100644
--- a/src/sync/health.rs
+++ b/src/sync/health.rs
@@ -1,15 +1,17 @@
1//! Relay Health Tracking for GRASP-02 Proactive Sync 1//! Relay Health Tracking for GRASP-02 Proactive Sync
2//! 2//!
3//! This module implements health tracking for relay connections, including: 3//! This module implements health tracking for relay connections, including:
4//! - Health state machine (Healthy -> Degraded -> Dead) 4//! - Health state machine (Healthy -> Degraded -> Dead -> RateLimited)
5//! - Exponential backoff with configurable max delay 5//! - Exponential backoff with configurable max delay
6//! - Dead relay detection after 24h of continuous failures 6//! - Dead relay detection after 24h of continuous failures
7//! - Rate limit detection and fixed cooldown period
7//! 8//!
8//! ## Health States 9//! ## Health States
9//! 10//!
10//! - **Healthy**: Working connection, no recent failures 11//! - **Healthy**: Working connection, no recent failures
11//! - **Degraded**: Connection failed, retrying with backoff 12//! - **Degraded**: Connection failed, retrying with backoff
12//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) 13//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day)
14//! - **RateLimited**: NOTICE-triggered 65-second cooldown to avoid rate limits
13 15
14use std::sync::Arc; 16use std::sync::Arc;
15use std::time::{Duration, Instant}; 17use std::time::{Duration, Instant};
@@ -30,37 +32,52 @@ const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600;
30/// Default base backoff duration in seconds 32/// Default base backoff duration in seconds
31const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; 33const DEFAULT_BASE_BACKOFF_SECS: u64 = 5;
32 34
35/// Rate limit cooldown duration in seconds (65 seconds = typical 60s limit + buffer)
36const RATE_LIMIT_COOLDOWN_SECS: u64 = 65;
37
38/// Stability period after recovery before marking relay as fully healthy (5 minutes)
39/// A relay must maintain connection for this duration after failures before being marked Healthy
40const STABILITY_PERIOD_SECS: u64 = 300;
41
33/// Health state of a relay connection 42/// Health state of a relay connection
34#[derive(Debug, Clone, Copy, PartialEq, Eq)] 43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum HealthState { 44pub enum HealthState {
36 /// Working connection, no recent failures 45 /// Working connection, no recent failures, proven stable
37 Healthy, 46 Healthy,
38 /// Connection failed, retrying with exponential backoff 47 /// Not currently connected, but no recent failures or issues
48 Disconnected,
49 /// Connection problems: failing to connect OR recently recovered but not yet stable
39 Degraded, 50 Degraded,
40 /// 24h+ of continuous failures, minimal retry 51 /// 24h+ of continuous failures, minimal retry
41 Dead, 52 Dead,
53 /// Rate limited by relay, temporary cooldown active
54 RateLimited,
42} 55}
43 56
44impl std::fmt::Display for HealthState { 57impl std::fmt::Display for HealthState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self { 59 match self {
47 HealthState::Healthy => write!(f, "healthy"), 60 HealthState::Healthy => write!(f, "healthy"),
61 HealthState::Disconnected => write!(f, "disconnected"),
48 HealthState::Degraded => write!(f, "degraded"), 62 HealthState::Degraded => write!(f, "degraded"),
49 HealthState::Dead => write!(f, "dead"), 63 HealthState::Dead => write!(f, "dead"),
64 HealthState::RateLimited => write!(f, "rate_limited"),
50 } 65 }
51 } 66 }
52} 67}
53 68
54/// Health information for a single relay 69/// Health information for a single relay
55#[derive(Debug, Clone)] 70#[derive(Debug, Clone, Default)]
56pub struct RelayHealth { 71pub struct RelayHealth {
57 /// Current health state 72 /// Are we currently connected to this relay
58 pub state: HealthState, 73 pub connected: bool,
74 /// Has this relay sent us a rate-limiting NOTICE recently
75 pub rate_limited: bool,
59 /// Number of consecutive connection failures 76 /// Number of consecutive connection failures
60 pub consecutive_failures: u32, 77 pub consecutive_failures: u32,
61 /// Time of the first failure in the current failure streak 78 /// Time of the first failure in the current failure streak
62 pub first_failure_time: Option<Instant>, 79 pub first_failure_time: Option<Instant>,
63 /// Time of the last failure 80 /// Time of the last failure (kept after recovery for stability period tracking)
64 pub last_failure_time: Option<Instant>, 81 pub last_failure_time: Option<Instant>,
65 /// Time of the last successful connection 82 /// Time of the last successful connection
66 pub last_success_time: Option<Instant>, 83 pub last_success_time: Option<Instant>,
@@ -70,25 +87,122 @@ pub struct RelayHealth {
70 pub next_retry_at: Option<Instant>, 87 pub next_retry_at: Option<Instant>,
71} 88}
72 89
73impl Default for RelayHealth {
74 fn default() -> Self {
75 Self {
76 state: HealthState::Healthy,
77 consecutive_failures: 0,
78 first_failure_time: None,
79 last_failure_time: None,
80 last_success_time: None,
81 last_attempt_time: None,
82 next_retry_at: None,
83 }
84 }
85}
86
87impl RelayHealth { 90impl RelayHealth {
88 /// Create a new RelayHealth with healthy state 91 /// Create a new RelayHealth with default values
89 pub fn new() -> Self { 92 pub fn new() -> Self {
90 Self::default() 93 Self::default()
91 } 94 }
95
96 /// Get the current health state based on the relay's properties
97 ///
98 /// State is computed dynamically from:
99 /// - Rate limit status
100 /// - Connection status
101 /// - Failure history and timing
102 /// - Stability period after recovery
103 ///
104 /// ## State Logic
105 ///
106 /// 1. **RateLimited**: If rate_limited flag is set and cooldown hasn't expired
107 /// 2. **Dead**: 24+ hours of continuous failures
108 /// 3. **Degraded**: Active connection failures OR in stability period after recovery
109 /// 4. **Disconnected**: Not connected, but no recent failures or issues
110 /// 5. **Healthy**: Connected and stable (past stability period with no failures)
111 pub fn state(&self) -> HealthState {
112 let now = Instant::now();
113
114 // Check rate limiting first (highest priority)
115 if self.rate_limited {
116 if let Some(next_retry) = self.next_retry_at {
117 if now < next_retry {
118 return HealthState::RateLimited;
119 }
120 }
121 }
122
123 // Check for dead state (24+ hours of failures)
124 if let Some(first_failure) = self.first_failure_time {
125 let failure_duration = now.duration_since(first_failure);
126 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
127 if failure_duration >= dead_threshold {
128 return HealthState::Dead;
129 }
130 }
131
132 // Check if we have active failures (currently failing to connect)
133 if self.consecutive_failures > 0 {
134 return HealthState::Degraded;
135 }
136
137 // Check if we're in stability period after recovery
138 // (recovered from failures but not yet proven stable)
139 if let (Some(last_success), Some(last_failure)) = (self.last_success_time, self.last_failure_time) {
140 // Only consider stability period if recovery happened after the last failure
141 if last_success > last_failure {
142 let time_since_recovery = now.duration_since(last_success);
143 let stability_period = Duration::from_secs(STABILITY_PERIOD_SECS);
144
145 if time_since_recovery < stability_period {
146 // Still in stability period - remain degraded to prove stability
147 return HealthState::Degraded;
148 }
149 }
150 }
151
152 // Check connection status for final state
153 if self.connected {
154 // Connected and stable (no failures, past stability period)
155 HealthState::Healthy
156 } else {
157 // Not connected, but no recent failures - just disconnected
158 HealthState::Disconnected
159 }
160 }
161
162 /// Check if the relay is currently connected
163 pub fn is_connected(&self) -> bool {
164 self.connected
165 }
166
167 /// Check if the relay is currently rate limited (cooldown active)
168 pub fn is_rate_limited_now(&self) -> bool {
169 if !self.rate_limited {
170 return false;
171 }
172 if let Some(next_retry) = self.next_retry_at {
173 Instant::now() < next_retry
174 } else {
175 false
176 }
177 }
178
179 /// Get the consecutive failure count
180 pub fn failure_count(&self) -> u32 {
181 self.consecutive_failures
182 }
183
184 /// Get time since last successful connection
185 pub fn time_since_last_success(&self) -> Option<Duration> {
186 self.last_success_time
187 .map(|t| Instant::now().duration_since(t))
188 }
189
190 /// Get time since first failure in current streak
191 pub fn time_since_first_failure(&self) -> Option<Duration> {
192 self.first_failure_time
193 .map(|t| Instant::now().duration_since(t))
194 }
195
196 /// Get remaining backoff/cooldown duration
197 pub fn remaining_backoff(&self) -> Option<Duration> {
198 let next_retry = self.next_retry_at?;
199 let now = Instant::now();
200 if now >= next_retry {
201 None
202 } else {
203 Some(next_retry - now)
204 }
205 }
92} 206}
93 207
94/// Thread-safe relay health tracker using DashMap 208/// Thread-safe relay health tracker using DashMap
@@ -148,16 +262,17 @@ impl RelayHealthTracker {
148 262
149 /// Record a successful connection to a relay 263 /// Record a successful connection to a relay
150 /// 264 ///
151 /// Resets the relay to Healthy state and clears failure counters. 265 /// Clears failure counters and rate limiting. Sets connected = true.
152 pub fn record_success(&self, relay_url: &str) { 266 pub fn record_success(&self, relay_url: &str) {
153 let now = Instant::now(); 267 let now = Instant::now();
154 let mut entry = self.health.entry(relay_url.to_string()).or_default(); 268 let mut entry = self.health.entry(relay_url.to_string()).or_default();
155 let health = entry.value_mut(); 269 let health = entry.value_mut();
156 270
157 let old_state = health.state; 271 let old_state = health.state();
158 272
159 // Reset to healthy state 273 // Reset to healthy state
160 health.state = HealthState::Healthy; 274 health.connected = true;
275 health.rate_limited = false;
161 health.consecutive_failures = 0; 276 health.consecutive_failures = 0;
162 health.first_failure_time = None; 277 health.first_failure_time = None;
163 health.last_failure_time = None; 278 health.last_failure_time = None;
@@ -176,13 +291,17 @@ impl RelayHealthTracker {
176 291
177 /// Record a connection failure for a relay 292 /// Record a connection failure for a relay
178 /// 293 ///
179 /// Increments failure counter, updates state, and calculates next retry time. 294 /// Increments failure counter and calculates next retry time with exponential backoff.
295 /// Sets connected = false.
180 pub fn record_failure(&self, relay_url: &str) { 296 pub fn record_failure(&self, relay_url: &str) {
181 let now = Instant::now(); 297 let now = Instant::now();
182 let mut entry = self.health.entry(relay_url.to_string()).or_default(); 298 let mut entry = self.health.entry(relay_url.to_string()).or_default();
183 let health = entry.value_mut(); 299 let health = entry.value_mut();
184 300
185 let old_state = health.state; 301 let old_state = health.state();
302
303 // Mark as disconnected
304 health.connected = false;
186 305
187 // Set first_failure_time if this is a new failure streak 306 // Set first_failure_time if this is a new failure streak
188 if health.first_failure_time.is_none() { 307 if health.first_failure_time.is_none() {
@@ -192,18 +311,18 @@ impl RelayHealthTracker {
192 health.consecutive_failures = health.consecutive_failures.saturating_add(1); 311 health.consecutive_failures = health.consecutive_failures.saturating_add(1);
193 health.last_failure_time = Some(now); 312 health.last_failure_time = Some(now);
194 313
195 // Check if we should transition to Dead state 314 // Calculate backoff based on whether we're dead or degraded
196 if let Some(first_failure) = health.first_failure_time { 315 if let Some(first_failure) = health.first_failure_time {
197 let failure_duration = now.duration_since(first_failure); 316 let failure_duration = now.duration_since(first_failure);
198 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); 317 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
199 318
200 if failure_duration >= dead_threshold { 319 if failure_duration >= dead_threshold {
201 health.state = HealthState::Dead;
202 // Dead relays retry once per day 320 // Dead relays retry once per day
203 health.next_retry_at = 321 health.next_retry_at =
204 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); 322 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600));
205 323
206 if old_state != HealthState::Dead { 324 let new_state = health.state();
325 if old_state != HealthState::Dead && new_state == HealthState::Dead {
207 tracing::warn!( 326 tracing::warn!(
208 "Relay {} marked dead after 24h failures ({} consecutive failures)", 327 "Relay {} marked dead after 24h failures ({} consecutive failures)",
209 relay_url, 328 relay_url,
@@ -212,15 +331,21 @@ impl RelayHealthTracker {
212 } 331 }
213 } else { 332 } else {
214 // Degraded state with exponential backoff 333 // Degraded state with exponential backoff
215 health.state = HealthState::Degraded;
216 let backoff = Self::get_backoff_duration( 334 let backoff = Self::get_backoff_duration(
217 health.consecutive_failures, 335 health.consecutive_failures,
218 self.base_backoff_secs, 336 self.base_backoff_secs,
219 self.max_backoff_secs, 337 self.max_backoff_secs,
220 ); 338 );
221 health.next_retry_at = Some(now + backoff); 339 // Respect existing next_retry_at if it's later (e.g., from rate limiting)
340 let new_retry_at = now + backoff;
341 health.next_retry_at = Some(
342 health.next_retry_at
343 .unwrap_or(new_retry_at)
344 .max(new_retry_at)
345 );
222 346
223 if old_state != HealthState::Degraded { 347 let new_state = health.state();
348 if old_state != HealthState::Degraded && new_state == HealthState::Degraded {
224 tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); 349 tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff);
225 } else { 350 } else {
226 tracing::debug!( 351 tracing::debug!(
@@ -234,6 +359,91 @@ impl RelayHealthTracker {
234 } 359 }
235 } 360 }
236 361
362 /// Record a rate limit NOTICE from a relay
363 ///
364 /// Sets the relay to RateLimited state with a fixed 65-second cooldown.
365 /// This is distinct from connection failures (Degraded state) - it's triggered
366 /// by NOTICE messages from the relay indicating we're sending too many requests.
367 pub fn record_rate_limit(&self, relay_url: &str) {
368 let now = Instant::now();
369 let mut entry = self.health.entry(relay_url.to_string()).or_default();
370 let health = entry.value_mut();
371
372 health.rate_limited = true;
373 health.next_retry_at = Some(now + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS));
374
375 tracing::warn!(
376 relay = %relay_url,
377 cooldown_secs = RATE_LIMIT_COOLDOWN_SECS,
378 "Relay rate limited, pausing new subscriptions"
379 );
380 }
381
382 /// Clear rate limiting state for a specific relay
383 ///
384 /// This only clears the rate_limited flag, without affecting connection status
385 /// or failure counters. Use this when rate limit cooldown has expired and we
386 /// want to allow new subscriptions.
387 ///
388 /// This is different from `record_success()` which resets all health state.
389 pub fn clear_rate_limit(&self, relay_url: &str) {
390 if let Some(mut entry) = self.health.get_mut(relay_url) {
391 let health = entry.value_mut();
392 health.rate_limited = false;
393 }
394 }
395
396
397 /// Check if relay is currently rate limited
398 ///
399 /// Returns true if the relay is in RateLimited state and the cooldown period
400 /// has not yet expired. Once the cooldown expires, this returns false and the
401 /// relay can accept new subscriptions again.
402 pub fn is_rate_limited(&self, relay_url: &str) -> bool {
403 if let Some(entry) = self.health.get(relay_url) {
404 let health = entry.value();
405 health.rate_limited
406 } else {
407 false
408 }
409 }
410
411 /// Exit rate limiting state for relays whose cooldown has expired
412 ///
413 /// Finds all relays that are currently rate limited but whose cooldown period
414 /// has expired, clears their rate_limited flag, and returns their URLs.
415 ///
416 /// This method mutates state by clearing the rate_limited flag for recovered relays.
417 ///
418 /// Returns a vector of relay URLs that were recovered from rate limiting.
419 pub fn exit_expired_rate_limits(&self) -> Vec<String> {
420 let now = Instant::now();
421 let mut recovered_relays = Vec::new();
422
423 for mut entry in self.health.iter_mut() {
424 let (url, health) = entry.pair_mut();
425
426 // Check if rate limited and cooldown has expired
427 if health.rate_limited {
428 if let Some(next_retry) = health.next_retry_at {
429 if now > next_retry {
430 // Cooldown expired - clear rate limiting
431 health.rate_limited = false;
432 health.next_retry_at = None;
433 recovered_relays.push(url.clone());
434
435 tracing::info!(
436 relay = %url,
437 "Rate limit cooldown expired, relay ready for new subscriptions"
438 );
439 }
440 }
441 }
442 }
443
444 recovered_relays
445 }
446
237 /// Check if a connection attempt should be made to a relay 447 /// Check if a connection attempt should be made to a relay
238 /// 448 ///
239 /// Returns true if: 449 /// Returns true if:
@@ -248,10 +458,16 @@ impl RelayHealthTracker {
248 Some(entry) => { 458 Some(entry) => {
249 let health = entry.value(); 459 let health = entry.value();
250 460
251 match health.state { 461 // Don't reconnect if currently rate-limited
252 HealthState::Healthy => true, 462 if health.is_rate_limited_now() {
253 HealthState::Degraded | HealthState::Dead => { 463 return false;
254 // Check if backoff period has elapsed 464 }
465
466 // Check state-based logic
467 match health.state() {
468 HealthState::Healthy | HealthState::Disconnected => true,
469 HealthState::Degraded | HealthState::Dead | HealthState::RateLimited => {
470 // Check if backoff/cooldown period has elapsed
255 match health.next_retry_at { 471 match health.next_retry_at {
256 None => true, 472 None => true,
257 Some(next_retry) => Instant::now() >= next_retry, 473 Some(next_retry) => Instant::now() >= next_retry,
@@ -266,7 +482,7 @@ impl RelayHealthTracker {
266 pub fn get_state(&self, relay_url: &str) -> HealthState { 482 pub fn get_state(&self, relay_url: &str) -> HealthState {
267 self.health 483 self.health
268 .get(relay_url) 484 .get(relay_url)
269 .map(|entry| entry.value().state) 485 .map(|entry| entry.value().state())
270 .unwrap_or(HealthState::Healthy) 486 .unwrap_or(HealthState::Healthy)
271 } 487 }
272 488
@@ -350,10 +566,12 @@ mod tests {
350 } 566 }
351 567
352 #[test] 568 #[test]
353 fn test_default_health_is_healthy() { 569 fn test_default_health_is_disconnected() {
354 let health = RelayHealth::default(); 570 let health = RelayHealth::default();
355 assert_eq!(health.state, HealthState::Healthy); 571 // Default state: not connected, no failures = Disconnected
572 assert_eq!(health.state(), HealthState::Disconnected);
356 assert_eq!(health.consecutive_failures, 0); 573 assert_eq!(health.consecutive_failures, 0);
574 assert!(!health.connected);
357 assert!(health.first_failure_time.is_none()); 575 assert!(health.first_failure_time.is_none());
358 } 576 }
359 577
@@ -504,7 +722,7 @@ mod tests {
504 722
505 assert!(health.is_some()); 723 assert!(health.is_some());
506 let health = health.unwrap(); 724 let health = health.unwrap();
507 assert_eq!(health.state, HealthState::Healthy); 725 assert_eq!(health.state(), HealthState::Healthy);
508 assert!(health.last_success_time.is_some()); 726 assert!(health.last_success_time.is_some());
509 } 727 }
510 728