upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary refs log tree commit diff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
author DanConwayDev <DanConwayDev@protonmail.com> 2025-12-22 14:23:46 +0000
committer DanConwayDev <DanConwayDev@protonmail.com> 2025-12-22 14:23:46 +0000
commit 541f34a207047b26547154e7d631005d456f12fd (patch)
tree 446cffc4b3bbc32bf61933b5ab41a044a35d6f3b /src/sync/mod.rs
parent b10a6cc91dab4c3d83d62fe8cb357c78f2cd4d1e (diff)
sync: add req rate-limit detection and cooldown
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r-- src/sync/mod.rs 215
1 files changed, 183 insertions, 32 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6f59b19..1f95ff7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -33,6 +33,7 @@ pub use self_subscriber::SelfSubscriber;
33 33
34// Re-export health tracking types 34// Re-export health tracking types
35pub use health::RelayHealthTracker; 35pub use health::RelayHealthTracker;
36use tokio::time::sleep;
36 37
37use std::collections::{HashMap, HashSet}; 38use std::collections::{HashMap, HashSet};
38use std::sync::Arc; 39use std::sync::Arc;
@@ -303,42 +304,59 @@ async fn run_daily_timer(
303 } 304 }
304} 305}
305 306
306// ============================================================================= 307// Combined Health and Metrics Checker
307// Disconnect Checker
308// =============================================================================
309 308
310/// Run the disconnect checker for periodic cleanup of empty relays 309/// Run the combined health and metrics checker
311/// 310///
312/// This function runs in a loop, checking at the configured interval for relays 311/// This function runs in a loop with a 2-second interval, performing three tasks:
313/// that have no repos or root events to sync. Non-bootstrap relays 312/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones
314/// that are empty will be disconnected to free up resources. 313/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired
314/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker
315/// 315///
316/// Bootstrap relays are never disconnected, even if empty. 316/// The metrics update ensures that health states are kept current in metrics even when
317/// they change due to timeouts, cooldowns expiring, or stability periods completing.
317/// 318///
318/// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` 319/// The 2-second interval provides a good balance between responsiveness and overhead.
319/// (default: 60 seconds). Set to a lower value for faster reconnection testing. 320/// While disconnect checking traditionally ran at 60s intervals, the faster cadence here
320async fn run_disconnect_checker( 321/// is acceptable since the operations are lightweight (just index checks, no I/O).
322async fn run_health_and_metrics_checker(
321 sync_manager: Arc<Mutex<SyncManager>>, 323 sync_manager: Arc<Mutex<SyncManager>>,
322 mut shutdown_rx: broadcast::Receiver<()>, 324 mut shutdown_rx: broadcast::Receiver<()>,
323 check_interval_secs: u64,
324) { 325) {
325 let interval = Duration::from_secs(check_interval_secs); 326 let interval = Duration::from_secs(2);
326 tracing::info!( 327 tracing::info!("Health and metrics checker started with 2s interval");
327 interval_secs = check_interval_secs,
328 "Disconnect checker started with configured interval"
329 );
330 328
331 loop { 329 loop {
332 tokio::select! { 330 tokio::select! {
333 _ = tokio::time::sleep(interval) => { 331 _ = tokio::time::sleep(interval) => {
334 tracing::debug!("Disconnect checker running"); 332 tracing::debug!("Health and metrics checker running");
335 333
336 let mut manager = sync_manager.lock().await; 334 let mut manager = sync_manager.lock().await;
335
336 // 1. Check for disconnects and retry disconnected relays
337 manager.check_disconnects().await; 337 manager.check_disconnects().await;
338 manager.retry_disconnected_relays().await; 338 manager.retry_disconnected_relays().await;
339
340 // 2. Check for rate limit recovery
341 manager.check_rate_limit_recovery().await;
342
343 // 3. Update metrics with current health states
344 if let Some(ref metrics) = manager.metrics {
345 // Get all tracked relay URLs
346 let relay_urls: Vec<String> = {
347 let index = manager.relay_sync_index.read().await;
348 index.keys().cloned().collect()
349 };
350
351 // Update health state for each relay
352 for relay_url in relay_urls {
353 let state = manager.health_tracker.get_state(&relay_url);
354 metrics.record_health_state(&relay_url, state);
355 }
356 }
339 } 357 }
340 _ = shutdown_rx.recv() => { 358 _ = shutdown_rx.recv() => {
341 tracing::info!("Disconnect checker received shutdown signal"); 359 tracing::info!("Health and metrics checker received shutdown signal");
342 break; 360 break;
343 } 361 }
344 } 362 }
@@ -510,6 +528,45 @@ impl SyncManager {
510 // Drop the lock before async operations 528 // Drop the lock before async operations
511 drop(pending); 529 drop(pending);
512 530
531 // Wait for rate limiting to clear before pagination continues
532 if self.health_tracker.is_rate_limited(relay_url) {
533 tracing::debug!(
534 relay = %relay_url,
535 batch_id = batch_id,
536 "Relay is rate limited, waiting before pagination"
537 );
538
539 // Loop until rate limit clears, sleeping with jitter between checks
540 while self.health_tracker.is_rate_limited(relay_url) {
541 let jitter_secs = 1 + (rand::random::<u64>() % 5); // 1-5 seconds
542 sleep(Duration::from_secs(jitter_secs)).await;
543 }
544
545 tracing::debug!(
546 relay = %relay_url,
547 batch_id = batch_id,
548 "Rate limit cleared, continuing pagination"
549 );
550 let batch_exists = {
551 let pending = self.pending_sync_index.read().await;
552 pending
553 .get(&relay_url_for_pagination)
554 .map(|batches| batches.iter().any(|b| b.batch_id == batch_id))
555 .unwrap_or(false)
556 };
557
558 // If we were rate limited, verify batch still exists after waiting
559 // (batches are wiped during disconnect, so avoid orphaned pagination)
560 if !batch_exists {
561 tracing::debug!(
562 relay = %relay_url_for_pagination,
563 batch_id = batch_id,
564 "Batch no longer exists after rate limit wait, skipping pagination"
565 );
566 return;
567 }
568 }
569
513 // Subscribe to next page and add to outstanding_subs 570 // Subscribe to next page and add to outstanding_subs
514 if let Some(conn) = self.connections.get(&relay_url_for_pagination) { 571 if let Some(conn) = self.connections.get(&relay_url_for_pagination) {
515 match conn.subscribe_filter(next_filter.clone(), true).await { 572 match conn.subscribe_filter(next_filter.clone(), true).await {
@@ -752,29 +809,22 @@ impl SyncManager {
752 self.try_connect_relay(bootstrap_url).await; 809 self.try_connect_relay(bootstrap_url).await;
753 } 810 }
754 811
755 // 7. Capture config values before moving self into Arc 812 // 7. Wrap self in Arc<Mutex> for sharing with timer task
756 let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs;
757
758 // 8. Wrap self in Arc<Mutex> for sharing with timer task
759 let sync_manager = Arc::new(Mutex::new(self)); 813 let sync_manager = Arc::new(Mutex::new(self));
760 814
761 // 9. Spawn daily timer task with shutdown receiver 815 // 8. Spawn daily timer task with shutdown receiver
762 let timer_manager = Arc::clone(&sync_manager); 816 let timer_manager = Arc::clone(&sync_manager);
763 let timer_shutdown = shutdown_tx.subscribe(); 817 let timer_shutdown = shutdown_tx.subscribe();
764 tokio::spawn(async move { 818 tokio::spawn(async move {
765 run_daily_timer(timer_manager, timer_shutdown).await; 819 run_daily_timer(timer_manager, timer_shutdown).await;
766 }); 820 });
767 821
768 // 10. Spawn disconnect checker task with shutdown receiver 822 // 9. Spawn health and metrics checker task with shutdown receiver
823 // This combines disconnect checking, rate limit recovery, and metrics updates
769 let checker_manager = Arc::clone(&sync_manager); 824 let checker_manager = Arc::clone(&sync_manager);
770 let checker_shutdown = shutdown_tx.subscribe(); 825 let checker_shutdown = shutdown_tx.subscribe();
771 tokio::spawn(async move { 826 tokio::spawn(async move {
772 run_disconnect_checker( 827 run_health_and_metrics_checker(checker_manager, checker_shutdown).await;
773 checker_manager,
774 checker_shutdown,
775 disconnect_check_interval_secs,
776 )
777 .await;
778 }); 828 });
779 829
780 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 830 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -876,7 +926,18 @@ impl SyncManager {
876 } 926 }
877 } 927 }
878 928
879 // Step 2: Check if consolidation is needed BEFORE adding new filters 929 // Step 2: Check if relay is rate-limited before creating new pending items
930 if self.health_tracker.is_rate_limited(&action.relay_url) {
931 tracing::debug!(
932 relay = %action.relay_url,
933 repos = action.items.repos.len(),
934 root_events = action.items.root_events.len(),
935 "Skipping AddFilters for rate-limited relay, will recompute after cooldown"
936 );
937 return;
938 }
939
940 // Step 3: Check if consolidation is needed BEFORE adding new filters
880 self.maybe_consolidate(&action.relay_url, action.filters.len()) 941 self.maybe_consolidate(&action.relay_url, action.filters.len())
881 .await; 942 .await;
882 943
@@ -954,6 +1015,7 @@ impl SyncManager {
954 let eose_tx = self.eose_tx.as_ref().unwrap().clone(); 1015 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
955 let metrics_clone = self.metrics.clone(); 1016 let metrics_clone = self.metrics.clone();
956 let pending_sync_index = Arc::clone(&self.pending_sync_index); 1017 let pending_sync_index = Arc::clone(&self.pending_sync_index);
1018 let health_tracker = Arc::clone(&self.health_tracker);
957 1019
958 tokio::spawn(async move { 1020 tokio::spawn(async move {
959 let mut disconnect_sent = false; 1021 let mut disconnect_sent = false;
@@ -1011,6 +1073,38 @@ impl SyncManager {
1011 }) 1073 })
1012 .await; 1074 .await;
1013 } 1075 }
1076 RelayEvent::Notice(notice) => {
1077 // Check for rate limiting indicators
1078 let notice_lower = notice.to_lowercase();
1079 let is_rate_limit = (notice_lower.contains("rate")
1080 && notice_lower.contains("limit"))
1081 || notice_lower.contains("too many")
1082 || notice_lower.contains("slow down")
1083 || notice_lower.contains("throttl");
1084
1085 if is_rate_limit {
1086 tracing::warn!(
1087 relay = %relay_url_clone,
1088 notice = %notice,
1089 "Rate limiting NOTICE detected from relay"
1090 );
1091
1092 // Mark relay as rate limited
1093 health_tracker.record_rate_limit(&relay_url_clone);
1094
1095 // Update metrics with new health state
1096 if let Some(ref metrics) = metrics_clone {
1097 let state = health_tracker.get_state(&relay_url_clone);
1098 metrics.record_health_state(&relay_url_clone, state);
1099 }
1100 } else {
1101 tracing::debug!(
1102 relay = %relay_url_clone,
1103 notice = %notice,
1104 "Relay issued notice"
1105 );
1106 }
1107 }
1014 RelayEvent::Closed(reason) => { 1108 RelayEvent::Closed(reason) => {
1015 // CLOSED message means one subscription was closed, not the whole connection 1109 // CLOSED message means one subscription was closed, not the whole connection
1016 // This is normal behavior (e.g., when historic_sync completes) 1110 // This is normal behavior (e.g., when historic_sync completes)
@@ -1901,6 +1995,63 @@ impl SyncManager {
1901 } 1995 }
1902 } 1996 }
1903 1997
1998 /// Check for rate-limited relays that have exceeded cooldown
1999 ///
2000 /// This method is called periodically by run_health_and_metrics_checker (every 2 seconds).
2001 /// For each relay in RateLimited state that has exceeded the 65-second cooldown:
2002 /// 1. Clears the rate limit state (sets to Healthy)
2003 /// 2. Recomputes required actions for that relay
2004 /// 3. Submits those actions
2005 async fn check_rate_limit_recovery(&mut self) {
2006 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2007
2008 // Exit rate limiting for relays whose cooldown has expired
2009 let relays_to_recover: Vec<String> = self.health_tracker.exit_expired_rate_limits();
2010
2011 if relays_to_recover.is_empty() {
2012 return;
2013 }
2014
2015 // Recompute actions - could optimise by adding relays: Option<&[]> to derive_relay_targets
2016 let repo_index = self.repo_sync_index.read().await;
2017 let targets = derive_relay_targets(&repo_index);
2018 drop(repo_index);
2019
2020 for relay_url in relays_to_recover {
2021 tracing::info!(
2022 relay = %relay_url,
2023 "Rate limit cooldown expired, recovering"
2024 );
2025
2026 // Clear rate limit state
2027 self.health_tracker.clear_rate_limit(&relay_url);
2028
2029 // Only compute actions for this specific relay
2030 if let Some(relay_needs) = targets.get(&relay_url) {
2031 let mut single_relay_targets = std::collections::HashMap::new();
2032 single_relay_targets.insert(relay_url.clone(), relay_needs.clone());
2033
2034 let pending = self.pending_sync_index.read().await;
2035 let confirmed = self.relay_sync_index.read().await;
2036
2037 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
2038 drop(pending);
2039 drop(confirmed);
2040
2041 // Submit each action
2042 for action in actions {
2043 tracing::info!(
2044 relay = %action.relay_url,
2045 repo_count = action.items.repos.len(),
2046 event_count = action.items.root_events.len(),
2047 "Submitting recovered actions after rate limit"
2048 );
2049 self.handle_new_sync_filters(action).await;
2050 }
2051 }
2052 }
2053 }
2054
1904 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex 2055 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex
1905 /// 2056 ///
1906 /// This method applies limit(0) to all filters to receive ONLY new events. 2057 /// This method applies limit(0) to all filters to receive ONLY new events.