From 83844a528365e657cd5f4d2cda51d72ced9900da Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 12:36:51 +0000 Subject: fix: wire up relay disconnection detection for metrics - Add periodic health check in RelayConnection::run_event_loop that polls nostr-sdk's relay.is_connected() every second to detect dead connections - When event channel closes without explicit Closed/Shutdown, send DisconnectNotification to SyncManager (fixes case where TCP drops silently) - Enable test_relay_connected_status test which validates the ngit_sync_relay_connected metric correctly reflects connection state The issue was that when a remote relay stops abruptly, nostr-sdk's notification receiver blocks indefinitely waiting for data. TCP disconnect detection without keepalive can take minutes. The health check polls nostr-sdk's internal relay status which detects disconnection promptly. --- src/sync/mod.rs | 19 +++++++++ src/sync/relay_connection.rs | 99 ++++++++++++++++++++++++++++++-------------- 2 files changed, 86 insertions(+), 32 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c62b478..15c89e3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1251,6 +1251,9 @@ impl SyncManager { let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task tokio::spawn(async move { + // Track whether we've already sent a disconnect notification + let mut disconnect_sent = false; + while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { @@ -1297,6 +1300,7 @@ impl SyncManager { relay_url: relay_url_clone.clone(), }) .await; + disconnect_sent = true; break; } RelayEvent::Shutdown => { @@ -1307,10 +1311,25 @@ impl SyncManager { relay_url: relay_url_clone.clone(), }) .await; + disconnect_sent = true; break; } } } + + // If the event channel closed without a Closed/Shutdown event + // (e.g., connection dropped unexpectedly), still notify SyncManager + if !disconnect_sent { + tracing::info!( + relay = %relay_url_clone, + "Event channel closed, notifying SyncManager of disconnect" + ); + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; + } }); tracing::info!( diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 9a580d2..63e4247 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -124,48 +124,83 @@ impl RelayConnection { /// # Arguments /// * `event_sender` - Channel to send relay events through pub async fn run_event_loop(self, event_sender: mpsc::Sender) { + use std::time::Duration; + use tokio::time::interval; + let mut notifications = self.client.notifications(); let url = self.url.clone(); + + // Check connection status every second to detect dead connections + let mut check_interval = interval(Duration::from_secs(1)); tracing::debug!(relay = %url, "Starting event loop"); - while let Ok(notification) = notifications.recv().await { - match notification { - RelayPoolNotification::Event { event, .. } => { - tracing::trace!(relay = %url, event_id = %event.id, "Received event"); - if event_sender.send(RelayEvent::Event(*event)).await.is_err() { - tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); - break; - } - } - RelayPoolNotification::Message { message, .. } => { - match message { - RelayMessage::EndOfStoredEvents(sub_id) => { - tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); - // Convert Cow to owned SubscriptionId - let owned_sub_id = sub_id.into_owned(); - if event_sender - .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) - .await - .is_err() - { - tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); - break; + loop { + tokio::select! { + // Check for new notifications + notification_result = notifications.recv() => { + match notification_result { + Ok(notification) => { + match notification { + RelayPoolNotification::Event { event, .. } => { + tracing::trace!(relay = %url, event_id = %event.id, "Received event"); + if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayPoolNotification::Message { message, .. } => { + match message { + RelayMessage::EndOfStoredEvents(sub_id) => { + tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); + // Convert Cow to owned SubscriptionId + let owned_sub_id = sub_id.into_owned(); + if event_sender + .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) + .await + .is_err() + { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayMessage::Closed { message: msg, .. } => { + tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); + let _ = event_sender + .send(RelayEvent::Closed(msg.to_string())) + .await; + break; + } + _ => {} + } + } + RelayPoolNotification::Shutdown => { + tracing::info!(relay = %url, "Relay pool shutdown"); + let _ = event_sender.send(RelayEvent::Shutdown).await; + break; + } } } - RelayMessage::Closed { message: msg, .. } => { - tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); - let _ = event_sender - .send(RelayEvent::Closed(msg.to_string())) - .await; + Err(_) => { + // Notification channel closed - connection lost + tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); + break; } - _ => {} } } - RelayPoolNotification::Shutdown => { - tracing::info!(relay = %url, "Relay pool shutdown"); - let _ = event_sender.send(RelayEvent::Shutdown).await; - break; + // Periodic connection health check + _ = check_interval.tick() => { + // Check if relay is still connected via nostr-sdk + if let Ok(relay) = self.client.relay(&self.url).await { + if !relay.is_connected() { + tracing::info!(relay = %url, "Relay disconnected (detected by health check)"); + break; + } + } else { + // Relay not found in client - must be disconnected + tracing::info!(relay = %url, "Relay not found (detected by health check)"); + break; + } } } } -- cgit v1.2.3