diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:23:46 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:23:46 +0000 |
| commit | 541f34a207047b26547154e7d631005d456f12fd (patch) | |
| tree | 446cffc4b3bbc32bf61933b5ab41a044a35d6f3b /src/sync/mod.rs | |
| parent | b10a6cc91dab4c3d83d62fe8cb357c78f2cd4d1e (diff) | |
sync: add req rate-limit detection and cooldown
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 215 |
1 file changed, 183 insertions, 32 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6f59b19..1f95ff7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -33,6 +33,7 @@ pub use self_subscriber::SelfSubscriber; | |||
| 33 | 33 | ||
| 34 | // Re-export health tracking types | 34 | // Re-export health tracking types |
| 35 | pub use health::RelayHealthTracker; | 35 | pub use health::RelayHealthTracker; |
| 36 | use tokio::time::sleep; | ||
| 36 | 37 | ||
| 37 | use std::collections::{HashMap, HashSet}; | 38 | use std::collections::{HashMap, HashSet}; |
| 38 | use std::sync::Arc; | 39 | use std::sync::Arc; |
| @@ -303,42 +304,59 @@ async fn run_daily_timer( | |||
| 303 | } | 304 | } |
| 304 | } | 305 | } |
| 305 | 306 | ||
| 306 | // ============================================================================= | 307 | // Combined Health and Metrics Checker |
| 307 | // Disconnect Checker | ||
| 308 | // ============================================================================= | ||
| 309 | 308 | ||
| 310 | /// Run the disconnect checker for periodic cleanup of empty relays | 309 | /// Run the combined health and metrics checker |
| 311 | /// | 310 | /// |
| 312 | /// This function runs in a loop, checking at the configured interval for relays | 311 | /// This function runs in a loop with a 2-second interval, performing three tasks: |
| 313 | /// that have no repos or root events to sync. Non-bootstrap relays | 312 | /// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones |
| 314 | /// that are empty will be disconnected to free up resources. | 313 | /// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired |
| 314 | /// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker | ||
| 315 | /// | 315 | /// |
| 316 | /// Bootstrap relays are never disconnected, even if empty. | 316 | /// The metrics update ensures that health states are kept current in metrics even when |
| 317 | /// they change due to timeouts, cooldowns expiring, or stability periods completing. | ||
| 317 | /// | 318 | /// |
| 318 | /// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` | 319 | /// The 2-second interval provides a good balance between responsiveness and overhead. |
| 319 | /// (default: 60 seconds). Set to a lower value for faster reconnection testing. | 320 | /// While disconnect checking traditionally ran at 60s intervals, the faster cadence here |
| 320 | async fn run_disconnect_checker( | 321 | /// is acceptable since the operations are lightweight (just index checks, no I/O). |
| 322 | async fn run_health_and_metrics_checker( | ||
| 321 | sync_manager: Arc<Mutex<SyncManager>>, | 323 | sync_manager: Arc<Mutex<SyncManager>>, |
| 322 | mut shutdown_rx: broadcast::Receiver<()>, | 324 | mut shutdown_rx: broadcast::Receiver<()>, |
| 323 | check_interval_secs: u64, | ||
| 324 | ) { | 325 | ) { |
| 325 | let interval = Duration::from_secs(check_interval_secs); | 326 | let interval = Duration::from_secs(2); |
| 326 | tracing::info!( | 327 | tracing::info!("Health and metrics checker started with 2s interval"); |
| 327 | interval_secs = check_interval_secs, | ||
| 328 | "Disconnect checker started with configured interval" | ||
| 329 | ); | ||
| 330 | 328 | ||
| 331 | loop { | 329 | loop { |
| 332 | tokio::select! { | 330 | tokio::select! { |
| 333 | _ = tokio::time::sleep(interval) => { | 331 | _ = tokio::time::sleep(interval) => { |
| 334 | tracing::debug!("Disconnect checker running"); | 332 | tracing::debug!("Health and metrics checker running"); |
| 335 | 333 | ||
| 336 | let mut manager = sync_manager.lock().await; | 334 | let mut manager = sync_manager.lock().await; |
| 335 | |||
| 336 | // 1. Check for disconnects and retry disconnected relays | ||
| 337 | manager.check_disconnects().await; | 337 | manager.check_disconnects().await; |
| 338 | manager.retry_disconnected_relays().await; | 338 | manager.retry_disconnected_relays().await; |
| 339 | |||
| 340 | // 2. Check for rate limit recovery | ||
| 341 | manager.check_rate_limit_recovery().await; | ||
| 342 | |||
| 343 | // 3. Update metrics with current health states | ||
| 344 | if let Some(ref metrics) = manager.metrics { | ||
| 345 | // Get all tracked relay URLs | ||
| 346 | let relay_urls: Vec<String> = { | ||
| 347 | let index = manager.relay_sync_index.read().await; | ||
| 348 | index.keys().cloned().collect() | ||
| 349 | }; | ||
| 350 | |||
| 351 | // Update health state for each relay | ||
| 352 | for relay_url in relay_urls { | ||
| 353 | let state = manager.health_tracker.get_state(&relay_url); | ||
| 354 | metrics.record_health_state(&relay_url, state); | ||
| 355 | } | ||
| 356 | } | ||
| 339 | } | 357 | } |
| 340 | _ = shutdown_rx.recv() => { | 358 | _ = shutdown_rx.recv() => { |
| 341 | tracing::info!("Disconnect checker received shutdown signal"); | 359 | tracing::info!("Health and metrics checker received shutdown signal"); |
| 342 | break; | 360 | break; |
| 343 | } | 361 | } |
| 344 | } | 362 | } |
| @@ -510,6 +528,45 @@ impl SyncManager { | |||
| 510 | // Drop the lock before async operations | 528 | // Drop the lock before async operations |
| 511 | drop(pending); | 529 | drop(pending); |
| 512 | 530 | ||
| 531 | // Wait for rate limiting to clear before pagination continues | ||
| 532 | if self.health_tracker.is_rate_limited(relay_url) { | ||
| 533 | tracing::debug!( | ||
| 534 | relay = %relay_url, | ||
| 535 | batch_id = batch_id, | ||
| 536 | "Relay is rate limited, waiting before pagination" | ||
| 537 | ); | ||
| 538 | |||
| 539 | // Loop until rate limit clears, sleeping with jitter between checks | ||
| 540 | while self.health_tracker.is_rate_limited(relay_url) { | ||
| 541 | let jitter_secs = 1 + (rand::random::<u64>() % 5); // 1-5 seconds | ||
| 542 | sleep(Duration::from_secs(jitter_secs)).await; | ||
| 543 | } | ||
| 544 | |||
| 545 | tracing::debug!( | ||
| 546 | relay = %relay_url, | ||
| 547 | batch_id = batch_id, | ||
| 548 | "Rate limit cleared, continuing pagination" | ||
| 549 | ); | ||
| 550 | let batch_exists = { | ||
| 551 | let pending = self.pending_sync_index.read().await; | ||
| 552 | pending | ||
| 553 | .get(&relay_url_for_pagination) | ||
| 554 | .map(|batches| batches.iter().any(|b| b.batch_id == batch_id)) | ||
| 555 | .unwrap_or(false) | ||
| 556 | }; | ||
| 557 | |||
| 558 | // If we were rate limited, verify batch still exists after waiting | ||
| 559 | // (batches are wiped during disconnect, so avoid orphaned pagination) | ||
| 560 | if !batch_exists { | ||
| 561 | tracing::debug!( | ||
| 562 | relay = %relay_url_for_pagination, | ||
| 563 | batch_id = batch_id, | ||
| 564 | "Batch no longer exists after rate limit wait, skipping pagination" | ||
| 565 | ); | ||
| 566 | return; | ||
| 567 | } | ||
| 568 | } | ||
| 569 | |||
| 513 | // Subscribe to next page and add to outstanding_subs | 570 | // Subscribe to next page and add to outstanding_subs |
| 514 | if let Some(conn) = self.connections.get(&relay_url_for_pagination) { | 571 | if let Some(conn) = self.connections.get(&relay_url_for_pagination) { |
| 515 | match conn.subscribe_filter(next_filter.clone(), true).await { | 572 | match conn.subscribe_filter(next_filter.clone(), true).await { |
| @@ -752,29 +809,22 @@ impl SyncManager { | |||
| 752 | self.try_connect_relay(bootstrap_url).await; | 809 | self.try_connect_relay(bootstrap_url).await; |
| 753 | } | 810 | } |
| 754 | 811 | ||
| 755 | // 7. Capture config values before moving self into Arc | 812 | // 7. Wrap self in Arc<Mutex> for sharing with timer task |
| 756 | let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; | ||
| 757 | |||
| 758 | // 8. Wrap self in Arc<Mutex> for sharing with timer task | ||
| 759 | let sync_manager = Arc::new(Mutex::new(self)); | 813 | let sync_manager = Arc::new(Mutex::new(self)); |
| 760 | 814 | ||
| 761 | // 9. Spawn daily timer task with shutdown receiver | 815 | // 8. Spawn daily timer task with shutdown receiver |
| 762 | let timer_manager = Arc::clone(&sync_manager); | 816 | let timer_manager = Arc::clone(&sync_manager); |
| 763 | let timer_shutdown = shutdown_tx.subscribe(); | 817 | let timer_shutdown = shutdown_tx.subscribe(); |
| 764 | tokio::spawn(async move { | 818 | tokio::spawn(async move { |
| 765 | run_daily_timer(timer_manager, timer_shutdown).await; | 819 | run_daily_timer(timer_manager, timer_shutdown).await; |
| 766 | }); | 820 | }); |
| 767 | 821 | ||
| 768 | // 10. Spawn disconnect checker task with shutdown receiver | 822 | // 9. Spawn health and metrics checker task with shutdown receiver |
| 823 | // This combines disconnect checking, rate limit recovery, and metrics updates | ||
| 769 | let checker_manager = Arc::clone(&sync_manager); | 824 | let checker_manager = Arc::clone(&sync_manager); |
| 770 | let checker_shutdown = shutdown_tx.subscribe(); | 825 | let checker_shutdown = shutdown_tx.subscribe(); |
| 771 | tokio::spawn(async move { | 826 | tokio::spawn(async move { |
| 772 | run_disconnect_checker( | 827 | run_health_and_metrics_checker(checker_manager, checker_shutdown).await; |
| 773 | checker_manager, | ||
| 774 | checker_shutdown, | ||
| 775 | disconnect_check_interval_secs, | ||
| 776 | ) | ||
| 777 | .await; | ||
| 778 | }); | 828 | }); |
| 779 | 829 | ||
| 780 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 830 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -876,7 +926,18 @@ impl SyncManager { | |||
| 876 | } | 926 | } |
| 877 | } | 927 | } |
| 878 | 928 | ||
| 879 | // Step 2: Check if consolidation is needed BEFORE adding new filters | 929 | // Step 2: Check if relay is rate-limited before creating new pending items |
| 930 | if self.health_tracker.is_rate_limited(&action.relay_url) { | ||
| 931 | tracing::debug!( | ||
| 932 | relay = %action.relay_url, | ||
| 933 | repos = action.items.repos.len(), | ||
| 934 | root_events = action.items.root_events.len(), | ||
| 935 | "Skipping AddFilters for rate-limited relay, will recompute after cooldown" | ||
| 936 | ); | ||
| 937 | return; | ||
| 938 | } | ||
| 939 | |||
| 940 | // Step 3: Check if consolidation is needed BEFORE adding new filters | ||
| 880 | self.maybe_consolidate(&action.relay_url, action.filters.len()) | 941 | self.maybe_consolidate(&action.relay_url, action.filters.len()) |
| 881 | .await; | 942 | .await; |
| 882 | 943 | ||
| @@ -954,6 +1015,7 @@ impl SyncManager { | |||
| 954 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | 1015 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); |
| 955 | let metrics_clone = self.metrics.clone(); | 1016 | let metrics_clone = self.metrics.clone(); |
| 956 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | 1017 | let pending_sync_index = Arc::clone(&self.pending_sync_index); |
| 1018 | let health_tracker = Arc::clone(&self.health_tracker); | ||
| 957 | 1019 | ||
| 958 | tokio::spawn(async move { | 1020 | tokio::spawn(async move { |
| 959 | let mut disconnect_sent = false; | 1021 | let mut disconnect_sent = false; |
| @@ -1011,6 +1073,38 @@ impl SyncManager { | |||
| 1011 | }) | 1073 | }) |
| 1012 | .await; | 1074 | .await; |
| 1013 | } | 1075 | } |
| 1076 | RelayEvent::Notice(notice) => { | ||
| 1077 | // Check for rate limiting indicators | ||
| 1078 | let notice_lower = notice.to_lowercase(); | ||
| 1079 | let is_rate_limit = (notice_lower.contains("rate") | ||
| 1080 | && notice_lower.contains("limit")) | ||
| 1081 | || notice_lower.contains("too many") | ||
| 1082 | || notice_lower.contains("slow down") | ||
| 1083 | || notice_lower.contains("throttl"); | ||
| 1084 | |||
| 1085 | if is_rate_limit { | ||
| 1086 | tracing::warn!( | ||
| 1087 | relay = %relay_url_clone, | ||
| 1088 | notice = %notice, | ||
| 1089 | "Rate limiting NOTICE detected from relay" | ||
| 1090 | ); | ||
| 1091 | |||
| 1092 | // Mark relay as rate limited | ||
| 1093 | health_tracker.record_rate_limit(&relay_url_clone); | ||
| 1094 | |||
| 1095 | // Update metrics with new health state | ||
| 1096 | if let Some(ref metrics) = metrics_clone { | ||
| 1097 | let state = health_tracker.get_state(&relay_url_clone); | ||
| 1098 | metrics.record_health_state(&relay_url_clone, state); | ||
| 1099 | } | ||
| 1100 | } else { | ||
| 1101 | tracing::debug!( | ||
| 1102 | relay = %relay_url_clone, | ||
| 1103 | notice = %notice, | ||
| 1104 | "Relay issued notice" | ||
| 1105 | ); | ||
| 1106 | } | ||
| 1107 | } | ||
| 1014 | RelayEvent::Closed(reason) => { | 1108 | RelayEvent::Closed(reason) => { |
| 1015 | // CLOSED message means one subscription was closed, not the whole connection | 1109 | // CLOSED message means one subscription was closed, not the whole connection |
| 1016 | // This is normal behavior (e.g., when historic_sync completes) | 1110 | // This is normal behavior (e.g., when historic_sync completes) |
| @@ -1901,6 +1995,63 @@ impl SyncManager { | |||
| 1901 | } | 1995 | } |
| 1902 | } | 1996 | } |
| 1903 | 1997 | ||
| 1998 | /// Check for rate-limited relays that have exceeded cooldown | ||
| 1999 | /// | ||
| 2000 | /// This method is called periodically by run_health_and_metrics_checker (every 2 seconds). | ||
| 2001 | /// For each relay in RateLimited state that has exceeded the 65-second cooldown: | ||
| 2002 | /// 1. Clears the rate limit state (sets to Healthy) | ||
| 2003 | /// 2. Recomputes required actions for that relay | ||
| 2004 | /// 3. Submits those actions | ||
| 2005 | async fn check_rate_limit_recovery(&mut self) { | ||
| 2006 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | ||
| 2007 | |||
| 2008 | // Exit rate limiting for relays whose cooldown has expired | ||
| 2009 | let relays_to_recover: Vec<String> = self.health_tracker.exit_expired_rate_limits(); | ||
| 2010 | |||
| 2011 | if relays_to_recover.is_empty() { | ||
| 2012 | return; | ||
| 2013 | } | ||
| 2014 | |||
| 2015 | // Recompute actions - could optimise by adding relays: Option<&[]> to derive_relay_targets | ||
| 2016 | let repo_index = self.repo_sync_index.read().await; | ||
| 2017 | let targets = derive_relay_targets(&repo_index); | ||
| 2018 | drop(repo_index); | ||
| 2019 | |||
| 2020 | for relay_url in relays_to_recover { | ||
| 2021 | tracing::info!( | ||
| 2022 | relay = %relay_url, | ||
| 2023 | "Rate limit cooldown expired, recovering" | ||
| 2024 | ); | ||
| 2025 | |||
| 2026 | // Clear rate limit state | ||
| 2027 | self.health_tracker.clear_rate_limit(&relay_url); | ||
| 2028 | |||
| 2029 | // Only compute actions for this specific relay | ||
| 2030 | if let Some(relay_needs) = targets.get(&relay_url) { | ||
| 2031 | let mut single_relay_targets = std::collections::HashMap::new(); | ||
| 2032 | single_relay_targets.insert(relay_url.clone(), relay_needs.clone()); | ||
| 2033 | |||
| 2034 | let pending = self.pending_sync_index.read().await; | ||
| 2035 | let confirmed = self.relay_sync_index.read().await; | ||
| 2036 | |||
| 2037 | let actions = compute_actions(&single_relay_targets, &pending, &confirmed); | ||
| 2038 | drop(pending); | ||
| 2039 | drop(confirmed); | ||
| 2040 | |||
| 2041 | // Submit each action | ||
| 2042 | for action in actions { | ||
| 2043 | tracing::info!( | ||
| 2044 | relay = %action.relay_url, | ||
| 2045 | repo_count = action.items.repos.len(), | ||
| 2046 | event_count = action.items.root_events.len(), | ||
| 2047 | "Submitting recovered actions after rate limit" | ||
| 2048 | ); | ||
| 2049 | self.handle_new_sync_filters(action).await; | ||
| 2050 | } | ||
| 2051 | } | ||
| 2052 | } | ||
| 2053 | } | ||
| 2054 | |||
| 1904 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex | 2055 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex |
| 1905 | /// | 2056 | /// |
| 1906 | /// This method applies limit(0) to all filters to receive ONLY new events. | 2057 | /// This method applies limit(0) to all filters to receive ONLY new events. |