From 541f34a207047b26547154e7d631005d456f12fd Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 22 Dec 2025 14:23:46 +0000 Subject: sync: add req rate-limit detection and cooldown --- src/sync/health.rs | 300 +++++++++++++++++++++++++++++++++++++------ src/sync/metrics.rs | 16 ++- src/sync/mod.rs | 215 ++++++++++++++++++++++++++----- src/sync/relay_connection.rs | 7 + 4 files changed, 459 insertions(+), 79 deletions(-) (limited to 'src/sync') diff --git a/src/sync/health.rs b/src/sync/health.rs index d919a80..a10427f 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs @@ -1,15 +1,17 @@ //! Relay Health Tracking for GRASP-02 Proactive Sync //! //! This module implements health tracking for relay connections, including: -//! - Health state machine (Healthy -> Degraded -> Dead) +//! - Health state machine (Healthy -> Degraded -> Dead -> RateLimited) //! - Exponential backoff with configurable max delay //! - Dead relay detection after 24h of continuous failures +//! - Rate limit detection and fixed cooldown period //! //! ## Health States //! //! - **Healthy**: Working connection, no recent failures //! - **Degraded**: Connection failed, retrying with backoff //! - **Dead**: 24h+ of continuous failures, minimal retry (once per day) +//! - **RateLimited**: NOTICE-triggered 65-second cooldown to avoid rate limits use std::sync::Arc; use std::time::{Duration, Instant}; @@ -30,37 +32,52 @@ const DEFAULT_MAX_BACKOFF_SECS: u64 = 3600; /// Default base backoff duration in seconds const DEFAULT_BASE_BACKOFF_SECS: u64 = 5; +/// Rate limit cooldown duration in seconds (65 seconds = typical 60s limit + buffer) +const RATE_LIMIT_COOLDOWN_SECS: u64 = 65; + +/// Stability period after recovery before marking relay as fully healthy (5 minutes) +/// A relay must maintain connection for this duration after failures before being marked Healthy +const STABILITY_PERIOD_SECS: u64 = 300; + /// Health state of a relay connection #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum HealthState { - /// Working connection, no recent failures + /// Working connection, no recent failures, proven stable Healthy, - /// Connection failed, retrying with exponential backoff + /// Not currently connected, but no recent failures or issues + Disconnected, + /// Connection problems: failing to connect OR recently recovered but not yet stable Degraded, /// 24h+ of continuous failures, minimal retry Dead, + /// Rate limited by relay, temporary cooldown active + RateLimited, } impl std::fmt::Display for HealthState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { HealthState::Healthy => write!(f, "healthy"), + HealthState::Disconnected => write!(f, "disconnected"), HealthState::Degraded => write!(f, "degraded"), HealthState::Dead => write!(f, "dead"), + HealthState::RateLimited => write!(f, "rate_limited"), } } } /// Health information for a single relay -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct RelayHealth { - /// Current health state - pub state: HealthState, + /// Are we currently connected to this relay + pub connected: bool, + /// Has this relay sent us a rate-limiting NOTICE recently + pub rate_limited: bool, /// Number of consecutive connection failures pub consecutive_failures: u32, /// Time of the first failure in the current failure streak pub first_failure_time: Option, - /// Time of the last failure + /// Time of the last failure (kept after recovery for stability period tracking) pub last_failure_time: Option, /// Time of the last successful connection pub last_success_time: Option, @@ -70,25 +87,122 @@ pub struct RelayHealth { pub next_retry_at: Option, } -impl Default for RelayHealth { - fn default() -> Self { - Self { - state: HealthState::Healthy, - consecutive_failures: 0, - first_failure_time: None, - last_failure_time: None, - last_success_time: None, - last_attempt_time: None, - next_retry_at: None, - } - } -} - impl RelayHealth { - /// Create a new RelayHealth with healthy state + /// Create a new RelayHealth with default values pub fn new() -> Self { Self::default() } + + /// Get the current health state based on the relay's properties + /// + /// State is computed dynamically from: + /// - Rate limit status + /// - Connection status + /// - Failure history and timing + /// - Stability period after recovery + /// + /// ## State Logic + /// + /// 1. **RateLimited**: If rate_limited flag is set and cooldown hasn't expired + /// 2. **Dead**: 24+ hours of continuous failures + /// 3. **Degraded**: Active connection failures OR in stability period after recovery + /// 4. **Disconnected**: Not connected, but no recent failures or issues + /// 5. **Healthy**: Connected and stable (past stability period with no failures) + pub fn state(&self) -> HealthState { + let now = Instant::now(); + + // Check rate limiting first (highest priority) + if self.rate_limited { + if let Some(next_retry) = self.next_retry_at { + if now < next_retry { + return HealthState::RateLimited; + } + } + } + + // Check for dead state (24+ hours of failures) + if let Some(first_failure) = self.first_failure_time { + let failure_duration = now.duration_since(first_failure); + let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); + if failure_duration >= dead_threshold { + return HealthState::Dead; + } + } + + // Check if we have active failures (currently failing to connect) + if self.consecutive_failures > 0 { + return HealthState::Degraded; + } + + // Check if we're in stability period after recovery + // (recovered from failures but not yet proven stable) + if let (Some(last_success), Some(last_failure)) = (self.last_success_time, self.last_failure_time) { + // Only consider stability period if recovery happened after the last failure + if last_success > last_failure { + let time_since_recovery = now.duration_since(last_success); + let stability_period = Duration::from_secs(STABILITY_PERIOD_SECS); + + if time_since_recovery < stability_period { + // Still in stability period - remain degraded to prove stability + return HealthState::Degraded; + } + } + } + + // Check connection status for final state + if self.connected { + // Connected and stable (no failures, past stability period) + HealthState::Healthy + } else { + // Not connected, but no recent failures - just disconnected + HealthState::Disconnected + } + } + + /// Check if the relay is currently connected + pub fn is_connected(&self) -> bool { + self.connected + } + + /// Check if the relay is currently rate limited (cooldown active) + pub fn is_rate_limited_now(&self) -> bool { + if !self.rate_limited { + return false; + } + if let Some(next_retry) = self.next_retry_at { + Instant::now() < next_retry + } else { + false + } + } + + /// Get the consecutive failure count + pub fn failure_count(&self) -> u32 { + self.consecutive_failures + } + + /// Get time since last successful connection + pub fn time_since_last_success(&self) -> Option { + self.last_success_time + .map(|t| Instant::now().duration_since(t)) + } + + /// Get time since first failure in current streak + pub fn time_since_first_failure(&self) -> Option { + self.first_failure_time + .map(|t| Instant::now().duration_since(t)) + } + + /// Get remaining backoff/cooldown duration + pub fn remaining_backoff(&self) -> Option { + let next_retry = self.next_retry_at?; + let now = Instant::now(); + if now >= next_retry { + None + } else { + Some(next_retry - now) + } + } } /// Thread-safe relay health tracker using DashMap @@ -148,16 +262,17 @@ impl RelayHealthTracker { /// Record a successful connection to a relay /// - /// Resets the relay to Healthy state and clears failure counters. + /// Clears failure counters and rate limiting. Sets connected = true. pub fn record_success(&self, relay_url: &str) { let now = Instant::now(); let mut entry = self.health.entry(relay_url.to_string()).or_default(); let health = entry.value_mut(); - let old_state = health.state; + let old_state = health.state(); // Reset to healthy state - health.state = HealthState::Healthy; + health.connected = true; + health.rate_limited = false; health.consecutive_failures = 0; health.first_failure_time = None; health.last_failure_time = None; @@ -176,13 +291,17 @@ impl RelayHealthTracker { /// Record a connection failure for a relay /// - /// Increments failure counter, updates state, and calculates next retry time. + /// Increments failure counter and calculates next retry time with exponential backoff. + /// Sets connected = false. pub fn record_failure(&self, relay_url: &str) { let now = Instant::now(); let mut entry = self.health.entry(relay_url.to_string()).or_default(); let health = entry.value_mut(); - let old_state = health.state; + let old_state = health.state(); + + // Mark as disconnected + health.connected = false; // Set first_failure_time if this is a new failure streak if health.first_failure_time.is_none() { @@ -192,18 +311,18 @@ impl RelayHealthTracker { health.consecutive_failures = health.consecutive_failures.saturating_add(1); health.last_failure_time = Some(now); - // Check if we should transition to Dead state + // Calculate backoff based on whether we're dead or degraded if let Some(first_failure) = health.first_failure_time { let failure_duration = now.duration_since(first_failure); let dead_threshold = Duration::from_secs(DEAD_THRESHOLD_HOURS * 3600); if failure_duration >= dead_threshold { - health.state = HealthState::Dead; // Dead relays retry once per day health.next_retry_at = Some(now + Duration::from_secs(DEAD_RETRY_INTERVAL_HOURS * 3600)); - if old_state != HealthState::Dead { + let new_state = health.state(); + if old_state != HealthState::Dead && new_state == HealthState::Dead { tracing::warn!( "Relay {} marked dead after 24h failures ({} consecutive failures)", relay_url, @@ -212,15 +331,21 @@ impl RelayHealthTracker { } } else { // Degraded state with exponential backoff - health.state = HealthState::Degraded; let backoff = Self::get_backoff_duration( health.consecutive_failures, self.base_backoff_secs, self.max_backoff_secs, ); - health.next_retry_at = Some(now + backoff); + // Respect existing next_retry_at if it's later (e.g., from rate limiting) + let new_retry_at = now + backoff; + health.next_retry_at = Some( + health.next_retry_at + .unwrap_or(new_retry_at) + .max(new_retry_at) + ); - if old_state != HealthState::Degraded { + let new_state = health.state(); + if old_state != HealthState::Degraded && new_state == HealthState::Degraded { tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); } else { tracing::debug!( @@ -234,6 +359,91 @@ impl RelayHealthTracker { } } + /// Record a rate limit NOTICE from a relay + /// + /// Sets the relay to RateLimited state with a fixed 65-second cooldown. + /// This is distinct from connection failures (Degraded state) - it's triggered + /// by NOTICE messages from the relay indicating we're sending too many requests. + pub fn record_rate_limit(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + + health.rate_limited = true; + health.next_retry_at = Some(now + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS)); + + tracing::warn!( + relay = %relay_url, + cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, + "Relay rate limited, pausing new subscriptions" + ); + } + + /// Clear rate limiting state for a specific relay + /// + /// This only clears the rate_limited flag, without affecting connection status + /// or failure counters. Use this when rate limit cooldown has expired and we + /// want to allow new subscriptions. + /// + /// This is different from `record_success()` which resets all health state. + pub fn clear_rate_limit(&self, relay_url: &str) { + if let Some(mut entry) = self.health.get_mut(relay_url) { + let health = entry.value_mut(); + health.rate_limited = false; + } + } + + + /// Check if relay is currently rate limited + /// + /// Returns true if the relay is in RateLimited state and the cooldown period + /// has not yet expired. Once the cooldown expires, this returns false and the + /// relay can accept new subscriptions again. + pub fn is_rate_limited(&self, relay_url: &str) -> bool { + if let Some(entry) = self.health.get(relay_url) { + let health = entry.value(); + health.rate_limited + } else { + false + } + } + + /// Exit rate limiting state for relays whose cooldown has expired + /// + /// Finds all relays that are currently rate limited but whose cooldown period + /// has expired, clears their rate_limited flag, and returns their URLs. + /// + /// This method mutates state by clearing the rate_limited flag for recovered relays. + /// + /// Returns a vector of relay URLs that were recovered from rate limiting. + pub fn exit_expired_rate_limits(&self) -> Vec { + let now = Instant::now(); + let mut recovered_relays = Vec::new(); + + for mut entry in self.health.iter_mut() { + let (url, health) = entry.pair_mut(); + + // Check if rate limited and cooldown has expired + if health.rate_limited { + if let Some(next_retry) = health.next_retry_at { + if now > next_retry { + // Cooldown expired - clear rate limiting + health.rate_limited = false; + health.next_retry_at = None; + recovered_relays.push(url.clone()); + + tracing::info!( + relay = %url, + "Rate limit cooldown expired, relay ready for new subscriptions" + ); + } + } + } + } + + recovered_relays + } + /// Check if a connection attempt should be made to a relay /// /// Returns true if: @@ -248,10 +458,16 @@ impl RelayHealthTracker { Some(entry) => { let health = entry.value(); - match health.state { - HealthState::Healthy => true, - HealthState::Degraded | HealthState::Dead => { - // Check if backoff period has elapsed + // Don't reconnect if currently rate-limited + if health.is_rate_limited_now() { + return false; + } + + // Check state-based logic + match health.state() { + HealthState::Healthy | HealthState::Disconnected => true, + HealthState::Degraded | HealthState::Dead | HealthState::RateLimited => { + // Check if backoff/cooldown period has elapsed match health.next_retry_at { None => true, Some(next_retry) => Instant::now() >= next_retry, @@ -266,7 +482,7 @@ impl RelayHealthTracker { pub fn get_state(&self, relay_url: &str) -> HealthState { self.health .get(relay_url) - .map(|entry| entry.value().state) + .map(|entry| entry.value().state()) .unwrap_or(HealthState::Healthy) } @@ -350,10 +566,12 @@ mod tests { } #[test] - fn test_default_health_is_healthy() { + fn test_default_health_is_disconnected() { let health = RelayHealth::default(); - assert_eq!(health.state, HealthState::Healthy); + // Default state: not connected, no failures = Disconnected + assert_eq!(health.state(), HealthState::Disconnected); assert_eq!(health.consecutive_failures, 0); + assert!(!health.connected); assert!(health.first_failure_time.is_none()); } @@ -504,7 +722,7 @@ mod tests { assert!(health.is_some()); let health = health.unwrap(); - assert_eq!(health.state, HealthState::Healthy); + assert_eq!(health.state(), HealthState::Healthy); assert!(health.last_success_time.is_some()); } diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 22c9192..453a79c 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs @@ -72,7 +72,7 @@ impl SyncMetrics { let relay_status = IntGaugeVec::new( Opts::new( "ngit_sync_relay_status", - "Relay health status (1=healthy, 2=degraded, 3=dead)", + "Relay health status (1=healthy, 2=disconnected, 3=degraded, 4=dead, 5=rate_limited)", ), &["relay"], )?; @@ -178,9 +178,11 @@ impl SyncMetrics { /// Record relay health state change. /// /// Maps health states to numeric values for Prometheus: - /// - Healthy = 1 - /// - Degraded = 2 - /// - Dead = 3 + /// - Healthy = 1 (connected and stable) + /// - Disconnected = 2 (not connected, but no issues) + /// - Degraded = 3 (connection problems or unstable after recovery) + /// - Dead = 4 (24h+ of failures) + /// - RateLimited = 5 (rate limit cooldown active) /// /// # Arguments /// @@ -189,8 +191,10 @@ impl SyncMetrics { pub fn record_health_state(&self, relay: &str, state: HealthState) { let state_value = match state { HealthState::Healthy => 1, - HealthState::Degraded => 2, - HealthState::Dead => 3, + HealthState::Disconnected => 2, + HealthState::Degraded => 3, + HealthState::Dead => 4, + HealthState::RateLimited => 5, }; self.relay_status .with_label_values(&[relay]) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6f59b19..1f95ff7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -33,6 +33,7 @@ pub use self_subscriber::SelfSubscriber; // Re-export health tracking types pub use health::RelayHealthTracker; +use tokio::time::sleep; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -303,42 +304,59 @@ async fn run_daily_timer( } } -// ============================================================================= -// Disconnect Checker -// ============================================================================= +// Combined Health and Metrics Checker -/// Run the disconnect checker for periodic cleanup of empty relays +/// Run the combined health and metrics checker /// -/// This function runs in a loop, checking at the configured interval for relays -/// that have no repos or root events to sync. Non-bootstrap relays -/// that are empty will be disconnected to free up resources. +/// This function runs in a loop with a 2-second interval, performing three tasks: +/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones +/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired +/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker /// -/// Bootstrap relays are never disconnected, even if empty. +/// The metrics update ensures that health states are kept current in metrics even when +/// they change due to timeouts, cooldowns expiring, or stability periods completing. /// -/// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` -/// (default: 60 seconds). Set to a lower value for faster reconnection testing. -async fn run_disconnect_checker( +/// The 2-second interval provides a good balance between responsiveness and overhead. +/// While disconnect checking traditionally ran at 60s intervals, the faster cadence here +/// is acceptable since the operations are lightweight (just index checks, no I/O). +async fn run_health_and_metrics_checker( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, - check_interval_secs: u64, ) { - let interval = Duration::from_secs(check_interval_secs); - tracing::info!( - interval_secs = check_interval_secs, - "Disconnect checker started with configured interval" - ); + let interval = Duration::from_secs(2); + tracing::info!("Health and metrics checker started with 2s interval"); loop { tokio::select! { _ = tokio::time::sleep(interval) => { - tracing::debug!("Disconnect checker running"); + tracing::debug!("Health and metrics checker running"); let mut manager = sync_manager.lock().await; + + // 1. Check for disconnects and retry disconnected relays manager.check_disconnects().await; manager.retry_disconnected_relays().await; + + // 2. Check for rate limit recovery + manager.check_rate_limit_recovery().await; + + // 3. Update metrics with current health states + if let Some(ref metrics) = manager.metrics { + // Get all tracked relay URLs + let relay_urls: Vec = { + let index = manager.relay_sync_index.read().await; + index.keys().cloned().collect() + }; + + // Update health state for each relay + for relay_url in relay_urls { + let state = manager.health_tracker.get_state(&relay_url); + metrics.record_health_state(&relay_url, state); + } + } } _ = shutdown_rx.recv() => { - tracing::info!("Disconnect checker received shutdown signal"); + tracing::info!("Health and metrics checker received shutdown signal"); break; } } @@ -510,6 +528,45 @@ impl SyncManager { // Drop the lock before async operations drop(pending); + // Wait for rate limiting to clear before pagination continues + if self.health_tracker.is_rate_limited(relay_url) { + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + "Relay is rate limited, waiting before pagination" + ); + + // Loop until rate limit clears, sleeping with jitter between checks + while self.health_tracker.is_rate_limited(relay_url) { + let jitter_secs = 1 + (rand::random::() % 5); // 1-5 seconds + sleep(Duration::from_secs(jitter_secs)).await; + } + + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + "Rate limit cleared, continuing pagination" + ); + let batch_exists = { + let pending = self.pending_sync_index.read().await; + pending + .get(&relay_url_for_pagination) + .map(|batches| batches.iter().any(|b| b.batch_id == batch_id)) + .unwrap_or(false) + }; + + // If we were rate limited, verify batch still exists after waiting + // (batches are wiped during disconnect, so avoid orphaned pagination) + if !batch_exists { + tracing::debug!( + relay = %relay_url_for_pagination, + batch_id = batch_id, + "Batch no longer exists after rate limit wait, skipping pagination" + ); + return; + } + } + // Subscribe to next page and add to outstanding_subs if let Some(conn) = self.connections.get(&relay_url_for_pagination) { match conn.subscribe_filter(next_filter.clone(), true).await { @@ -752,29 +809,22 @@ impl SyncManager { self.try_connect_relay(bootstrap_url).await; } - // 7. Capture config values before moving self into Arc - let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; - - // 8. Wrap self in Arc for sharing with timer task + // 7. Wrap self in Arc for sharing with timer task let sync_manager = Arc::new(Mutex::new(self)); - // 9. Spawn daily timer task with shutdown receiver + // 8. Spawn daily timer task with shutdown receiver let timer_manager = Arc::clone(&sync_manager); let timer_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { run_daily_timer(timer_manager, timer_shutdown).await; }); - // 10. Spawn disconnect checker task with shutdown receiver + // 9. Spawn health and metrics checker task with shutdown receiver + // This combines disconnect checking, rate limit recovery, and metrics updates let checker_manager = Arc::clone(&sync_manager); let checker_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_disconnect_checker( - checker_manager, - checker_shutdown, - disconnect_check_interval_secs, - ) - .await; + run_health_and_metrics_checker(checker_manager, checker_shutdown).await; }); // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -876,7 +926,18 @@ impl SyncManager { } } - // Step 2: Check if consolidation is needed BEFORE adding new filters + // Step 2: Check if relay is rate-limited before creating new pending items + if self.health_tracker.is_rate_limited(&action.relay_url) { + tracing::debug!( + relay = %action.relay_url, + repos = action.items.repos.len(), + root_events = action.items.root_events.len(), + "Skipping AddFilters for rate-limited relay, will recompute after cooldown" + ); + return; + } + + // Step 3: Check if consolidation is needed BEFORE adding new filters self.maybe_consolidate(&action.relay_url, action.filters.len()) .await; @@ -954,6 +1015,7 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); + let health_tracker = Arc::clone(&self.health_tracker); tokio::spawn(async move { let mut disconnect_sent = false; @@ -1011,6 +1073,38 @@ impl SyncManager { }) .await; } + RelayEvent::Notice(notice) => { + // Check for rate limiting indicators + let notice_lower = notice.to_lowercase(); + let is_rate_limit = (notice_lower.contains("rate") + && notice_lower.contains("limit")) + || notice_lower.contains("too many") + || notice_lower.contains("slow down") + || notice_lower.contains("throttl"); + + if is_rate_limit { + tracing::warn!( + relay = %relay_url_clone, + notice = %notice, + "Rate limiting NOTICE detected from relay" + ); + + // Mark relay as rate limited + health_tracker.record_rate_limit(&relay_url_clone); + + // Update metrics with new health state + if let Some(ref metrics) = metrics_clone { + let state = health_tracker.get_state(&relay_url_clone); + metrics.record_health_state(&relay_url_clone, state); + } + } else { + tracing::debug!( + relay = %relay_url_clone, + notice = %notice, + "Relay issued notice" + ); + } + } RelayEvent::Closed(reason) => { // CLOSED message means one subscription was closed, not the whole connection // This is normal behavior (e.g., when historic_sync completes) @@ -1901,6 +1995,63 @@ impl SyncManager { } } + /// Check for rate-limited relays that have exceeded cooldown + /// + /// This method is called periodically by run_rate_limit_checker (every 1 second). + /// For each relay in RateLimited state that has exceeded the 65-second cooldown: + /// 1. Clears the rate limit state (sets to Healthy) + /// 2. Recomputes required actions for that relay + /// 3. Submits those actions + async fn check_rate_limit_recovery(&mut self) { + use crate::sync::algorithms::{compute_actions, derive_relay_targets}; + + // Exit rate limiting for relays whose cooldown has expired + let relays_to_recover: Vec = self.health_tracker.exit_expired_rate_limits(); + + if relays_to_recover.is_empty() { + return; + } + + // Recompute actions - could optimise by adding relays: Option<&[]> to derive_relay_targets + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + drop(repo_index); + + for relay_url in relays_to_recover { + tracing::info!( + relay = %relay_url, + "Rate limit cooldown expired, recovering" + ); + + // Clear rate limit state + self.health_tracker.clear_rate_limit(&relay_url); + + // Only compute actions for this specific relay + if let Some(relay_needs) = targets.get(&relay_url) { + let mut single_relay_targets = std::collections::HashMap::new(); + single_relay_targets.insert(relay_url.clone(), relay_needs.clone()); + + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; + + let actions = compute_actions(&single_relay_targets, &pending, &confirmed); + drop(pending); + drop(confirmed); + + // Submit each action + for action in actions { + tracing::info!( + relay = %action.relay_url, + repo_count = action.items.repos.len(), + event_count = action.items.root_events.len(), + "Submitting recovered actions after rate limit" + ); + self.handle_new_sync_filters(action).await; + } + } + } + } + /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex /// /// This method applies limit(0) to all filters to receive ONLY new events. diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 5a61777..d69e1ce 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -27,6 +27,8 @@ pub enum RelayEvent { Event(Event, SubscriptionId), /// End of stored events for a subscription EndOfStoredEvents(SubscriptionId), + /// NOTICE message from relay + Notice(String), /// Connection was closed Closed(String), /// Shutdown notification @@ -238,6 +240,11 @@ impl RelayConnection { break; } } + RelayMessage::Notice(msg) => { + tracing::debug!(relay = %url, message = %msg, "Received NOTICE"); + let _ = event_sender.send(RelayEvent::Notice(msg.to_string())).await; + // Don't break - continue processing events + } RelayMessage::Closed { message: msg, .. } => { tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); let _ = -- cgit v1.2.3