From 83844a528365e657cd5f4d2cda51d72ced9900da Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 12:36:51 +0000 Subject: fix: wire up relay disconnection detection for metrics - Add periodic health check in RelayConnection::run_event_loop that polls nostr-sdk's relay.is_connected() every second to detect dead connections - When event channel closes without explicit Closed/Shutdown, send DisconnectNotification to SyncManager (fixes case where TCP drops silently) - Enable test_relay_connected_status test which validates the ngit_sync_relay_connected metric correctly reflects connection state The issue was that when a remote relay stops abruptly, nostr-sdk's notification receiver blocks indefinitely waiting for data. TCP disconnect detection without keepalive can take minutes. The health check polls nostr-sdk's internal relay status which detects disconnection promptly. --- src/sync/mod.rs | 19 +++++++++ src/sync/relay_connection.rs | 99 ++++++++++++++++++++++++++++++-------------- tests/sync/metrics.rs | 47 ++++++++++++++------- 3 files changed, 119 insertions(+), 46 deletions(-) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c62b478..15c89e3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1251,6 +1251,9 @@ impl SyncManager { let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task tokio::spawn(async move { + // Track whether we've already sent a disconnect notification + let mut disconnect_sent = false; + while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { @@ -1297,6 +1300,7 @@ impl SyncManager { relay_url: relay_url_clone.clone(), }) .await; + disconnect_sent = true; break; } RelayEvent::Shutdown => { @@ -1307,10 +1311,25 @@ impl SyncManager { relay_url: relay_url_clone.clone(), }) .await; + disconnect_sent = true; break; } } } + + // If the event channel closed without a Closed/Shutdown event + // (e.g., connection dropped unexpectedly), still notify SyncManager + if !disconnect_sent { + tracing::info!( + relay = %relay_url_clone, + "Event channel closed, notifying SyncManager of disconnect" + ); + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; + } }); tracing::info!( diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 9a580d2..63e4247 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -124,48 +124,83 @@ impl RelayConnection { /// # Arguments /// * `event_sender` - Channel to send relay events through pub async fn run_event_loop(self, event_sender: mpsc::Sender) { + use std::time::Duration; + use tokio::time::interval; + let mut notifications = self.client.notifications(); let url = self.url.clone(); + + // Check connection status every second to detect dead connections + let mut check_interval = interval(Duration::from_secs(1)); tracing::debug!(relay = %url, "Starting event loop"); - while let Ok(notification) = notifications.recv().await { - match notification { - RelayPoolNotification::Event { event, .. } => { - tracing::trace!(relay = %url, event_id = %event.id, "Received event"); - if event_sender.send(RelayEvent::Event(*event)).await.is_err() { - tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); - break; - } - } - RelayPoolNotification::Message { message, .. } => { - match message { - RelayMessage::EndOfStoredEvents(sub_id) => { - tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); - // Convert Cow to owned SubscriptionId - let owned_sub_id = sub_id.into_owned(); - if event_sender - .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) - .await - .is_err() - { - tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); - break; + loop { + tokio::select! { + // Check for new notifications + notification_result = notifications.recv() => { + match notification_result { + Ok(notification) => { + match notification { + RelayPoolNotification::Event { event, .. } => { + tracing::trace!(relay = %url, event_id = %event.id, "Received event"); + if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayPoolNotification::Message { message, .. } => { + match message { + RelayMessage::EndOfStoredEvents(sub_id) => { + tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); + // Convert Cow to owned SubscriptionId + let owned_sub_id = sub_id.into_owned(); + if event_sender + .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) + .await + .is_err() + { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayMessage::Closed { message: msg, .. } => { + tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); + let _ = event_sender + .send(RelayEvent::Closed(msg.to_string())) + .await; + break; + } + _ => {} + } + } + RelayPoolNotification::Shutdown => { + tracing::info!(relay = %url, "Relay pool shutdown"); + let _ = event_sender.send(RelayEvent::Shutdown).await; + break; + } } } - RelayMessage::Closed { message: msg, .. } => { - tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); - let _ = event_sender - .send(RelayEvent::Closed(msg.to_string())) - .await; + Err(_) => { + // Notification channel closed - connection lost + tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); + break; } - _ => {} } } - RelayPoolNotification::Shutdown => { - tracing::info!(relay = %url, "Relay pool shutdown"); - let _ = event_sender.send(RelayEvent::Shutdown).await; - break; + // Periodic connection health check + _ = check_interval.tick() => { + // Check if relay is still connected via nostr-sdk + if let Ok(relay) = self.client.relay(&self.url).await { + if !relay.is_connected() { + tracing::info!(relay = %url, "Relay disconnected (detected by health check)"); + break; + } + } else { + // Relay not found in client - must be disconnected + tracing::info!(relay = %url, "Relay not found (detected by health check)"); + break; + } } } } diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs index 775159b..3accd0f 100644 --- a/tests/sync/metrics.rs +++ b/tests/sync/metrics.rs @@ -247,7 +247,12 @@ async fn test_startup_sync_event_count() { // 6. Create 3 patch events (Layer 2) that reference the announcement let patches: Vec<_> = (0..3) .map(|i| { - create_event_referencing_repo(&keys, &repo_coord, KIND_PATCH, &format!("Test patch {}", i)) + create_event_referencing_repo( + &keys, + &repo_coord, + KIND_PATCH, + &format!("Test patch {}", i), + ) }) .collect(); println!("Created {} patches", patches.len()); @@ -320,8 +325,12 @@ async fn test_startup_sync_event_count() { .kind(Kind::Custom(KIND_PATCH)) .author(keys.public_key()); - let patches_synced = - crate::common::sync_helpers::wait_for_event_on_relay(syncing_relay.url(), filter, Duration::from_secs(2)).await; + let patches_synced = crate::common::sync_helpers::wait_for_event_on_relay( + syncing_relay.url(), + filter, + Duration::from_secs(2), + ) + .await; println!("Patches synced to syncing relay: {}", patches_synced); // Cleanup @@ -374,12 +383,15 @@ async fn test_connection_failure_increments_counter() { // Wait for initial connection attempt to the unreachable bootstrap relay tokio::time::sleep(Duration::from_secs(2)).await; - + let metrics = harness.get_metrics().await.unwrap(); // Failure counter should be recorded when connecting to unreachable relay let failures = metrics - .counter("ngit_sync_connection_attempts_total", &[("result", "failure")]) + .counter( + "ngit_sync_connection_attempts_total", + &[("result", "failure")], + ) .unwrap_or(0); println!("Connection failures recorded: {}", failures); @@ -413,7 +425,9 @@ async fn test_live_sync_event_count() { // Now add events - these should be "live" not "startup" let keys = Keys::generate(); let events: Vec<_> = (0..2) - .map(|i| create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i))) + .map(|i| { + create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i)) + }) .collect(); harness.submit_events(0, &events).await.unwrap(); @@ -434,11 +448,7 @@ async fn test_live_sync_event_count() { /// /// This test validates that the ngit_sync_relay_connected gauge /// correctly reflects the connection state of source relays. -/// -/// NOTE: This test may fail until sync metrics recording is fully wired up. -/// The test documents the expected behavior. #[tokio::test] -#[ignore] // Enable when relay connected status metrics are wired up async fn test_relay_connected_status() { let mut harness = MetricsTestHarness::with_sources(1).await; harness.start_syncing_relay(0).await; @@ -505,7 +515,10 @@ async fn test_health_state_degrades_on_failure() { // Get the relay status (1=healthy, 2=degraded, 3=dead) let status = later.gauge("ngit_sync_relay_status", &[]).unwrap_or(0); - println!("Initial metrics: {:?}", initial.gauge("ngit_sync_relay_status", &[])); + println!( + "Initial metrics: {:?}", + initial.gauge("ngit_sync_relay_status", &[]) + ); println!("Later status: {}", status); assert!( @@ -561,8 +574,14 @@ async fn test_multi_source_aggregate_counts() { let metrics = harness.get_metrics().await.unwrap(); - println!("After stop - Tracked total: {:?}", metrics.relays_tracked_total()); - println!("After stop - Connected total: {:?}", metrics.relays_connected_total()); + println!( + "After stop - Tracked total: {:?}", + metrics.relays_tracked_total() + ); + println!( + "After stop - Connected total: {:?}", + metrics.relays_connected_total() + ); assert_eq!( metrics.relays_tracked_total(), @@ -576,4 +595,4 @@ async fn test_multi_source_aggregate_counts() { ); harness.stop_all().await; -} \ No newline at end of file +} -- cgit v1.2.3