diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 12:36:51 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 12:36:51 +0000 |
| commit | 83844a528365e657cd5f4d2cda51d72ced9900da (patch) | |
| tree | 023052e0c7709341aa30ee7d40a62a26e2368cea /src/sync | |
| parent | b0ea9aa56c90fe36604e56707498261d761b9a56 (diff) | |
fix: wire up relay disconnection detection for metrics
- Add periodic health check in RelayConnection::run_event_loop that polls
nostr-sdk's relay.is_connected() every second to detect dead connections
- When event channel closes without explicit Closed/Shutdown, send
DisconnectNotification to SyncManager (fixes case where TCP drops silently)
- Enable test_relay_connected_status test which validates the
ngit_sync_relay_connected metric correctly reflects connection state
The issue was that when a remote relay stops abruptly, nostr-sdk's
notification receiver blocks indefinitely waiting for data. TCP disconnect
detection without keepalive can take minutes. The health check polls
nostr-sdk's internal relay status which detects disconnection promptly.
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 19 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 99 |
2 files changed, 86 insertions, 32 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c62b478..15c89e3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1251,6 +1251,9 @@ impl SyncManager { | |||
| 1251 | let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task | 1251 | let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task |
| 1252 | let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task | 1252 | let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task |
| 1253 | tokio::spawn(async move { | 1253 | tokio::spawn(async move { |
| 1254 | // Track whether we've already sent a disconnect notification | ||
| 1255 | let mut disconnect_sent = false; | ||
| 1256 | |||
| 1254 | while let Some(relay_event) = event_rx.recv().await { | 1257 | while let Some(relay_event) = event_rx.recv().await { |
| 1255 | match relay_event { | 1258 | match relay_event { |
| 1256 | RelayEvent::Event(event) => { | 1259 | RelayEvent::Event(event) => { |
| @@ -1297,6 +1300,7 @@ impl SyncManager { | |||
| 1297 | relay_url: relay_url_clone.clone(), | 1300 | relay_url: relay_url_clone.clone(), |
| 1298 | }) | 1301 | }) |
| 1299 | .await; | 1302 | .await; |
| 1303 | disconnect_sent = true; | ||
| 1300 | break; | 1304 | break; |
| 1301 | } | 1305 | } |
| 1302 | RelayEvent::Shutdown => { | 1306 | RelayEvent::Shutdown => { |
| @@ -1307,10 +1311,25 @@ impl SyncManager { | |||
| 1307 | relay_url: relay_url_clone.clone(), | 1311 | relay_url: relay_url_clone.clone(), |
| 1308 | }) | 1312 | }) |
| 1309 | .await; | 1313 | .await; |
| 1314 | disconnect_sent = true; | ||
| 1310 | break; | 1315 | break; |
| 1311 | } | 1316 | } |
| 1312 | } | 1317 | } |
| 1313 | } | 1318 | } |
| 1319 | |||
| 1320 | // If the event channel closed without a Closed/Shutdown event | ||
| 1321 | // (e.g., connection dropped unexpectedly), still notify SyncManager | ||
| 1322 | if !disconnect_sent { | ||
| 1323 | tracing::info!( | ||
| 1324 | relay = %relay_url_clone, | ||
| 1325 | "Event channel closed, notifying SyncManager of disconnect" | ||
| 1326 | ); | ||
| 1327 | let _ = disconnect_tx | ||
| 1328 | .send(DisconnectNotification { | ||
| 1329 | relay_url: relay_url_clone.clone(), | ||
| 1330 | }) | ||
| 1331 | .await; | ||
| 1332 | } | ||
| 1314 | }); | 1333 | }); |
| 1315 | 1334 | ||
| 1316 | tracing::info!( | 1335 | tracing::info!( |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 9a580d2..63e4247 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -124,48 +124,83 @@ impl RelayConnection { | |||
| 124 | /// # Arguments | 124 | /// # Arguments |
| 125 | /// * `event_sender` - Channel to send relay events through | 125 | /// * `event_sender` - Channel to send relay events through |
| 126 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { | 126 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { |
| 127 | use std::time::Duration; | ||
| 128 | use tokio::time::interval; | ||
| 129 | |||
| 127 | let mut notifications = self.client.notifications(); | 130 | let mut notifications = self.client.notifications(); |
| 128 | let url = self.url.clone(); | 131 | let url = self.url.clone(); |
| 132 | |||
| 133 | // Check connection status every second to detect dead connections | ||
| 134 | let mut check_interval = interval(Duration::from_secs(1)); | ||
| 129 | 135 | ||
| 130 | tracing::debug!(relay = %url, "Starting event loop"); | 136 | tracing::debug!(relay = %url, "Starting event loop"); |
| 131 | 137 | ||
| 132 | while let Ok(notification) = notifications.recv().await { | 138 | loop { |
| 133 | match notification { | 139 | tokio::select! { |
| 134 | RelayPoolNotification::Event { event, .. } => { | 140 | // Check for new notifications |
| 135 | tracing::trace!(relay = %url, event_id = %event.id, "Received event"); | 141 | notification_result = notifications.recv() => { |
| 136 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | 142 | match notification_result { |
| 137 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 143 | Ok(notification) => { |
| 138 | break; | 144 | match notification { |
| 139 | } | 145 | RelayPoolNotification::Event { event, .. } => { |
| 140 | } | 146 | tracing::trace!(relay = %url, event_id = %event.id, "Received event"); |
| 141 | RelayPoolNotification::Message { message, .. } => { | 147 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { |
| 142 | match message { | 148 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); |
| 143 | RelayMessage::EndOfStoredEvents(sub_id) => { | 149 | break; |
| 144 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); | 150 | } |
| 145 | // Convert Cow<SubscriptionId> to owned SubscriptionId | 151 | } |
| 146 | let owned_sub_id = sub_id.into_owned(); | 152 | RelayPoolNotification::Message { message, .. } => { |
| 147 | if event_sender | 153 | match message { |
| 148 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) | 154 | RelayMessage::EndOfStoredEvents(sub_id) => { |
| 149 | .await | 155 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); |
| 150 | .is_err() | 156 | // Convert Cow<SubscriptionId> to owned SubscriptionId |
| 151 | { | 157 | let owned_sub_id = sub_id.into_owned(); |
| 152 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 158 | if event_sender |
| 153 | break; | 159 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) |
| 160 | .await | ||
| 161 | .is_err() | ||
| 162 | { | ||
| 163 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | ||
| 164 | break; | ||
| 165 | } | ||
| 166 | } | ||
| 167 | RelayMessage::Closed { message: msg, .. } => { | ||
| 168 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | ||
| 169 | let _ = event_sender | ||
| 170 | .send(RelayEvent::Closed(msg.to_string())) | ||
| 171 | .await; | ||
| 172 | break; | ||
| 173 | } | ||
| 174 | _ => {} | ||
| 175 | } | ||
| 176 | } | ||
| 177 | RelayPoolNotification::Shutdown => { | ||
| 178 | tracing::info!(relay = %url, "Relay pool shutdown"); | ||
| 179 | let _ = event_sender.send(RelayEvent::Shutdown).await; | ||
| 180 | break; | ||
| 181 | } | ||
| 154 | } | 182 | } |
| 155 | } | 183 | } |
| 156 | RelayMessage::Closed { message: msg, .. } => { | 184 | Err(_) => { |
| 157 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | 185 | // Notification channel closed - connection lost |
| 158 | let _ = event_sender | 186 | tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); |
| 159 | .send(RelayEvent::Closed(msg.to_string())) | 187 | break; |
| 160 | .await; | ||
| 161 | } | 188 | } |
| 162 | _ => {} | ||
| 163 | } | 189 | } |
| 164 | } | 190 | } |
| 165 | RelayPoolNotification::Shutdown => { | 191 | // Periodic connection health check |
| 166 | tracing::info!(relay = %url, "Relay pool shutdown"); | 192 | _ = check_interval.tick() => { |
| 167 | let _ = event_sender.send(RelayEvent::Shutdown).await; | 193 | // Check if relay is still connected via nostr-sdk |
| 168 | break; | 194 | if let Ok(relay) = self.client.relay(&self.url).await { |
| 195 | if !relay.is_connected() { | ||
| 196 | tracing::info!(relay = %url, "Relay disconnected (detected by health check)"); | ||
| 197 | break; | ||
| 198 | } | ||
| 199 | } else { | ||
| 200 | // Relay not found in client - must be disconnected | ||
| 201 | tracing::info!(relay = %url, "Relay not found (detected by health check)"); | ||
| 202 | break; | ||
| 203 | } | ||
| 169 | } | 204 | } |
| 170 | } | 205 | } |
| 171 | } | 206 | } |