diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:23:46 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:23:46 +0000 |
| commit | 541f34a207047b26547154e7d631005d456f12fd (patch) | |
| tree | 446cffc4b3bbc32bf61933b5ab41a044a35d6f3b /src/sync | |
| parent | b10a6cc91dab4c3d83d62fe8cb357c78f2cd4d1e (diff) | |
sync: add req rate-limit detection and cooldown
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/health.rs | 300 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 16 | ||||
| -rw-r--r-- | src/sync/mod.rs | 215 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 7 |
4 files changed, 459 insertions, 79 deletions
diff --git a/src/sync/health.rs b/src/sync/health.rs index d919a80..a10427f 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs | |||
| @@ -1,15 +1,17 @@ | |||
| 1 | //! Relay Health Tracking for GRASP-02 Proactive Sync | 1 | //! Relay Health Tracking for GRASP-02 Proactive Sync |
| 2 | //! | 2 | //! |
| 3 | //! This module implements health tracking for relay connections, including: | 3 | //! This module implements health tracking for relay connections, including: |
| 4 | //! - Health state machine (Healthy -> Degraded -> Dead) | 4 | //! - Health state machine (Healthy -> Degraded -> Dead -> RateLimited) |
| 5 | //! - Exponential backoff with configurable max delay | 5 | //! - Exponential backoff with configurable max delay |
| 6 | //! - Dead relay detection after 24h of continuous failures | 6 | //! - Dead relay detection after 24h of continuous failures |
| 7 | //! - Rate limit detection and fixed cooldown period | ||
| 7 | //! | 8 | //! |
| 8 | //! ## Health States | 9 | //! ## Health States |
| 9 | //! | 10 | //! |
| 10 | //! - **Healthy**: Working connection, no recent failures | 11 | //! - **Healthy**: Working connection, no recent failures |
| 11 | //! - **Degraded**: Connection failed, retrying with backoff | 12 | //! - **Degraded**: Connection failed, retrying with backoff |
| 12 | //! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) | 13 | //! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) |
| 14 | //! - **RateLimited**: NOTICE-triggered 65-second cooldown to avoid rate limits | ||
| 13 | 15 | ||
| 14 | use std::sync::Arc; | 16 | use std::sync::Arc; |
| 15 | use std::time::{Duration, Instant}; | 17 | use std::time::{Duration, Instant}; |
| @@ -30,37 +32,52 @@ const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; | |||
| 30 | /// Default base backoff duration in seconds | 32 | /// Default base backoff duration in seconds |
| 31 | const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; | 33 | const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; |
| 32 | 34 | ||
| 35 | /// Rate limit cooldown duration in seconds (65 seconds = typical 60s limit + buffer) | ||
| 36 | const RATE_LIMIT_COOLDOWN_SECS: u64 = 65; | ||
| 37 | |||
| 38 | /// Stability period after recovery before marking relay as fully healthy (5 minutes) | ||
| 39 | /// A relay must maintain connection for this duration after failures before being marked Healthy | ||
| 40 | const STABILITY_PERIOD_SECS: u64 = 300; | ||
| 41 | |||
| 33 | /// Health state of a relay connection | 42 | /// Health state of a relay connection |
| 34 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 43 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
| 35 | pub enum HealthState { | 44 | pub enum HealthState { |
| 36 | /// Working connection, no recent failures | 45 | /// Working connection, no recent failures, proven stable |
| 37 | Healthy, | 46 | Healthy, |
| 38 | /// Connection failed, retrying with exponential backoff | 47 | /// Not currently connected, but no recent failures or issues |
| 48 | Disconnected, | ||
| 49 | /// Connection problems: failing to connect OR recently recovered but not yet stable | ||
| 39 | Degraded, | 50 | Degraded, |
| 40 | /// 24h+ of continuous failures, minimal retry | 51 | /// 24h+ of continuous failures, minimal retry |
| 41 | Dead, | 52 | Dead, |
| 53 | /// Rate limited by relay, temporary cooldown active | ||
| 54 | RateLimited, | ||
| 42 | } | 55 | } |
| 43 | 56 | ||
| 44 | impl std::fmt::Display for HealthState { | 57 | impl std::fmt::Display for HealthState { |
| 45 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | 58 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 46 | match self { | 59 | match self { |
| 47 | HealthState::Healthy => write!(f, "healthy"), | 60 | HealthState::Healthy => write!(f, "healthy"), |
| 61 | HealthState::Disconnected => write!(f, "disconnected"), | ||
| 48 | HealthState::Degraded => write!(f, "degraded"), | 62 | HealthState::Degraded => write!(f, "degraded"), |
| 49 | HealthState::Dead => write!(f, "dead"), | 63 | HealthState::Dead => write!(f, "dead"), |
| 64 | HealthState::RateLimited => write!(f, "rate_limited"), | ||
| 50 | } | 65 | } |
| 51 | } | 66 | } |
| 52 | } | 67 | } |
| 53 | 68 | ||
| 54 | /// Health information for a single relay | 69 | /// Health information for a single relay |
| 55 | #[derive(Debug, Clone)] | 70 | #[derive(Debug, Clone, Default)] |
| 56 | pub struct RelayHealth { | 71 | pub struct RelayHealth { |
| 57 | /// Current health state | 72 | /// Are we currently connected to this relay |
| 58 | pub state: HealthState, | 73 | pub connected: bool, |
| 74 | /// Has this relay sent us a rate-limiting NOTICE recently | ||
| 75 | pub rate_limited: bool, | ||
| 59 | /// Number of consecutive connection failures | 76 | /// Number of consecutive connection failures |
| 60 | pub consecutive_failures: u32, | 77 | pub consecutive_failures: u32, |
| 61 | /// Time of the first failure in the current failure streak | 78 | /// Time of the first failure in the current failure streak |
| 62 | pub first_failure_time: Option<Instant>, | 79 | pub first_failure_time: Option<Instant>, |
| 63 | /// Time of the last failure | 80 | /// Time of the last failure (kept after recovery for stability period tracking) |
| 64 | pub last_failure_time: Option<Instant>, | 81 | pub last_failure_time: Option<Instant>, |
| 65 | /// Time of the last successful connection | 82 | /// Time of the last successful connection |
| 66 | pub last_success_time: Option<Instant>, | 83 | pub last_success_time: Option<Instant>, |
| @@ -70,25 +87,122 @@ pub struct RelayHealth { | |||
| 70 | pub next_retry_at: Option<Instant>, | 87 | pub next_retry_at: Option<Instant>, |
| 71 | } | 88 | } |
| 72 | 89 | ||
| 73 | impl Default for RelayHealth { | ||
| 74 | fn default() -> Self { | ||
| 75 | Self { | ||
| 76 | state: HealthState::Healthy, | ||
| 77 | consecutive_failures: 0, | ||
| 78 | first_failure_time: None, | ||
| 79 | last_failure_time: None, | ||
| 80 | last_success_time: None, | ||
| 81 | last_attempt_time: None, | ||
| 82 | next_retry_at: None, | ||
| 83 | } | ||
| 84 | } | ||
| 85 | } | ||
| 86 | |||
| 87 | impl RelayHealth { | 90 | impl RelayHealth { |
| 88 | /// Create a new RelayHealth with healthy state | 91 | /// Create a new RelayHealth with default values |
| 89 | pub fn new() -> Self { | 92 | pub fn new() -> Self { |
| 90 | Self::default() | 93 | Self::default() |
| 91 | } | 94 | } |
| 95 | |||
| 96 | /// Get the current health state based on the relay's properties | ||
| 97 | /// | ||
| 98 | /// State is computed dynamically from: | ||
| 99 | /// - Rate limit status | ||
| 100 | /// - Connection status | ||
| 101 | /// - Failure history and timing | ||
| 102 | /// - Stability period after recovery | ||
| 103 | /// | ||
| 104 | /// ## State Logic | ||
| 105 | /// | ||
| 106 | /// 1. **RateLimited**: If rate_limited flag is set and cooldown hasn't expired | ||
| 107 | /// 2. **Dead**: 24+ hours of continuous failures | ||
| 108 | /// 3. **Degraded**: Active connection failures OR in stability period after recovery | ||
| 109 | /// 4. **Disconnected**: Not connected, but no recent failures or issues | ||
| 110 | /// 5. **Healthy**: Connected and stable (past stability period with no failures) | ||
| 111 | pub fn state(&self) -> HealthState { | ||
| 112 | let now = Instant::now(); | ||
| 113 | |||
| 114 | // Check rate limiting first (highest priority) | ||
| 115 | if self.rate_limited { | ||
| 116 | if let Some(next_retry) = self.next_retry_at { | ||
| 117 | if now < next_retry { | ||
| 118 | return HealthState::RateLimited; | ||
| 119 | } | ||
| 120 | } | ||
| 121 | } | ||
| 122 | |||
| 123 | // Check for dead state (24+ hours of failures) | ||
| 124 | if let Some(first_failure) = self.first_failure_time { | ||
| 125 | let failure_duration = now.duration_since(first_failure); | ||
| 126 | let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); | ||
| 127 | if failure_duration >= dead_threshold { | ||
| 128 | return HealthState::Dead; | ||
| 129 | } | ||
| 130 | } | ||
| 131 | |||
| 132 | // Check if we have active failures (currently failing to connect) | ||
| 133 | if self.consecutive_failures > 0 { | ||
| 134 | return HealthState::Degraded; | ||
| 135 | } | ||
| 136 | |||
| 137 | // Check if we're in stability period after recovery | ||
| 138 | // (recovered from failures but not yet proven stable) | ||
| 139 | if let (Some(last_success), Some(last_failure)) = (self.last_success_time, self.last_failure_time) { | ||
| 140 | // Only consider stability period if recovery happened after the last failure | ||
| 141 | if last_success > last_failure { | ||
| 142 | let time_since_recovery = now.duration_since(last_success); | ||
| 143 | let stability_period = Duration::from_secs(STABILITY_PERIOD_SECS); | ||
| 144 | |||
| 145 | if time_since_recovery < stability_period { | ||
| 146 | // Still in stability period - remain degraded to prove stability | ||
| 147 | return HealthState::Degraded; | ||
| 148 | } | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | // Check connection status for final state | ||
| 153 | if self.connected { | ||
| 154 | // Connected and stable (no failures, past stability period) | ||
| 155 | HealthState::Healthy | ||
| 156 | } else { | ||
| 157 | // Not connected, but no recent failures - just disconnected | ||
| 158 | HealthState::Disconnected | ||
| 159 | } | ||
| 160 | } | ||
| 161 | |||
| 162 | /// Check if the relay is currently connected | ||
| 163 | pub fn is_connected(&self) -> bool { | ||
| 164 | self.connected | ||
| 165 | } | ||
| 166 | |||
| 167 | /// Check if the relay is currently rate limited (cooldown active) | ||
| 168 | pub fn is_rate_limited_now(&self) -> bool { | ||
| 169 | if !self.rate_limited { | ||
| 170 | return false; | ||
| 171 | } | ||
| 172 | if let Some(next_retry) = self.next_retry_at { | ||
| 173 | Instant::now() < next_retry | ||
| 174 | } else { | ||
| 175 | false | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | /// Get the consecutive failure count | ||
| 180 | pub fn failure_count(&self) -> u32 { | ||
| 181 | self.consecutive_failures | ||
| 182 | } | ||
| 183 | |||
| 184 | /// Get time since last successful connection | ||
| 185 | pub fn time_since_last_success(&self) -> Option<Duration> { | ||
| 186 | self.last_success_time | ||
| 187 | .map(|t| Instant::now().duration_since(t)) | ||
| 188 | } | ||
| 189 | |||
| 190 | /// Get time since first failure in current streak | ||
| 191 | pub fn time_since_first_failure(&self) -> Option<Duration> { | ||
| 192 | self.first_failure_time | ||
| 193 | .map(|t| Instant::now().duration_since(t)) | ||
| 194 | } | ||
| 195 | |||
| 196 | /// Get remaining backoff/cooldown duration | ||
| 197 | pub fn remaining_backoff(&self) -> Option<Duration> { | ||
| 198 | let next_retry = self.next_retry_at?; | ||
| 199 | let now = Instant::now(); | ||
| 200 | if now >= next_retry { | ||
| 201 | None | ||
| 202 | } else { | ||
| 203 | Some(next_retry - now) | ||
| 204 | } | ||
| 205 | } | ||
| 92 | } | 206 | } |
| 93 | 207 | ||
| 94 | /// Thread-safe relay health tracker using DashMap | 208 | /// Thread-safe relay health tracker using DashMap |
| @@ -148,16 +262,17 @@ impl RelayHealthTracker { | |||
| 148 | 262 | ||
| 149 | /// Record a successful connection to a relay | 263 | /// Record a successful connection to a relay |
| 150 | /// | 264 | /// |
| 151 | /// Resets the relay to Healthy state and clears failure counters. | 265 | /// Clears failure counters and rate limiting. Sets connected = true. |
| 152 | pub fn record_success(&self, relay_url: &str) { | 266 | pub fn record_success(&self, relay_url: &str) { |
| 153 | let now = Instant::now(); | 267 | let now = Instant::now(); |
| 154 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); | 268 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); |
| 155 | let health = entry.value_mut(); | 269 | let health = entry.value_mut(); |
| 156 | 270 | ||
| 157 | let old_state = health.state; | 271 | let old_state = health.state(); |
| 158 | 272 | ||
| 159 | // Reset to healthy state | 273 | // Reset to healthy state |
| 160 | health.state = HealthState::Healthy; | 274 | health.connected = true; |
| 275 | health.rate_limited = false; | ||
| 161 | health.consecutive_failures = 0; | 276 | health.consecutive_failures = 0; |
| 162 | health.first_failure_time = None; | 277 | health.first_failure_time = None; |
| 163 | health.last_failure_time = None; | 278 | health.last_failure_time = None; |
| @@ -176,13 +291,17 @@ impl RelayHealthTracker { | |||
| 176 | 291 | ||
| 177 | /// Record a connection failure for a relay | 292 | /// Record a connection failure for a relay |
| 178 | /// | 293 | /// |
| 179 | /// Increments failure counter, updates state, and calculates next retry time. | 294 | /// Increments failure counter and calculates next retry time with exponential backoff. |
| 295 | /// Sets connected = false. | ||
| 180 | pub fn record_failure(&self, relay_url: &str) { | 296 | pub fn record_failure(&self, relay_url: &str) { |
| 181 | let now = Instant::now(); | 297 | let now = Instant::now(); |
| 182 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); | 298 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); |
| 183 | let health = entry.value_mut(); | 299 | let health = entry.value_mut(); |
| 184 | 300 | ||
| 185 | let old_state = health.state; | 301 | let old_state = health.state(); |
| 302 | |||
| 303 | // Mark as disconnected | ||
| 304 | health.connected = false; | ||
| 186 | 305 | ||
| 187 | // Set first_failure_time if this is a new failure streak | 306 | // Set first_failure_time if this is a new failure streak |
| 188 | if health.first_failure_time.is_none() { | 307 | if health.first_failure_time.is_none() { |
| @@ -192,18 +311,18 @@ impl RelayHealthTracker { | |||
| 192 | health.consecutive_failures = health.consecutive_failures.saturating_add(1); | 311 | health.consecutive_failures = health.consecutive_failures.saturating_add(1); |
| 193 | health.last_failure_time = Some(now); | 312 | health.last_failure_time = Some(now); |
| 194 | 313 | ||
| 195 | // Check if we should transition to Dead state | 314 | // Calculate backoff based on whether we're dead or degraded |
| 196 | if let Some(first_failure) = health.first_failure_time { | 315 | if let Some(first_failure) = health.first_failure_time { |
| 197 | let failure_duration = now.duration_since(first_failure); | 316 | let failure_duration = now.duration_since(first_failure); |
| 198 | let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); | 317 | let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); |
| 199 | 318 | ||
| 200 | if failure_duration >= dead_threshold { | 319 | if failure_duration >= dead_threshold { |
| 201 | health.state = HealthState::Dead; | ||
| 202 | // Dead relays retry once per day | 320 | // Dead relays retry once per day |
| 203 | health.next_retry_at = | 321 | health.next_retry_at = |
| 204 | Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); | 322 | Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); |
| 205 | 323 | ||
| 206 | if old_state != HealthState::Dead { | 324 | let new_state = health.state(); |
| 325 | if old_state != HealthState::Dead && new_state == HealthState::Dead { | ||
| 207 | tracing::warn!( | 326 | tracing::warn!( |
| 208 | "Relay {} marked dead after 24h failures ({} consecutive failures)", | 327 | "Relay {} marked dead after 24h failures ({} consecutive failures)", |
| 209 | relay_url, | 328 | relay_url, |
| @@ -212,15 +331,21 @@ impl RelayHealthTracker { | |||
| 212 | } | 331 | } |
| 213 | } else { | 332 | } else { |
| 214 | // Degraded state with exponential backoff | 333 | // Degraded state with exponential backoff |
| 215 | health.state = HealthState::Degraded; | ||
| 216 | let backoff = Self::get_backoff_duration( | 334 | let backoff = Self::get_backoff_duration( |
| 217 | health.consecutive_failures, | 335 | health.consecutive_failures, |
| 218 | self.base_backoff_secs, | 336 | self.base_backoff_secs, |
| 219 | self.max_backoff_secs, | 337 | self.max_backoff_secs, |
| 220 | ); | 338 | ); |
| 221 | health.next_retry_at = Some(now + backoff); | 339 | // Respect existing next_retry_at if it's later (e.g., from rate limiting) |
| 340 | let new_retry_at = now + backoff; | ||
| 341 | health.next_retry_at = Some( | ||
| 342 | health.next_retry_at | ||
| 343 | .unwrap_or(new_retry_at) | ||
| 344 | .max(new_retry_at) | ||
| 345 | ); | ||
| 222 | 346 | ||
| 223 | if old_state != HealthState::Degraded { | 347 | let new_state = health.state(); |
| 348 | if old_state != HealthState::Degraded && new_state == HealthState::Degraded { | ||
| 224 | tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); | 349 | tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); |
| 225 | } else { | 350 | } else { |
| 226 | tracing::debug!( | 351 | tracing::debug!( |
| @@ -234,6 +359,91 @@ impl RelayHealthTracker { | |||
| 234 | } | 359 | } |
| 235 | } | 360 | } |
| 236 | 361 | ||
| 362 | /// Record a rate limit NOTICE from a relay | ||
| 363 | /// | ||
| 364 | /// Sets the relay to RateLimited state with a fixed 65-second cooldown. | ||
| 365 | /// This is distinct from connection failures (Degraded state) - it's triggered | ||
| 366 | /// by NOTICE messages from the relay indicating we're sending too many requests. | ||
| 367 | pub fn record_rate_limit(&self, relay_url: &str) { | ||
| 368 | let now = Instant::now(); | ||
| 369 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); | ||
| 370 | let health = entry.value_mut(); | ||
| 371 | |||
| 372 | health.rate_limited = true; | ||
| 373 | health.next_retry_at = Some(now + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS)); | ||
| 374 | |||
| 375 | tracing::warn!( | ||
| 376 | relay = %relay_url, | ||
| 377 | cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, | ||
| 378 | "Relay rate limited, pausing new subscriptions" | ||
| 379 | ); | ||
| 380 | } | ||
| 381 | |||
| 382 | /// Clear rate limiting state for a specific relay | ||
| 383 | /// | ||
| 384 | /// This only clears the rate_limited flag, without affecting connection status | ||
| 385 | /// or failure counters. Use this when rate limit cooldown has expired and we | ||
| 386 | /// want to allow new subscriptions. | ||
| 387 | /// | ||
| 388 | /// This is different from `record_success()` which resets all health state. | ||
| 389 | pub fn clear_rate_limit(&self, relay_url: &str) { | ||
| 390 | if let Some(mut entry) = self.health.get_mut(relay_url) { | ||
| 391 | let health = entry.value_mut(); | ||
| 392 | health.rate_limited = false; | ||
| 393 | } | ||
| 394 | } | ||
| 395 | |||
| 396 | |||
| 397 | /// Check if relay is currently rate limited | ||
| 398 | /// | ||
| 399 | /// Returns true if the relay is in RateLimited state and the cooldown period | ||
| 400 | /// has not yet expired. Once the cooldown expires, this returns false and the | ||
| 401 | /// relay can accept new subscriptions again. | ||
| 402 | pub fn is_rate_limited(&self, relay_url: &str) -> bool { | ||
| 403 | if let Some(entry) = self.health.get(relay_url) { | ||
| 404 | let health = entry.value(); | ||
| 405 | health.rate_limited | ||
| 406 | } else { | ||
| 407 | false | ||
| 408 | } | ||
| 409 | } | ||
| 410 | |||
| 411 | /// Exit rate limiting state for relays whose cooldown has expired | ||
| 412 | /// | ||
| 413 | /// Finds all relays that are currently rate limited but whose cooldown period | ||
| 414 | /// has expired, clears their rate_limited flag, and returns their URLs. | ||
| 415 | /// | ||
| 416 | /// This method mutates state by clearing the rate_limited flag for recovered relays. | ||
| 417 | /// | ||
| 418 | /// Returns a vector of relay URLs that were recovered from rate limiting. | ||
| 419 | pub fn exit_expired_rate_limits(&self) -> Vec<String> { | ||
| 420 | let now = Instant::now(); | ||
| 421 | let mut recovered_relays = Vec::new(); | ||
| 422 | |||
| 423 | for mut entry in self.health.iter_mut() { | ||
| 424 | let (url, health) = entry.pair_mut(); | ||
| 425 | |||
| 426 | // Check if rate limited and cooldown has expired | ||
| 427 | if health.rate_limited { | ||
| 428 | if let Some(next_retry) = health.next_retry_at { | ||
| 429 | if now > next_retry { | ||
| 430 | // Cooldown expired - clear rate limiting | ||
| 431 | health.rate_limited = false; | ||
| 432 | health.next_retry_at = None; | ||
| 433 | recovered_relays.push(url.clone()); | ||
| 434 | |||
| 435 | tracing::info!( | ||
| 436 | relay = %url, | ||
| 437 | "Rate limit cooldown expired, relay ready for new subscriptions" | ||
| 438 | ); | ||
| 439 | } | ||
| 440 | } | ||
| 441 | } | ||
| 442 | } | ||
| 443 | |||
| 444 | recovered_relays | ||
| 445 | } | ||
| 446 | |||
| 237 | /// Check if a connection attempt should be made to a relay | 447 | /// Check if a connection attempt should be made to a relay |
| 238 | /// | 448 | /// |
| 239 | /// Returns true if: | 449 | /// Returns true if: |
| @@ -248,10 +458,16 @@ impl RelayHealthTracker { | |||
| 248 | Some(entry) => { | 458 | Some(entry) => { |
| 249 | let health = entry.value(); | 459 | let health = entry.value(); |
| 250 | 460 | ||
| 251 | match health.state { | 461 | // Don't reconnect if currently rate-limited |
| 252 | HealthState::Healthy => true, | 462 | if health.is_rate_limited_now() { |
| 253 | HealthState::Degraded | HealthState::Dead => { | 463 | return false; |
| 254 | // Check if backoff period has elapsed | 464 | } |
| 465 | |||
| 466 | // Check state-based logic | ||
| 467 | match health.state() { | ||
| 468 | HealthState::Healthy | HealthState::Disconnected => true, | ||
| 469 | HealthState::Degraded | HealthState::Dead | HealthState::RateLimited => { | ||
| 470 | // Check if backoff/cooldown period has elapsed | ||
| 255 | match health.next_retry_at { | 471 | match health.next_retry_at { |
| 256 | None => true, | 472 | None => true, |
| 257 | Some(next_retry) => Instant::now() >= next_retry, | 473 | Some(next_retry) => Instant::now() >= next_retry, |
| @@ -266,7 +482,7 @@ impl RelayHealthTracker { | |||
| 266 | pub fn get_state(&self, relay_url: &str) -> HealthState { | 482 | pub fn get_state(&self, relay_url: &str) -> HealthState { |
| 267 | self.health | 483 | self.health |
| 268 | .get(relay_url) | 484 | .get(relay_url) |
| 269 | .map(|entry| entry.value().state) | 485 | .map(|entry| entry.value().state()) |
| 270 | .unwrap_or(HealthState::Healthy) | 486 | .unwrap_or(HealthState::Healthy) |
| 271 | } | 487 | } |
| 272 | 488 | ||
| @@ -350,10 +566,12 @@ mod tests { | |||
| 350 | } | 566 | } |
| 351 | 567 | ||
| 352 | #[test] | 568 | #[test] |
| 353 | fn test_default_health_is_healthy() { | 569 | fn test_default_health_is_disconnected() { |
| 354 | let health = RelayHealth::default(); | 570 | let health = RelayHealth::default(); |
| 355 | assert_eq!(health.state, HealthState::Healthy); | 571 | // Default state: not connected, no failures = Disconnected |
| 572 | assert_eq!(health.state(), HealthState::Disconnected); | ||
| 356 | assert_eq!(health.consecutive_failures, 0); | 573 | assert_eq!(health.consecutive_failures, 0); |
| 574 | assert!(!health.connected); | ||
| 357 | assert!(health.first_failure_time.is_none()); | 575 | assert!(health.first_failure_time.is_none()); |
| 358 | } | 576 | } |
| 359 | 577 | ||
| @@ -504,7 +722,7 @@ mod tests { | |||
| 504 | 722 | ||
| 505 | assert!(health.is_some()); | 723 | assert!(health.is_some()); |
| 506 | let health = health.unwrap(); | 724 | let health = health.unwrap(); |
| 507 | assert_eq!(health.state, HealthState::Healthy); | 725 | assert_eq!(health.state(), HealthState::Healthy); |
| 508 | assert!(health.last_success_time.is_some()); | 726 | assert!(health.last_success_time.is_some()); |
| 509 | } | 727 | } |
| 510 | 728 | ||
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 22c9192..453a79c 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -72,7 +72,7 @@ impl SyncMetrics { | |||
| 72 | let relay_status = IntGaugeVec::new( | 72 | let relay_status = IntGaugeVec::new( |
| 73 | Opts::new( | 73 | Opts::new( |
| 74 | "ngit_sync_relay_status", | 74 | "ngit_sync_relay_status", |
| 75 | "Relay health status (1=healthy, 2=degraded, 3=dead)", | 75 | "Relay health status (1=healthy, 2=disconnected, 3=degraded, 4=dead, 5=rate_limited)", |
| 76 | ), | 76 | ), |
| 77 | &["relay"], | 77 | &["relay"], |
| 78 | )?; | 78 | )?; |
| @@ -178,9 +178,11 @@ impl SyncMetrics { | |||
| 178 | /// Record relay health state change. | 178 | /// Record relay health state change. |
| 179 | /// | 179 | /// |
| 180 | /// Maps health states to numeric values for Prometheus: | 180 | /// Maps health states to numeric values for Prometheus: |
| 181 | /// - Healthy = 1 | 181 | /// - Healthy = 1 (connected and stable) |
| 182 | /// - Degraded = 2 | 182 | /// - Disconnected = 2 (not connected, but no issues) |
| 183 | /// - Dead = 3 | 183 | /// - Degraded = 3 (connection problems or unstable after recovery) |
| 184 | /// - Dead = 4 (24h+ of failures) | ||
| 185 | /// - RateLimited = 5 (rate limit cooldown active) | ||
| 184 | /// | 186 | /// |
| 185 | /// # Arguments | 187 | /// # Arguments |
| 186 | /// | 188 | /// |
| @@ -189,8 +191,10 @@ impl SyncMetrics { | |||
| 189 | pub fn record_health_state(&self, relay: &str, state: HealthState) { | 191 | pub fn record_health_state(&self, relay: &str, state: HealthState) { |
| 190 | let state_value = match state { | 192 | let state_value = match state { |
| 191 | HealthState::Healthy => 1, | 193 | HealthState::Healthy => 1, |
| 192 | HealthState::Degraded => 2, | 194 | HealthState::Disconnected => 2, |
| 193 | HealthState::Dead => 3, | 195 | HealthState::Degraded => 3, |
| 196 | HealthState::Dead => 4, | ||
| 197 | HealthState::RateLimited => 5, | ||
| 194 | }; | 198 | }; |
| 195 | self.relay_status | 199 | self.relay_status |
| 196 | .with_label_values(&[relay]) | 200 | .with_label_values(&[relay]) |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6f59b19..1f95ff7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -33,6 +33,7 @@ pub use self_subscriber::SelfSubscriber; | |||
| 33 | 33 | ||
| 34 | // Re-export health tracking types | 34 | // Re-export health tracking types |
| 35 | pub use health::RelayHealthTracker; | 35 | pub use health::RelayHealthTracker; |
| 36 | use tokio::time::sleep; | ||
| 36 | 37 | ||
| 37 | use std::collections::{HashMap, HashSet}; | 38 | use std::collections::{HashMap, HashSet}; |
| 38 | use std::sync::Arc; | 39 | use std::sync::Arc; |
| @@ -303,42 +304,59 @@ async fn run_daily_timer( | |||
| 303 | } | 304 | } |
| 304 | } | 305 | } |
| 305 | 306 | ||
| 306 | // ============================================================================= | 307 | // Combined Health and Metrics Checker |
| 307 | // Disconnect Checker | ||
| 308 | // ============================================================================= | ||
| 309 | 308 | ||
| 310 | /// Run the disconnect checker for periodic cleanup of empty relays | 309 | /// Run the combined health and metrics checker |
| 311 | /// | 310 | /// |
| 312 | /// This function runs in a loop, checking at the configured interval for relays | 311 | /// This function runs in a loop with a 2-second interval, performing three tasks: |
| 313 | /// that have no repos or root events to sync. Non-bootstrap relays | 312 | /// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones |
| 314 | /// that are empty will be disconnected to free up resources. | 313 | /// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired |
| 314 | /// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker | ||
| 315 | /// | 315 | /// |
| 316 | /// Bootstrap relays are never disconnected, even if empty. | 316 | /// The metrics update ensures that health states are kept current in metrics even when |
| 317 | /// they change due to timeouts, cooldowns expiring, or stability periods completing. | ||
| 317 | /// | 318 | /// |
| 318 | /// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` | 319 | /// The 2-second interval provides a good balance between responsiveness and overhead. |
| 319 | /// (default: 60 seconds). Set to a lower value for faster reconnection testing. | 320 | /// While disconnect checking traditionally ran at 60s intervals, the faster cadence here |
| 320 | async fn run_disconnect_checker( | 321 | /// is acceptable since the operations are lightweight (just index checks, no I/O). |
| 322 | async fn run_health_and_metrics_checker( | ||
| 321 | sync_manager: Arc<Mutex<SyncManager>>, | 323 | sync_manager: Arc<Mutex<SyncManager>>, |
| 322 | mut shutdown_rx: broadcast::Receiver<()>, | 324 | mut shutdown_rx: broadcast::Receiver<()>, |
| 323 | check_interval_secs: u64, | ||
| 324 | ) { | 325 | ) { |
| 325 | let interval = Duration::from_secs(check_interval_secs); | 326 | let interval = Duration::from_secs(2); |
| 326 | tracing::info!( | 327 | tracing::info!("Health and metrics checker started with 2s interval"); |
| 327 | interval_secs = check_interval_secs, | ||
| 328 | "Disconnect checker started with configured interval" | ||
| 329 | ); | ||
| 330 | 328 | ||
| 331 | loop { | 329 | loop { |
| 332 | tokio::select! { | 330 | tokio::select! { |
| 333 | _ = tokio::time::sleep(interval) => { | 331 | _ = tokio::time::sleep(interval) => { |
| 334 | tracing::debug!("Disconnect checker running"); | 332 | tracing::debug!("Health and metrics checker running"); |
| 335 | 333 | ||
| 336 | let mut manager = sync_manager.lock().await; | 334 | let mut manager = sync_manager.lock().await; |
| 335 | |||
| 336 | // 1. Check for disconnects and retry disconnected relays | ||
| 337 | manager.check_disconnects().await; | 337 | manager.check_disconnects().await; |
| 338 | manager.retry_disconnected_relays().await; | 338 | manager.retry_disconnected_relays().await; |
| 339 | |||
| 340 | // 2. Check for rate limit recovery | ||
| 341 | manager.check_rate_limit_recovery().await; | ||
| 342 | |||
| 343 | // 3. Update metrics with current health states | ||
| 344 | if let Some(ref metrics) = manager.metrics { | ||
| 345 | // Get all tracked relay URLs | ||
| 346 | let relay_urls: Vec<String> = { | ||
| 347 | let index = manager.relay_sync_index.read().await; | ||
| 348 | index.keys().cloned().collect() | ||
| 349 | }; | ||
| 350 | |||
| 351 | // Update health state for each relay | ||
| 352 | for relay_url in relay_urls { | ||
| 353 | let state = manager.health_tracker.get_state(&relay_url); | ||
| 354 | metrics.record_health_state(&relay_url, state); | ||
| 355 | } | ||
| 356 | } | ||
| 339 | } | 357 | } |
| 340 | _ = shutdown_rx.recv() => { | 358 | _ = shutdown_rx.recv() => { |
| 341 | tracing::info!("Disconnect checker received shutdown signal"); | 359 | tracing::info!("Health and metrics checker received shutdown signal"); |
| 342 | break; | 360 | break; |
| 343 | } | 361 | } |
| 344 | } | 362 | } |
| @@ -510,6 +528,45 @@ impl SyncManager { | |||
| 510 | // Drop the lock before async operations | 528 | // Drop the lock before async operations |
| 511 | drop(pending); | 529 | drop(pending); |
| 512 | 530 | ||
| 531 | // Wait for rate limiting to clear before pagination continues | ||
| 532 | if self.health_tracker.is_rate_limited(relay_url) { | ||
| 533 | tracing::debug!( | ||
| 534 | relay = %relay_url, | ||
| 535 | batch_id = batch_id, | ||
| 536 | "Relay is rate limited, waiting before pagination" | ||
| 537 | ); | ||
| 538 | |||
| 539 | // Loop until rate limit clears, sleeping with jitter between checks | ||
| 540 | while self.health_tracker.is_rate_limited(relay_url) { | ||
| 541 | let jitter_secs = 1 + (rand::random::<u64>() % 5); // 1-5 seconds | ||
| 542 | sleep(Duration::from_secs(jitter_secs)).await; | ||
| 543 | } | ||
| 544 | |||
| 545 | tracing::debug!( | ||
| 546 | relay = %relay_url, | ||
| 547 | batch_id = batch_id, | ||
| 548 | "Rate limit cleared, continuing pagination" | ||
| 549 | ); | ||
| 550 | let batch_exists = { | ||
| 551 | let pending = self.pending_sync_index.read().await; | ||
| 552 | pending | ||
| 553 | .get(&relay_url_for_pagination) | ||
| 554 | .map(|batches| batches.iter().any(|b| b.batch_id == batch_id)) | ||
| 555 | .unwrap_or(false) | ||
| 556 | }; | ||
| 557 | |||
| 558 | // If we were rate limited, verify batch still exists after waiting | ||
| 559 | // (batches are wiped during disconnect, so avoid orphaned pagination) | ||
| 560 | if !batch_exists { | ||
| 561 | tracing::debug!( | ||
| 562 | relay = %relay_url_for_pagination, | ||
| 563 | batch_id = batch_id, | ||
| 564 | "Batch no longer exists after rate limit wait, skipping pagination" | ||
| 565 | ); | ||
| 566 | return; | ||
| 567 | } | ||
| 568 | } | ||
| 569 | |||
| 513 | // Subscribe to next page and add to outstanding_subs | 570 | // Subscribe to next page and add to outstanding_subs |
| 514 | if let Some(conn) = self.connections.get(&relay_url_for_pagination) { | 571 | if let Some(conn) = self.connections.get(&relay_url_for_pagination) { |
| 515 | match conn.subscribe_filter(next_filter.clone(), true).await { | 572 | match conn.subscribe_filter(next_filter.clone(), true).await { |
| @@ -752,29 +809,22 @@ impl SyncManager { | |||
| 752 | self.try_connect_relay(bootstrap_url).await; | 809 | self.try_connect_relay(bootstrap_url).await; |
| 753 | } | 810 | } |
| 754 | 811 | ||
| 755 | // 7. Capture config values before moving self into Arc | 812 | // 7. Wrap self in Arc<Mutex> for sharing with timer task |
| 756 | let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; | ||
| 757 | |||
| 758 | // 8. Wrap self in Arc<Mutex> for sharing with timer task | ||
| 759 | let sync_manager = Arc::new(Mutex::new(self)); | 813 | let sync_manager = Arc::new(Mutex::new(self)); |
| 760 | 814 | ||
| 761 | // 9. Spawn daily timer task with shutdown receiver | 815 | // 8. Spawn daily timer task with shutdown receiver |
| 762 | let timer_manager = Arc::clone(&sync_manager); | 816 | let timer_manager = Arc::clone(&sync_manager); |
| 763 | let timer_shutdown = shutdown_tx.subscribe(); | 817 | let timer_shutdown = shutdown_tx.subscribe(); |
| 764 | tokio::spawn(async move { | 818 | tokio::spawn(async move { |
| 765 | run_daily_timer(timer_manager, timer_shutdown).await; | 819 | run_daily_timer(timer_manager, timer_shutdown).await; |
| 766 | }); | 820 | }); |
| 767 | 821 | ||
| 768 | // 10. Spawn disconnect checker task with shutdown receiver | 822 | // 9. Spawn health and metrics checker task with shutdown receiver |
| 823 | // This combines disconnect checking, rate limit recovery, and metrics updates | ||
| 769 | let checker_manager = Arc::clone(&sync_manager); | 824 | let checker_manager = Arc::clone(&sync_manager); |
| 770 | let checker_shutdown = shutdown_tx.subscribe(); | 825 | let checker_shutdown = shutdown_tx.subscribe(); |
| 771 | tokio::spawn(async move { | 826 | tokio::spawn(async move { |
| 772 | run_disconnect_checker( | 827 | run_health_and_metrics_checker(checker_manager, checker_shutdown).await; |
| 773 | checker_manager, | ||
| 774 | checker_shutdown, | ||
| 775 | disconnect_check_interval_secs, | ||
| 776 | ) | ||
| 777 | .await; | ||
| 778 | }); | 828 | }); |
| 779 | 829 | ||
| 780 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 830 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -876,7 +926,18 @@ impl SyncManager { | |||
| 876 | } | 926 | } |
| 877 | } | 927 | } |
| 878 | 928 | ||
| 879 | // Step 2: Check if consolidation is needed BEFORE adding new filters | 929 | // Step 2: Check if relay is rate-limited before creating new pending items |
| 930 | if self.health_tracker.is_rate_limited(&action.relay_url) { | ||
| 931 | tracing::debug!( | ||
| 932 | relay = %action.relay_url, | ||
| 933 | repos = action.items.repos.len(), | ||
| 934 | root_events = action.items.root_events.len(), | ||
| 935 | "Skipping AddFilters for rate-limited relay, will recompute after cooldown" | ||
| 936 | ); | ||
| 937 | return; | ||
| 938 | } | ||
| 939 | |||
| 940 | // Step 3: Check if consolidation is needed BEFORE adding new filters | ||
| 880 | self.maybe_consolidate(&action.relay_url, action.filters.len()) | 941 | self.maybe_consolidate(&action.relay_url, action.filters.len()) |
| 881 | .await; | 942 | .await; |
| 882 | 943 | ||
| @@ -954,6 +1015,7 @@ impl SyncManager { | |||
| 954 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | 1015 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); |
| 955 | let metrics_clone = self.metrics.clone(); | 1016 | let metrics_clone = self.metrics.clone(); |
| 956 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | 1017 | let pending_sync_index = Arc::clone(&self.pending_sync_index); |
| 1018 | let health_tracker = Arc::clone(&self.health_tracker); | ||
| 957 | 1019 | ||
| 958 | tokio::spawn(async move { | 1020 | tokio::spawn(async move { |
| 959 | let mut disconnect_sent = false; | 1021 | let mut disconnect_sent = false; |
| @@ -1011,6 +1073,38 @@ impl SyncManager { | |||
| 1011 | }) | 1073 | }) |
| 1012 | .await; | 1074 | .await; |
| 1013 | } | 1075 | } |
| 1076 | RelayEvent::Notice(notice) => { | ||
| 1077 | // Check for rate limiting indicators | ||
| 1078 | let notice_lower = notice.to_lowercase(); | ||
| 1079 | let is_rate_limit = (notice_lower.contains("rate") | ||
| 1080 | && notice_lower.contains("limit")) | ||
| 1081 | || notice_lower.contains("too many") | ||
| 1082 | || notice_lower.contains("slow down") | ||
| 1083 | || notice_lower.contains("throttl"); | ||
| 1084 | |||
| 1085 | if is_rate_limit { | ||
| 1086 | tracing::warn!( | ||
| 1087 | relay = %relay_url_clone, | ||
| 1088 | notice = %notice, | ||
| 1089 | "Rate limiting NOTICE detected from relay" | ||
| 1090 | ); | ||
| 1091 | |||
| 1092 | // Mark relay as rate limited | ||
| 1093 | health_tracker.record_rate_limit(&relay_url_clone); | ||
| 1094 | |||
| 1095 | // Update metrics with new health state | ||
| 1096 | if let Some(ref metrics) = metrics_clone { | ||
| 1097 | let state = health_tracker.get_state(&relay_url_clone); | ||
| 1098 | metrics.record_health_state(&relay_url_clone, state); | ||
| 1099 | } | ||
| 1100 | } else { | ||
| 1101 | tracing::debug!( | ||
| 1102 | relay = %relay_url_clone, | ||
| 1103 | notice = %notice, | ||
| 1104 | "Relay issued notice" | ||
| 1105 | ); | ||
| 1106 | } | ||
| 1107 | } | ||
| 1014 | RelayEvent::Closed(reason) => { | 1108 | RelayEvent::Closed(reason) => { |
| 1015 | // CLOSED message means one subscription was closed, not the whole connection | 1109 | // CLOSED message means one subscription was closed, not the whole connection |
| 1016 | // This is normal behavior (e.g., when historic_sync completes) | 1110 | // This is normal behavior (e.g., when historic_sync completes) |
| @@ -1901,6 +1995,63 @@ impl SyncManager { | |||
| 1901 | } | 1995 | } |
| 1902 | } | 1996 | } |
| 1903 | 1997 | ||
| 1998 | /// Check for rate-limited relays that have exceeded cooldown | ||
| 1999 | /// | ||
| 2000 | /// This method is called periodically by run_rate_limit_checker (every 1 second). | ||
| 2001 | /// For each relay in RateLimited state that has exceeded the 65-second cooldown: | ||
| 2002 | /// 1. Clears the rate limit state (sets to Healthy) | ||
| 2003 | /// 2. Recomputes required actions for that relay | ||
| 2004 | /// 3. Submits those actions | ||
| 2005 | async fn check_rate_limit_recovery(&mut self) { | ||
| 2006 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | ||
| 2007 | |||
| 2008 | // Exit rate limiting for relays whose cooldown has expired | ||
| 2009 | let relays_to_recover: Vec<String> = self.health_tracker.exit_expired_rate_limits(); | ||
| 2010 | |||
| 2011 | if relays_to_recover.is_empty() { | ||
| 2012 | return; | ||
| 2013 | } | ||
| 2014 | |||
| 2015 | // Recompute actions - could optimise by adding relays: Option<&[]> to derive_relay_targets | ||
| 2016 | let repo_index = self.repo_sync_index.read().await; | ||
| 2017 | let targets = derive_relay_targets(&repo_index); | ||
| 2018 | drop(repo_index); | ||
| 2019 | |||
| 2020 | for relay_url in relays_to_recover { | ||
| 2021 | tracing::info!( | ||
| 2022 | relay = %relay_url, | ||
| 2023 | "Rate limit cooldown expired, recovering" | ||
| 2024 | ); | ||
| 2025 | |||
| 2026 | // Clear rate limit state | ||
| 2027 | self.health_tracker.clear_rate_limit(&relay_url); | ||
| 2028 | |||
| 2029 | // Only compute actions for this specific relay | ||
| 2030 | if let Some(relay_needs) = targets.get(&relay_url) { | ||
| 2031 | let mut single_relay_targets = std::collections::HashMap::new(); | ||
| 2032 | single_relay_targets.insert(relay_url.clone(), relay_needs.clone()); | ||
| 2033 | |||
| 2034 | let pending = self.pending_sync_index.read().await; | ||
| 2035 | let confirmed = self.relay_sync_index.read().await; | ||
| 2036 | |||
| 2037 | let actions = compute_actions(&single_relay_targets, &pending, &confirmed); | ||
| 2038 | drop(pending); | ||
| 2039 | drop(confirmed); | ||
| 2040 | |||
| 2041 | // Submit each action | ||
| 2042 | for action in actions { | ||
| 2043 | tracing::info!( | ||
| 2044 | relay = %action.relay_url, | ||
| 2045 | repo_count = action.items.repos.len(), | ||
| 2046 | event_count = action.items.root_events.len(), | ||
| 2047 | "Submitting recovered actions after rate limit" | ||
| 2048 | ); | ||
| 2049 | self.handle_new_sync_filters(action).await; | ||
| 2050 | } | ||
| 2051 | } | ||
| 2052 | } | ||
| 2053 | } | ||
| 2054 | |||
| 1904 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex | 2055 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex |
| 1905 | /// | 2056 | /// |
| 1906 | /// This method applies limit(0) to all filters to receive ONLY new events. | 2057 | /// This method applies limit(0) to all filters to receive ONLY new events. |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 5a61777..d69e1ce 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -27,6 +27,8 @@ pub enum RelayEvent { | |||
| 27 | Event(Event, SubscriptionId), | 27 | Event(Event, SubscriptionId), |
| 28 | /// End of stored events for a subscription | 28 | /// End of stored events for a subscription |
| 29 | EndOfStoredEvents(SubscriptionId), | 29 | EndOfStoredEvents(SubscriptionId), |
| 30 | /// NOTICE message from relay | ||
| 31 | Notice(String), | ||
| 30 | /// Connection was closed | 32 | /// Connection was closed |
| 31 | Closed(String), | 33 | Closed(String), |
| 32 | /// Shutdown notification | 34 | /// Shutdown notification |
| @@ -238,6 +240,11 @@ impl RelayConnection { | |||
| 238 | break; | 240 | break; |
| 239 | } | 241 | } |
| 240 | } | 242 | } |
| 243 | RelayMessage::Notice(msg) => { | ||
| 244 | tracing::debug!(relay = %url, message = %msg, "Received NOTICE"); | ||
| 245 | let _ = event_sender.send(RelayEvent::Notice(msg.to_string())).await; | ||
| 246 | // Don't break - continue processing events | ||
| 247 | } | ||
| 241 | RelayMessage::Closed { message: msg, .. } => { | 248 | RelayMessage::Closed { message: msg, .. } => { |
| 242 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | 249 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); |
| 243 | let _ = | 250 | let _ = |