upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2025-12-22 14:23:46 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2025-12-22 14:23:46 +0000
commit: 541f34a207047b26547154e7d631005d456f12fd (patch)
tree: 446cffc4b3bbc32bf61933b5ab41a044a35d6f3b /src/sync
parent: b10a6cc91dab4c3d83d62fe8cb357c78f2cd4d1e (diff)
sync: add req rate-limit detection and cooldown
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/health.rs 300
-rw-r--r-- src/sync/metrics.rs 16
-rw-r--r-- src/sync/mod.rs 215
-rw-r--r-- src/sync/relay_connection.rs 7
4 files changed, 459 insertions, 79 deletions
diff --git a/src/sync/health.rs b/src/sync/health.rs
index d919a80..a10427f 100644
--- a/src/sync/health.rs
+++ b/src/sync/health.rs
@@ -1,15 +1,17 @@
1//! Relay Health Tracking for GRASP-02 Proactive Sync 1//! Relay Health Tracking for GRASP-02 Proactive Sync
2//! 2//!
3//! This module implements health tracking for relay connections, including: 3//! This module implements health tracking for relay connections, including:
4//! - Health state machine (Healthy -> Degraded -> Dead) 4//! - Health state machine (Healthy -> Degraded -> Dead), plus Disconnected and RateLimited states
5//! - Exponential backoff with configurable max delay 5//! - Exponential backoff with configurable max delay
6//! - Dead relay detection after 24h of continuous failures 6//! - Dead relay detection after 24h of continuous failures
7//! - Rate limit detection and fixed cooldown period
7//! 8//!
8//! ## Health States 9//! ## Health States
9//! 10//!
10//! - **Healthy**: Working connection, no recent failures 11//! - **Healthy**: Working connection, no recent failures
11//! - **Degraded**: Connection failed, retrying with backoff 12//! - **Degraded**: Connection failed, retrying with backoff
12//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) 13//! - **Dead**: 24h+ of continuous failures, minimal retry (once per day)
14//! - **RateLimited**: NOTICE-triggered 65-second cooldown to avoid rate limits
13 15
14use std::sync::Arc; 16use std::sync::Arc;
15use std::time::{Duration, Instant}; 17use std::time::{Duration, Instant};
@@ -30,37 +32,52 @@ const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600;
30/// Default base backoff duration in seconds 32/// Default base backoff duration in seconds
31const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; 33const DEFAULT_BASE_BACKOFF_SECS: u64 = 5;
32 34
35/// Rate limit cooldown duration in seconds (65 seconds = typical 60s limit + buffer)
36const RATE_LIMIT_COOLDOWN_SECS: u64 = 65;
37
38/// Stability period after recovery before marking relay as fully healthy (5 minutes)
39/// A relay must maintain connection for this duration after failures before being marked Healthy
40const STABILITY_PERIOD_SECS: u64 = 300;
41
33/// Health state of a relay connection 42/// Health state of a relay connection
34#[derive(Debug, Clone, Copy, PartialEq, Eq)] 43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum HealthState { 44pub enum HealthState {
36 /// Working connection, no recent failures 45 /// Working connection, no recent failures, proven stable
37 Healthy, 46 Healthy,
38 /// Connection failed, retrying with exponential backoff 47 /// Not currently connected, but no recent failures or issues
48 Disconnected,
49 /// Connection problems: failing to connect OR recently recovered but not yet stable
39 Degraded, 50 Degraded,
40 /// 24h+ of continuous failures, minimal retry 51 /// 24h+ of continuous failures, minimal retry
41 Dead, 52 Dead,
53 /// Rate limited by relay, temporary cooldown active
54 RateLimited,
42} 55}
43 56
44impl std::fmt::Display for HealthState { 57impl std::fmt::Display for HealthState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self { 59 match self {
47 HealthState::Healthy => write!(f, "healthy"), 60 HealthState::Healthy => write!(f, "healthy"),
61 HealthState::Disconnected => write!(f, "disconnected"),
48 HealthState::Degraded => write!(f, "degraded"), 62 HealthState::Degraded => write!(f, "degraded"),
49 HealthState::Dead => write!(f, "dead"), 63 HealthState::Dead => write!(f, "dead"),
64 HealthState::RateLimited => write!(f, "rate_limited"),
50 } 65 }
51 } 66 }
52} 67}
53 68
54/// Health information for a single relay 69/// Health information for a single relay
55#[derive(Debug, Clone)] 70#[derive(Debug, Clone, Default)]
56pub struct RelayHealth { 71pub struct RelayHealth {
57 /// Current health state 72 /// Are we currently connected to this relay
58 pub state: HealthState, 73 pub connected: bool,
74 /// Has this relay sent us a rate-limiting NOTICE recently
75 pub rate_limited: bool,
59 /// Number of consecutive connection failures 76 /// Number of consecutive connection failures
60 pub consecutive_failures: u32, 77 pub consecutive_failures: u32,
61 /// Time of the first failure in the current failure streak 78 /// Time of the first failure in the current failure streak
62 pub first_failure_time: Option<Instant>, 79 pub first_failure_time: Option<Instant>,
63 /// Time of the last failure 80 /// Time of the last failure (kept after recovery for stability period tracking)
64 pub last_failure_time: Option<Instant>, 81 pub last_failure_time: Option<Instant>,
65 /// Time of the last successful connection 82 /// Time of the last successful connection
66 pub last_success_time: Option<Instant>, 83 pub last_success_time: Option<Instant>,
@@ -70,25 +87,122 @@ pub struct RelayHealth {
70 pub next_retry_at: Option<Instant>, 87 pub next_retry_at: Option<Instant>,
71} 88}
72 89
73impl Default for RelayHealth {
74 fn default() -> Self {
75 Self {
76 state: HealthState::Healthy,
77 consecutive_failures: 0,
78 first_failure_time: None,
79 last_failure_time: None,
80 last_success_time: None,
81 last_attempt_time: None,
82 next_retry_at: None,
83 }
84 }
85}
86
87impl RelayHealth { 90impl RelayHealth {
88 /// Create a new RelayHealth with healthy state 91 /// Create a new RelayHealth with default values
89 pub fn new() -> Self { 92 pub fn new() -> Self {
90 Self::default() 93 Self::default()
91 } 94 }
95
96 /// Get the current health state based on the relay's properties
97 ///
98 /// State is computed dynamically from:
99 /// - Rate limit status
100 /// - Connection status
101 /// - Failure history and timing
102 /// - Stability period after recovery
103 ///
104 /// ## State Logic
105 ///
106 /// 1. **RateLimited**: If rate_limited flag is set and cooldown hasn't expired
107 /// 2. **Dead**: 24+ hours of continuous failures
108 /// 3. **Degraded**: Active connection failures OR in stability period after recovery
109 /// 4. **Disconnected**: Not connected, but no recent failures or issues
110 /// 5. **Healthy**: Connected and stable (past stability period with no failures)
111 pub fn state(&self) -> HealthState {
112 let now = Instant::now();
113
114 // Check rate limiting first (highest priority)
115 if self.rate_limited {
116 if let Some(next_retry) = self.next_retry_at {
117 if now < next_retry {
118 return HealthState::RateLimited;
119 }
120 }
121 }
122
123 // Check for dead state (24+ hours of failures)
124 if let Some(first_failure) = self.first_failure_time {
125 let failure_duration = now.duration_since(first_failure);
126 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
127 if failure_duration >= dead_threshold {
128 return HealthState::Dead;
129 }
130 }
131
132 // Check if we have active failures (currently failing to connect)
133 if self.consecutive_failures > 0 {
134 return HealthState::Degraded;
135 }
136
137 // Check if we're in stability period after recovery
138 // (recovered from failures but not yet proven stable)
139 if let (Some(last_success), Some(last_failure)) = (self.last_success_time, self.last_failure_time) {
140 // Only consider stability period if recovery happened after the last failure
141 if last_success > last_failure {
142 let time_since_recovery = now.duration_since(last_success);
143 let stability_period = Duration::from_secs(STABILITY_PERIOD_SECS);
144
145 if time_since_recovery < stability_period {
146 // Still in stability period - remain degraded to prove stability
147 return HealthState::Degraded;
148 }
149 }
150 }
151
152 // Check connection status for final state
153 if self.connected {
154 // Connected and stable (no failures, past stability period)
155 HealthState::Healthy
156 } else {
157 // Not connected, but no recent failures - just disconnected
158 HealthState::Disconnected
159 }
160 }
161
162 /// Check if the relay is currently connected
163 pub fn is_connected(&self) -> bool {
164 self.connected
165 }
166
167 /// Check if the relay is currently rate limited (cooldown active)
168 pub fn is_rate_limited_now(&self) -> bool {
169 if !self.rate_limited {
170 return false;
171 }
172 if let Some(next_retry) = self.next_retry_at {
173 Instant::now() < next_retry
174 } else {
175 false
176 }
177 }
178
179 /// Get the consecutive failure count
180 pub fn failure_count(&self) -> u32 {
181 self.consecutive_failures
182 }
183
184 /// Get time since last successful connection
185 pub fn time_since_last_success(&self) -> Option<Duration> {
186 self.last_success_time
187 .map(|t| Instant::now().duration_since(t))
188 }
189
190 /// Get time since first failure in current streak
191 pub fn time_since_first_failure(&self) -> Option<Duration> {
192 self.first_failure_time
193 .map(|t| Instant::now().duration_since(t))
194 }
195
196 /// Get remaining backoff/cooldown duration
197 pub fn remaining_backoff(&self) -> Option<Duration> {
198 let next_retry = self.next_retry_at?;
199 let now = Instant::now();
200 if now >= next_retry {
201 None
202 } else {
203 Some(next_retry - now)
204 }
205 }
92} 206}
93 207
94/// Thread-safe relay health tracker using DashMap 208/// Thread-safe relay health tracker using DashMap
@@ -148,16 +262,17 @@ impl RelayHealthTracker {
148 262
149 /// Record a successful connection to a relay 263 /// Record a successful connection to a relay
150 /// 264 ///
151 /// Resets the relay to Healthy state and clears failure counters. 265 /// Clears failure counters and rate limiting. Sets connected = true.
152 pub fn record_success(&self, relay_url: &str) { 266 pub fn record_success(&self, relay_url: &str) {
153 let now = Instant::now(); 267 let now = Instant::now();
154 let mut entry = self.health.entry(relay_url.to_string()).or_default(); 268 let mut entry = self.health.entry(relay_url.to_string()).or_default();
155 let health = entry.value_mut(); 269 let health = entry.value_mut();
156 270
157 let old_state = health.state; 271 let old_state = health.state();
158 272
159 // Reset to healthy state 273 // Reset to healthy state
160 health.state = HealthState::Healthy; 274 health.connected = true;
275 health.rate_limited = false;
161 health.consecutive_failures = 0; 276 health.consecutive_failures = 0;
162 health.first_failure_time = None; 277 health.first_failure_time = None;
163 health.last_failure_time = None; 278 health.last_failure_time = None;
@@ -176,13 +291,17 @@ impl RelayHealthTracker {
176 291
177 /// Record a connection failure for a relay 292 /// Record a connection failure for a relay
178 /// 293 ///
179 /// Increments failure counter, updates state, and calculates next retry time. 294 /// Increments failure counter and calculates next retry time with exponential backoff.
295 /// Sets connected = false.
180 pub fn record_failure(&self, relay_url: &str) { 296 pub fn record_failure(&self, relay_url: &str) {
181 let now = Instant::now(); 297 let now = Instant::now();
182 let mut entry = self.health.entry(relay_url.to_string()).or_default(); 298 let mut entry = self.health.entry(relay_url.to_string()).or_default();
183 let health = entry.value_mut(); 299 let health = entry.value_mut();
184 300
185 let old_state = health.state; 301 let old_state = health.state();
302
303 // Mark as disconnected
304 health.connected = false;
186 305
187 // Set first_failure_time if this is a new failure streak 306 // Set first_failure_time if this is a new failure streak
188 if health.first_failure_time.is_none() { 307 if health.first_failure_time.is_none() {
@@ -192,18 +311,18 @@ impl RelayHealthTracker {
192 health.consecutive_failures = health.consecutive_failures.saturating_add(1); 311 health.consecutive_failures = health.consecutive_failures.saturating_add(1);
193 health.last_failure_time = Some(now); 312 health.last_failure_time = Some(now);
194 313
195 // Check if we should transition to Dead state 314 // Calculate backoff based on whether we're dead or degraded
196 if let Some(first_failure) = health.first_failure_time { 315 if let Some(first_failure) = health.first_failure_time {
197 let failure_duration = now.duration_since(first_failure); 316 let failure_duration = now.duration_since(first_failure);
198 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); 317 let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600);
199 318
200 if failure_duration >= dead_threshold { 319 if failure_duration >= dead_threshold {
201 health.state = HealthState::Dead;
202 // Dead relays retry once per day 320 // Dead relays retry once per day
203 health.next_retry_at = 321 health.next_retry_at =
204 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); 322 Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600));
205 323
206 if old_state != HealthState::Dead { 324 let new_state = health.state();
325 if old_state != HealthState::Dead && new_state == HealthState::Dead {
207 tracing::warn!( 326 tracing::warn!(
208 "Relay {} marked dead after 24h failures ({} consecutive failures)", 327 "Relay {} marked dead after 24h failures ({} consecutive failures)",
209 relay_url, 328 relay_url,
@@ -212,15 +331,21 @@ impl RelayHealthTracker {
212 } 331 }
213 } else { 332 } else {
214 // Degraded state with exponential backoff 333 // Degraded state with exponential backoff
215 health.state = HealthState::Degraded;
216 let backoff = Self::get_backoff_duration( 334 let backoff = Self::get_backoff_duration(
217 health.consecutive_failures, 335 health.consecutive_failures,
218 self.base_backoff_secs, 336 self.base_backoff_secs,
219 self.max_backoff_secs, 337 self.max_backoff_secs,
220 ); 338 );
221 health.next_retry_at = Some(now + backoff); 339 // Respect existing next_retry_at if it's later (e.g., from rate limiting)
340 let new_retry_at = now + backoff;
341 health.next_retry_at = Some(
342 health.next_retry_at
343 .unwrap_or(new_retry_at)
344 .max(new_retry_at)
345 );
222 346
223 if old_state != HealthState::Degraded { 347 let new_state = health.state();
348 if old_state != HealthState::Degraded && new_state == HealthState::Degraded {
224 tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); 349 tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff);
225 } else { 350 } else {
226 tracing::debug!( 351 tracing::debug!(
@@ -234,6 +359,91 @@ impl RelayHealthTracker {
234 } 359 }
235 } 360 }
236 361
362 /// Record a rate limit NOTICE from a relay
363 ///
364 /// Sets the relay to RateLimited state with a fixed 65-second cooldown.
365 /// This is distinct from connection failures (Degraded state) - it's triggered
366 /// by NOTICE messages from the relay indicating we're sending too many requests.
367 pub fn record_rate_limit(&self, relay_url: &str) {
368 let now = Instant::now();
369 let mut entry = self.health.entry(relay_url.to_string()).or_default();
370 let health = entry.value_mut();
371
372 health.rate_limited = true;
373 health.next_retry_at = Some(now + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS));
374
375 tracing::warn!(
376 relay = %relay_url,
377 cooldown_secs = RATE_LIMIT_COOLDOWN_SECS,
378 "Relay rate limited, pausing new subscriptions"
379 );
380 }
381
382 /// Clear rate limiting state for a specific relay
383 ///
384 /// This only clears the rate_limited flag, without affecting connection status
385 /// or failure counters. Use this when rate limit cooldown has expired and we
386 /// want to allow new subscriptions.
387 ///
388 /// This is different from `record_success()` which resets all health state.
389 pub fn clear_rate_limit(&self, relay_url: &str) {
390 if let Some(mut entry) = self.health.get_mut(relay_url) {
391 let health = entry.value_mut();
392 health.rate_limited = false;
393 }
394 }
395
396
397 /// Check if relay is currently rate limited
398 ///
399 /// Returns true if the relay is in RateLimited state and the cooldown period
400 /// has not yet expired. Once the cooldown expires, this returns false and the
401 /// relay can accept new subscriptions again.
402 pub fn is_rate_limited(&self, relay_url: &str) -> bool {
403 if let Some(entry) = self.health.get(relay_url) {
404 let health = entry.value();
405 health.rate_limited
406 } else {
407 false
408 }
409 }
410
411 /// Exit rate limiting state for relays whose cooldown has expired
412 ///
413 /// Finds all relays that are currently rate limited but whose cooldown period
414 /// has expired, clears their rate_limited flag, and returns their URLs.
415 ///
416 /// This method mutates state by clearing the rate_limited flag for recovered relays.
417 ///
418 /// Returns a vector of relay URLs that were recovered from rate limiting.
419 pub fn exit_expired_rate_limits(&self) -> Vec<String> {
420 let now = Instant::now();
421 let mut recovered_relays = Vec::new();
422
423 for mut entry in self.health.iter_mut() {
424 let (url, health) = entry.pair_mut();
425
426 // Check if rate limited and cooldown has expired
427 if health.rate_limited {
428 if let Some(next_retry) = health.next_retry_at {
429 if now > next_retry {
430 // Cooldown expired - clear rate limiting
431 health.rate_limited = false;
432 health.next_retry_at = None;
433 recovered_relays.push(url.clone());
434
435 tracing::info!(
436 relay = %url,
437 "Rate limit cooldown expired, relay ready for new subscriptions"
438 );
439 }
440 }
441 }
442 }
443
444 recovered_relays
445 }
446
237 /// Check if a connection attempt should be made to a relay 447 /// Check if a connection attempt should be made to a relay
238 /// 448 ///
239 /// Returns true if: 449 /// Returns true if:
@@ -248,10 +458,16 @@ impl RelayHealthTracker {
248 Some(entry) => { 458 Some(entry) => {
249 let health = entry.value(); 459 let health = entry.value();
250 460
251 match health.state { 461 // Don't reconnect if currently rate-limited
252 HealthState::Healthy => true, 462 if health.is_rate_limited_now() {
253 HealthState::Degraded | HealthState::Dead => { 463 return false;
254 // Check if backoff period has elapsed 464 }
465
466 // Check state-based logic
467 match health.state() {
468 HealthState::Healthy | HealthState::Disconnected => true,
469 HealthState::Degraded | HealthState::Dead | HealthState::RateLimited => {
470 // Check if backoff/cooldown period has elapsed
255 match health.next_retry_at { 471 match health.next_retry_at {
256 None => true, 472 None => true,
257 Some(next_retry) => Instant::now() >= next_retry, 473 Some(next_retry) => Instant::now() >= next_retry,
@@ -266,7 +482,7 @@ impl RelayHealthTracker {
266 pub fn get_state(&self, relay_url: &str) -> HealthState { 482 pub fn get_state(&self, relay_url: &str) -> HealthState {
267 self.health 483 self.health
268 .get(relay_url) 484 .get(relay_url)
269 .map(|entry| entry.value().state) 485 .map(|entry| entry.value().state())
270 .unwrap_or(HealthState::Healthy) 486 .unwrap_or(HealthState::Healthy)
271 } 487 }
272 488
@@ -350,10 +566,12 @@ mod tests {
350 } 566 }
351 567
352 #[test] 568 #[test]
353 fn test_default_health_is_healthy() { 569 fn test_default_health_is_disconnected() {
354 let health = RelayHealth::default(); 570 let health = RelayHealth::default();
355 assert_eq!(health.state, HealthState::Healthy); 571 // Default state: not connected, no failures = Disconnected
572 assert_eq!(health.state(), HealthState::Disconnected);
356 assert_eq!(health.consecutive_failures, 0); 573 assert_eq!(health.consecutive_failures, 0);
574 assert!(!health.connected);
357 assert!(health.first_failure_time.is_none()); 575 assert!(health.first_failure_time.is_none());
358 } 576 }
359 577
@@ -504,7 +722,7 @@ mod tests {
504 722
505 assert!(health.is_some()); 723 assert!(health.is_some());
506 let health = health.unwrap(); 724 let health = health.unwrap();
507 assert_eq!(health.state, HealthState::Healthy); 725 assert_eq!(health.state(), HealthState::Healthy);
508 assert!(health.last_success_time.is_some()); 726 assert!(health.last_success_time.is_some());
509 } 727 }
510 728
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index 22c9192..453a79c 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -72,7 +72,7 @@ impl SyncMetrics {
72 let relay_status = IntGaugeVec::new( 72 let relay_status = IntGaugeVec::new(
73 Opts::new( 73 Opts::new(
74 "ngit_sync_relay_status", 74 "ngit_sync_relay_status",
75 "Relay health status (1=healthy, 2=degraded, 3=dead)", 75 "Relay health status (1=healthy, 2=disconnected, 3=degraded, 4=dead, 5=rate_limited)",
76 ), 76 ),
77 &["relay"], 77 &["relay"],
78 )?; 78 )?;
@@ -178,9 +178,11 @@ impl SyncMetrics {
178 /// Record relay health state change. 178 /// Record relay health state change.
179 /// 179 ///
180 /// Maps health states to numeric values for Prometheus: 180 /// Maps health states to numeric values for Prometheus:
181 /// - Healthy = 1 181 /// - Healthy = 1 (connected and stable)
182 /// - Degraded = 2 182 /// - Disconnected = 2 (not connected, but no issues)
183 /// - Dead = 3 183 /// - Degraded = 3 (connection problems or unstable after recovery)
184 /// - Dead = 4 (24h+ of failures)
185 /// - RateLimited = 5 (rate limit cooldown active)
184 /// 186 ///
185 /// # Arguments 187 /// # Arguments
186 /// 188 ///
@@ -189,8 +191,10 @@ impl SyncMetrics {
189 pub fn record_health_state(&self, relay: &str, state: HealthState) { 191 pub fn record_health_state(&self, relay: &str, state: HealthState) {
190 let state_value = match state { 192 let state_value = match state {
191 HealthState::Healthy => 1, 193 HealthState::Healthy => 1,
192 HealthState::Degraded => 2, 194 HealthState::Disconnected => 2,
193 HealthState::Dead => 3, 195 HealthState::Degraded => 3,
196 HealthState::Dead => 4,
197 HealthState::RateLimited => 5,
194 }; 198 };
195 self.relay_status 199 self.relay_status
196 .with_label_values(&[relay]) 200 .with_label_values(&[relay])
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6f59b19..1f95ff7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -33,6 +33,7 @@ pub use self_subscriber::SelfSubscriber;
33 33
34// Re-export health tracking types 34// Re-export health tracking types
35pub use health::RelayHealthTracker; 35pub use health::RelayHealthTracker;
36use tokio::time::sleep;
36 37
37use std::collections::{HashMap, HashSet}; 38use std::collections::{HashMap, HashSet};
38use std::sync::Arc; 39use std::sync::Arc;
@@ -303,42 +304,59 @@ async fn run_daily_timer(
303 } 304 }
304} 305}
305 306
306// ============================================================================= 307// Combined Health and Metrics Checker
307// Disconnect Checker
308// =============================================================================
309 308
310/// Run the disconnect checker for periodic cleanup of empty relays 309/// Run the combined health and metrics checker
311/// 310///
312/// This function runs in a loop, checking at the configured interval for relays 311/// This function runs in a loop with a 2-second interval, performing three tasks:
313/// that have no repos or root events to sync. Non-bootstrap relays 312/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones
314/// that are empty will be disconnected to free up resources. 313/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired
314/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker
315/// 315///
316/// Bootstrap relays are never disconnected, even if empty. 316/// The metrics update ensures that health states are kept current in metrics even when
317/// they change due to timeouts, cooldowns expiring, or stability periods completing.
317/// 318///
318/// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` 319/// The 2-second interval provides a good balance between responsiveness and overhead.
319/// (default: 60 seconds). Set to a lower value for faster reconnection testing. 320/// While disconnect checking traditionally ran at 60s intervals, the faster cadence here
320async fn run_disconnect_checker( 321/// is acceptable since the operations are lightweight (just index checks, no I/O).
322async fn run_health_and_metrics_checker(
321 sync_manager: Arc<Mutex<SyncManager>>, 323 sync_manager: Arc<Mutex<SyncManager>>,
322 mut shutdown_rx: broadcast::Receiver<()>, 324 mut shutdown_rx: broadcast::Receiver<()>,
323 check_interval_secs: u64,
324) { 325) {
325 let interval = Duration::from_secs(check_interval_secs); 326 let interval = Duration::from_secs(2);
326 tracing::info!( 327 tracing::info!("Health and metrics checker started with 2s interval");
327 interval_secs = check_interval_secs,
328 "Disconnect checker started with configured interval"
329 );
330 328
331 loop { 329 loop {
332 tokio::select! { 330 tokio::select! {
333 _ = tokio::time::sleep(interval) => { 331 _ = tokio::time::sleep(interval) => {
334 tracing::debug!("Disconnect checker running"); 332 tracing::debug!("Health and metrics checker running");
335 333
336 let mut manager = sync_manager.lock().await; 334 let mut manager = sync_manager.lock().await;
335
336 // 1. Check for disconnects and retry disconnected relays
337 manager.check_disconnects().await; 337 manager.check_disconnects().await;
338 manager.retry_disconnected_relays().await; 338 manager.retry_disconnected_relays().await;
339
340 // 2. Check for rate limit recovery
341 manager.check_rate_limit_recovery().await;
342
343 // 3. Update metrics with current health states
344 if let Some(ref metrics) = manager.metrics {
345 // Get all tracked relay URLs
346 let relay_urls: Vec<String> = {
347 let index = manager.relay_sync_index.read().await;
348 index.keys().cloned().collect()
349 };
350
351 // Update health state for each relay
352 for relay_url in relay_urls {
353 let state = manager.health_tracker.get_state(&relay_url);
354 metrics.record_health_state(&relay_url, state);
355 }
356 }
339 } 357 }
340 _ = shutdown_rx.recv() => { 358 _ = shutdown_rx.recv() => {
341 tracing::info!("Disconnect checker received shutdown signal"); 359 tracing::info!("Health and metrics checker received shutdown signal");
342 break; 360 break;
343 } 361 }
344 } 362 }
@@ -510,6 +528,45 @@ impl SyncManager {
510 // Drop the lock before async operations 528 // Drop the lock before async operations
511 drop(pending); 529 drop(pending);
512 530
531 // Wait for rate limiting to clear before pagination continues
532 if self.health_tracker.is_rate_limited(relay_url) {
533 tracing::debug!(
534 relay = %relay_url,
535 batch_id = batch_id,
536 "Relay is rate limited, waiting before pagination"
537 );
538
539 // Loop until rate limit clears, sleeping with jitter between checks
540 while self.health_tracker.is_rate_limited(relay_url) {
541 let jitter_secs = 1 + (rand::random::<u64>() % 5); // 1-5 seconds
542 sleep(Duration::from_secs(jitter_secs)).await;
543 }
544
545 tracing::debug!(
546 relay = %relay_url,
547 batch_id = batch_id,
548 "Rate limit cleared, continuing pagination"
549 );
550 let batch_exists = {
551 let pending = self.pending_sync_index.read().await;
552 pending
553 .get(&relay_url_for_pagination)
554 .map(|batches| batches.iter().any(|b| b.batch_id == batch_id))
555 .unwrap_or(false)
556 };
557
558 // If we were rate limited, verify batch still exists after waiting
559 // (batches are wiped during disconnect, so avoid orphaned pagination)
560 if !batch_exists {
561 tracing::debug!(
562 relay = %relay_url_for_pagination,
563 batch_id = batch_id,
564 "Batch no longer exists after rate limit wait, skipping pagination"
565 );
566 return;
567 }
568 }
569
513 // Subscribe to next page and add to outstanding_subs 570 // Subscribe to next page and add to outstanding_subs
514 if let Some(conn) = self.connections.get(&relay_url_for_pagination) { 571 if let Some(conn) = self.connections.get(&relay_url_for_pagination) {
515 match conn.subscribe_filter(next_filter.clone(), true).await { 572 match conn.subscribe_filter(next_filter.clone(), true).await {
@@ -752,29 +809,22 @@ impl SyncManager {
752 self.try_connect_relay(bootstrap_url).await; 809 self.try_connect_relay(bootstrap_url).await;
753 } 810 }
754 811
755 // 7. Capture config values before moving self into Arc 812 // 7. Wrap self in Arc<Mutex> for sharing with timer task
756 let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs;
757
758 // 8. Wrap self in Arc<Mutex> for sharing with timer task
759 let sync_manager = Arc::new(Mutex::new(self)); 813 let sync_manager = Arc::new(Mutex::new(self));
760 814
761 // 9. Spawn daily timer task with shutdown receiver 815 // 8. Spawn daily timer task with shutdown receiver
762 let timer_manager = Arc::clone(&sync_manager); 816 let timer_manager = Arc::clone(&sync_manager);
763 let timer_shutdown = shutdown_tx.subscribe(); 817 let timer_shutdown = shutdown_tx.subscribe();
764 tokio::spawn(async move { 818 tokio::spawn(async move {
765 run_daily_timer(timer_manager, timer_shutdown).await; 819 run_daily_timer(timer_manager, timer_shutdown).await;
766 }); 820 });
767 821
768 // 10. Spawn disconnect checker task with shutdown receiver 822 // 9. Spawn health and metrics checker task with shutdown receiver
823 // This combines disconnect checking, rate limit recovery, and metrics updates
769 let checker_manager = Arc::clone(&sync_manager); 824 let checker_manager = Arc::clone(&sync_manager);
770 let checker_shutdown = shutdown_tx.subscribe(); 825 let checker_shutdown = shutdown_tx.subscribe();
771 tokio::spawn(async move { 826 tokio::spawn(async move {
772 run_disconnect_checker( 827 run_health_and_metrics_checker(checker_manager, checker_shutdown).await;
773 checker_manager,
774 checker_shutdown,
775 disconnect_check_interval_secs,
776 )
777 .await;
778 }); 828 });
779 829
780 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 830 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -876,7 +926,18 @@ impl SyncManager {
876 } 926 }
877 } 927 }
878 928
879 // Step 2: Check if consolidation is needed BEFORE adding new filters 929 // Step 2: Check if relay is rate-limited before creating new pending items
930 if self.health_tracker.is_rate_limited(&action.relay_url) {
931 tracing::debug!(
932 relay = %action.relay_url,
933 repos = action.items.repos.len(),
934 root_events = action.items.root_events.len(),
935 "Skipping AddFilters for rate-limited relay, will recompute after cooldown"
936 );
937 return;
938 }
939
940 // Step 3: Check if consolidation is needed BEFORE adding new filters
880 self.maybe_consolidate(&action.relay_url, action.filters.len()) 941 self.maybe_consolidate(&action.relay_url, action.filters.len())
881 .await; 942 .await;
882 943
@@ -954,6 +1015,7 @@ impl SyncManager {
954 let eose_tx = self.eose_tx.as_ref().unwrap().clone(); 1015 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
955 let metrics_clone = self.metrics.clone(); 1016 let metrics_clone = self.metrics.clone();
956 let pending_sync_index = Arc::clone(&self.pending_sync_index); 1017 let pending_sync_index = Arc::clone(&self.pending_sync_index);
1018 let health_tracker = Arc::clone(&self.health_tracker);
957 1019
958 tokio::spawn(async move { 1020 tokio::spawn(async move {
959 let mut disconnect_sent = false; 1021 let mut disconnect_sent = false;
@@ -1011,6 +1073,38 @@ impl SyncManager {
1011 }) 1073 })
1012 .await; 1074 .await;
1013 } 1075 }
1076 RelayEvent::Notice(notice) => {
1077 // Check for rate limiting indicators
1078 let notice_lower = notice.to_lowercase();
1079 let is_rate_limit = (notice_lower.contains("rate")
1080 && notice_lower.contains("limit"))
1081 || notice_lower.contains("too many")
1082 || notice_lower.contains("slow down")
1083 || notice_lower.contains("throttl");
1084
1085 if is_rate_limit {
1086 tracing::warn!(
1087 relay = %relay_url_clone,
1088 notice = %notice,
1089 "Rate limiting NOTICE detected from relay"
1090 );
1091
1092 // Mark relay as rate limited
1093 health_tracker.record_rate_limit(&relay_url_clone);
1094
1095 // Update metrics with new health state
1096 if let Some(ref metrics) = metrics_clone {
1097 let state = health_tracker.get_state(&relay_url_clone);
1098 metrics.record_health_state(&relay_url_clone, state);
1099 }
1100 } else {
1101 tracing::debug!(
1102 relay = %relay_url_clone,
1103 notice = %notice,
1104 "Relay issued notice"
1105 );
1106 }
1107 }
1014 RelayEvent::Closed(reason) => { 1108 RelayEvent::Closed(reason) => {
1015 // CLOSED message means one subscription was closed, not the whole connection 1109 // CLOSED message means one subscription was closed, not the whole connection
1016 // This is normal behavior (e.g., when historic_sync completes) 1110 // This is normal behavior (e.g., when historic_sync completes)
@@ -1901,6 +1995,63 @@ impl SyncManager {
1901 } 1995 }
1902 } 1996 }
1903 1997
1998 /// Recover relays whose rate-limit cooldown has expired and resubmit their sync work
1999 ///
2000 /// Meant to be polled periodically by a background checker; the original note names `run_rate_limit_checker` (every 1 second). NOTE(review): the checker task spawned by the sync manager is `run_health_and_metrics_checker` - confirm the caller name and interval.
2001 /// For each relay in RateLimited state that has exceeded the 65-second cooldown:
2002 /// 1. Clears the rate limit state (sets to Healthy)
2003 /// 2. Recomputes required actions for that relay
2004 /// 3. Submits those actions through `handle_new_sync_filters`
2005 async fn check_rate_limit_recovery(&mut self) {
2006 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2007
2008 // Ask the health tracker to exit rate limiting for relays whose cooldown has expired; it returns their URLs
2009 let relays_to_recover: Vec<String> = self.health_tracker.exit_expired_rate_limits();
2010
2011 if relays_to_recover.is_empty() {
2012 return;
2013 }
2014
2015 // Recompute targets for every relay (the read guard is dropped right after) - could optimise by adding relays: Option<&[]> to derive_relay_targets
2016 let repo_index = self.repo_sync_index.read().await;
2017 let targets = derive_relay_targets(&repo_index);
2018 drop(repo_index);
2019
2020 for relay_url in relays_to_recover {
2021 tracing::info!(
2022 relay = %relay_url,
2023 "Rate limit cooldown expired, recovering"
2024 );
2025
2026 // Clear rate limit state before submitting, since handle_new_sync_filters is invoked below for this relay
2027 self.health_tracker.clear_rate_limit(&relay_url);
2028
2029 // Only compute actions for this specific relay; relays absent from the targets map are skipped
2030 if let Some(relay_needs) = targets.get(&relay_url) {
2031 let mut single_relay_targets = std::collections::HashMap::new();
2032 single_relay_targets.insert(relay_url.clone(), relay_needs.clone());
2033
2034 let pending = self.pending_sync_index.read().await;
2035 let confirmed = self.relay_sync_index.read().await;
2036
2037 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
2038 drop(pending);
2039 drop(confirmed);
2040
2041 // Submit each action (both index read guards were dropped above)
2042 for action in actions {
2043 tracing::info!(
2044 relay = %action.relay_url,
2045 repo_count = action.items.repos.len(),
2046 event_count = action.items.root_events.len(),
2047 "Submitting recovered actions after rate limit"
2048 );
2049 self.handle_new_sync_filters(action).await;
2050 }
2051 }
2052 }
2053 }
2054
1904 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex 2055 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex
1905 /// 2056 ///
1906 /// This method applies limit(0) to all filters to receive ONLY new events. 2057 /// This method applies limit(0) to all filters to receive ONLY new events.
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 5a61777..d69e1ce 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -27,6 +27,8 @@ pub enum RelayEvent {
27 Event(Event, SubscriptionId), 27 Event(Event, SubscriptionId),
28 /// End of stored events for a subscription 28 /// End of stored events for a subscription
29 EndOfStoredEvents(SubscriptionId), 29 EndOfStoredEvents(SubscriptionId),
30 /// NOTICE message from relay
31 Notice(String),
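30 /// Relay sent CLOSED for a subscription (carries the relay's message); not a connection close 32 /// Relay sent CLOSED for a subscription (carries the relay's message); not a connection close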
31 Closed(String), 33 Closed(String),
32 /// Shutdown notification 34 /// Shutdown notification
@@ -238,6 +240,11 @@ impl RelayConnection {
238 break; 240 break;
239 } 241 }
240 } 242 }
243 RelayMessage::Notice(msg) => {
244 tracing::debug!(relay = %url, message = %msg, "Received NOTICE");
245 let _ = event_sender.send(RelayEvent::Notice(msg.to_string())).await;
246 // Don't break - continue processing events
247 }
241 RelayMessage::Closed { message: msg, .. } => { 248 RelayMessage::Closed { message: msg, .. } => {
242 tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); 249 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
243 let _ = 250 let _ =