From c82684092c7b4f81e49833b0888500fcb9851218 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 11:19:38 +0000 Subject: fix(sync): improve metrics recording and connection failure detection Changes: - Fix connection attempt metrics: record success/failure based on actual connection result instead of pre-emptively recording failure - Add health tracker integration on connection failure: call record_failure() and record_health_state() in error path - Add connection verification in relay_connection.rs: wait 500ms after connect() then verify is_connected() to detect silent failures - Add configurable disconnect check interval via NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS env var - Update TestRelay with fast test settings: startup_delay=0, jitter=0, disconnect_check_interval=1s - Add debug output to metrics tests for investigation Note: Tests may still fail due to 5-second base backoff in health tracker. A follow-up task will add NGIT_SYNC_BASE_BACKOFF_SECS config parameter to allow faster test cycles. Related: metrics-wiring-plan.md Tasks 1 & 2 --- src/sync/mod.rs | 76 ++++++++++++++++++++++++++++---------------- src/sync/relay_connection.rs | 18 ++++++++++- 2 files changed, 66 insertions(+), 28 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 5039c04..16ad833 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -264,23 +264,30 @@ async fn run_daily_timer( // Disconnect Checker (Phase 8) // ============================================================================= -/// Check interval for empty relay cleanup in seconds -const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60; - /// Run the disconnect checker for periodic cleanup of empty relays /// -/// This function runs in a loop, checking every 60 seconds for relays +/// This function runs in a loop, checking at the configured interval for relays /// that have no repos or root events to sync. Non-bootstrap relays /// that are empty will be disconnected to free up resources. /// /// Bootstrap relays are never disconnected, even if empty. +/// +/// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` +/// (default: 60 seconds). Set to a lower value for faster reconnection testing. async fn run_disconnect_checker( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, + check_interval_secs: u64, ) { + let interval = Duration::from_secs(check_interval_secs); + tracing::info!( + interval_secs = check_interval_secs, + "Disconnect checker started with configured interval" + ); + loop { tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { + _ = tokio::time::sleep(interval) => { tracing::debug!("Disconnect checker running"); let mut manager = sync_manager.lock().await; @@ -609,21 +616,24 @@ impl SyncManager { self.spawn_relay_connection(bootstrap_url.clone()).await; } - // 7. Wrap self in Arc for sharing with timer task + // 7. Capture config values before moving self into Arc + let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; + + // 8. Wrap self in Arc for sharing with timer task let sync_manager = Arc::new(Mutex::new(self)); - // 8. Spawn daily timer task with shutdown receiver + // 9. Spawn daily timer task with shutdown receiver let timer_manager = Arc::clone(&sync_manager); let timer_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { run_daily_timer(timer_manager, timer_shutdown).await; }); - // 9. Spawn disconnect checker task with shutdown receiver + // 10. Spawn disconnect checker task with shutdown receiver let checker_manager = Arc::clone(&sync_manager); let checker_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_disconnect_checker(checker_manager, checker_shutdown).await; + run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await; }); // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -1174,27 +1184,39 @@ impl SyncManager { // Create relay connection let connection = RelayConnection::new(relay_url.clone()); - // Record connection attempt - if let Some(ref metrics) = self.metrics { - metrics.record_connection_attempt(&relay_url, false); - } - // Connect and subscribe to Layer 1 - if let Err(e) = connection.connect_and_subscribe(None).await { - tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); - // Update state to disconnected on failure - { - let mut index = relay_sync_index.write().await; - if let Some(state) = index.get_mut(&relay_url) { - state.connection_status = ConnectionStatus::Disconnected; + match connection.connect_and_subscribe(None).await { + Ok(_) => { + // Record successful connection attempt + if let Some(ref metrics) = self.metrics { + metrics.record_connection_attempt(&relay_url, true); } } - return; - } - - // If successful, update connection attempt metric to success - if let Some(ref metrics) = self.metrics { - metrics.record_connection_attempt(&relay_url, true); + Err(e) => { + tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); + + // Record failed connection attempt + if let Some(ref metrics) = self.metrics { + metrics.record_connection_attempt(&relay_url, false); + } + + // Record failure in health tracker + self.health_tracker.record_failure(&relay_url); + + // Record health state in metrics + if let Some(ref metrics) = self.metrics { + metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); + } + + // Update state to disconnected on failure + { + let mut index = relay_sync_index.write().await; + if let Some(state) = index.get_mut(&relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + } + } + return; + } } // Mark as connected in relay sync index diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 09c9887..d69e112 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -55,7 +55,8 @@ impl RelayConnection { /// This method: /// 1. Adds the relay to the client /// 2. Establishes the WebSocket connection - /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618) + /// 3. Verifies connection was established + /// 4. Subscribes to Layer 1 filter (kinds 30617 + 30618) /// /// # Arguments /// * `since` - Optional timestamp for incremental sync on reconnect @@ -76,6 +77,21 @@ impl RelayConnection { // Establish connection self.client.connect().await; + // Wait briefly for connection to establish and check status + // nostr-sdk's connect() is async and may not immediately reflect failure + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // Check if relay is actually connected + let relay = self.client.relay(&self.url).await + .map_err(|e| format!("Failed to get relay handle for {}: {}", self.url, e))?; + + if !relay.is_connected() { + return Err(format!( + "Failed to connect to relay {}: connection not established after timeout", + self.url + )); + } + // Subscribe to Layer 1 (announcements) let filter = build_announcement_filter(since); let output = self -- cgit v1.2.3