From 541f34a207047b26547154e7d631005d456f12fd Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 22 Dec 2025 14:23:46 +0000 Subject: sync: add req rate-limit detection and cooldown --- src/sync/mod.rs | 215 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 183 insertions(+), 32 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6f59b19..1f95ff7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -33,6 +33,7 @@ pub use self_subscriber::SelfSubscriber; // Re-export health tracking types pub use health::RelayHealthTracker; +use tokio::time::sleep; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -303,42 +304,59 @@ async fn run_daily_timer( } } -// ============================================================================= -// Disconnect Checker -// ============================================================================= +// Combined Health and Metrics Checker -/// Run the disconnect checker for periodic cleanup of empty relays +/// Run the combined health and metrics checker /// -/// This function runs in a loop, checking at the configured interval for relays -/// that have no repos or root events to sync. Non-bootstrap relays -/// that are empty will be disconnected to free up resources. +/// This function runs in a loop with a 2-second interval, performing three tasks: +/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones +/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired +/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker /// -/// Bootstrap relays are never disconnected, even if empty. +/// The metrics update ensures that health states are kept current in metrics even when +/// they change due to timeouts, cooldowns expiring, or stability periods completing. 
/// -/// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` -/// (default: 60 seconds). Set to a lower value for faster reconnection testing. -async fn run_disconnect_checker( +/// The 2-second interval provides a good balance between responsiveness and overhead. +/// While disconnect checking traditionally ran at 60s intervals, the faster cadence here +/// is acceptable since the operations are lightweight (just index checks, no I/O). +async fn run_health_and_metrics_checker( sync_manager: Arc<Mutex<SyncManager>>, mut shutdown_rx: broadcast::Receiver<()>, - check_interval_secs: u64, ) { - let interval = Duration::from_secs(check_interval_secs); - tracing::info!( - interval_secs = check_interval_secs, - "Disconnect checker started with configured interval" - ); + let interval = Duration::from_secs(2); + tracing::info!("Health and metrics checker started with 2s interval"); loop { tokio::select! { _ = tokio::time::sleep(interval) => { - tracing::debug!("Disconnect checker running"); + tracing::debug!("Health and metrics checker running"); let mut manager = sync_manager.lock().await; + + // 1. Check for disconnects and retry disconnected relays manager.check_disconnects().await; manager.retry_disconnected_relays().await; + + // 2. Check for rate limit recovery + manager.check_rate_limit_recovery().await; + + // 3. 
Update metrics with current health states + if let Some(ref metrics) = manager.metrics { + // Get all tracked relay URLs + let relay_urls: Vec = { + let index = manager.relay_sync_index.read().await; + index.keys().cloned().collect() + }; + + // Update health state for each relay + for relay_url in relay_urls { + let state = manager.health_tracker.get_state(&relay_url); + metrics.record_health_state(&relay_url, state); + } + } } _ = shutdown_rx.recv() => { - tracing::info!("Disconnect checker received shutdown signal"); + tracing::info!("Health and metrics checker received shutdown signal"); break; } } @@ -510,6 +528,45 @@ impl SyncManager { // Drop the lock before async operations drop(pending); + // Wait for rate limiting to clear before pagination continues + if self.health_tracker.is_rate_limited(relay_url) { + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + "Relay is rate limited, waiting before pagination" + ); + + // Loop until rate limit clears, sleeping with jitter between checks + while self.health_tracker.is_rate_limited(relay_url) { + let jitter_secs = 1 + (rand::random::<u64>() % 5); // 1-5 seconds + sleep(Duration::from_secs(jitter_secs)).await; + } + + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + "Rate limit cleared, continuing pagination" + ); + let batch_exists = { + let pending = self.pending_sync_index.read().await; + pending + .get(&relay_url_for_pagination) + .map(|batches| batches.iter().any(|b| b.batch_id == batch_id)) + .unwrap_or(false) + }; + + // If we were rate limited, verify batch still exists after waiting + // (batches are wiped during disconnect, so avoid orphaned pagination) + if !batch_exists { + tracing::debug!( + relay = %relay_url_for_pagination, + batch_id = batch_id, + "Batch no longer exists after rate limit wait, skipping pagination" + ); + return; + } + } + // Subscribe to next page and add to outstanding_subs if let Some(conn) = self.connections.get(&relay_url_for_pagination) { match 
conn.subscribe_filter(next_filter.clone(), true).await { @@ -752,29 +809,22 @@ impl SyncManager { self.try_connect_relay(bootstrap_url).await; } - // 7. Capture config values before moving self into Arc - let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; - - // 8. Wrap self in Arc for sharing with timer task + // 7. Wrap self in Arc for sharing with timer task let sync_manager = Arc::new(Mutex::new(self)); - // 9. Spawn daily timer task with shutdown receiver + // 8. Spawn daily timer task with shutdown receiver let timer_manager = Arc::clone(&sync_manager); let timer_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { run_daily_timer(timer_manager, timer_shutdown).await; }); - // 10. Spawn disconnect checker task with shutdown receiver + // 9. Spawn health and metrics checker task with shutdown receiver + // This combines disconnect checking, rate limit recovery, and metrics updates let checker_manager = Arc::clone(&sync_manager); let checker_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_disconnect_checker( - checker_manager, - checker_shutdown, - disconnect_check_interval_secs, - ) - .await; + run_health_and_metrics_checker(checker_manager, checker_shutdown).await; }); // 10. 
Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -876,7 +926,18 @@ impl SyncManager { } } - // Step 2: Check if consolidation is needed BEFORE adding new filters + // Step 2: Check if relay is rate-limited before creating new pending items + if self.health_tracker.is_rate_limited(&action.relay_url) { + tracing::debug!( + relay = %action.relay_url, + repos = action.items.repos.len(), + root_events = action.items.root_events.len(), + "Skipping AddFilters for rate-limited relay, will recompute after cooldown" + ); + return; + } + + // Step 3: Check if consolidation is needed BEFORE adding new filters self.maybe_consolidate(&action.relay_url, action.filters.len()) .await; @@ -954,6 +1015,7 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); + let health_tracker = Arc::clone(&self.health_tracker); tokio::spawn(async move { let mut disconnect_sent = false; @@ -1011,6 +1073,38 @@ impl SyncManager { }) .await; } + RelayEvent::Notice(notice) => { + // Check for rate limiting indicators + let notice_lower = notice.to_lowercase(); + let is_rate_limit = (notice_lower.contains("rate") + && notice_lower.contains("limit")) + || notice_lower.contains("too many") + || notice_lower.contains("slow down") + || notice_lower.contains("throttl"); + + if is_rate_limit { + tracing::warn!( + relay = %relay_url_clone, + notice = %notice, + "Rate limiting NOTICE detected from relay" + ); + + // Mark relay as rate limited + health_tracker.record_rate_limit(&relay_url_clone); + + // Update metrics with new health state + if let Some(ref metrics) = metrics_clone { + let state = health_tracker.get_state(&relay_url_clone); + metrics.record_health_state(&relay_url_clone, state); + } + } else { + tracing::debug!( + relay = %relay_url_clone, + notice = %notice, + "Relay issued notice" + ); + } + } RelayEvent::Closed(reason) => 
{ // CLOSED message means one subscription was closed, not the whole connection // This is normal behavior (e.g., when historic_sync completes) @@ -1901,6 +1995,63 @@ impl SyncManager { } } + /// Check for rate-limited relays that have exceeded cooldown + /// + /// This method is called periodically by run_health_and_metrics_checker (every 2 seconds). + /// For each relay in RateLimited state that has exceeded the 65-second cooldown: + /// 1. Clears the rate limit state (sets to Healthy) + /// 2. Recomputes required actions for that relay + /// 3. Submits those actions + async fn check_rate_limit_recovery(&mut self) { + use crate::sync::algorithms::{compute_actions, derive_relay_targets}; + + // Exit rate limiting for relays whose cooldown has expired + let relays_to_recover: Vec = self.health_tracker.exit_expired_rate_limits(); + + if relays_to_recover.is_empty() { + return; + } + + // Recompute actions - could optimise by adding relays: Option<&[]> to derive_relay_targets + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + drop(repo_index); + + for relay_url in relays_to_recover { + tracing::info!( + relay = %relay_url, + "Rate limit cooldown expired, recovering" + ); + + // Clear rate limit state + self.health_tracker.clear_rate_limit(&relay_url); + + // Only compute actions for this specific relay + if let Some(relay_needs) = targets.get(&relay_url) { + let mut single_relay_targets = std::collections::HashMap::new(); + single_relay_targets.insert(relay_url.clone(), relay_needs.clone()); + + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; + + let actions = compute_actions(&single_relay_targets, &pending, &confirmed); + drop(pending); + drop(confirmed); + + // Submit each action + for action in actions { + tracing::info!( + relay = %action.relay_url, + repo_count = action.items.repos.len(), + event_count = action.items.root_events.len(), + "Submitting recovered 
actions after rate limit" + ); + self.handle_new_sync_filters(action).await; + } + } + } + } + /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex /// /// This method applies limit(0) to all filters to receive ONLY new events. -- cgit v1.2.3