diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 12:36:51 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 12:36:51 +0000 |
| commit | 83844a528365e657cd5f4d2cda51d72ced9900da (patch) | |
| tree | 023052e0c7709341aa30ee7d40a62a26e2368cea | |
| parent | b0ea9aa56c90fe36604e56707498261d761b9a56 (diff) | |
fix: wire up relay disconnection detection for metrics
- Add periodic health check in RelayConnection::run_event_loop that polls
nostr-sdk's relay.is_connected() every second to detect dead connections
- When the event channel closes without an explicit Closed/Shutdown event, send a
DisconnectNotification to SyncManager (fixes the case where the TCP connection drops silently)
- Enable the test_relay_connected_status test, which validates that the
ngit_sync_relay_connected metric correctly reflects connection state
The issue was that when a remote relay stops abruptly, nostr-sdk's
notification receiver blocks indefinitely waiting for data, and TCP disconnect
detection without keepalive can take minutes. The health check instead polls
nostr-sdk's internal relay status, which detects the disconnection promptly.
| -rw-r--r-- | src/sync/mod.rs | 19 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 99 | ||||
| -rw-r--r-- | tests/sync/metrics.rs | 47 |
3 files changed, 119 insertions, 46 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c62b478..15c89e3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1251,6 +1251,9 @@ impl SyncManager { | |||
| 1251 | let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task | 1251 | let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task |
| 1252 | let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task | 1252 | let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task |
| 1253 | tokio::spawn(async move { | 1253 | tokio::spawn(async move { |
| 1254 | // Track whether we've already sent a disconnect notification | ||
| 1255 | let mut disconnect_sent = false; | ||
| 1256 | |||
| 1254 | while let Some(relay_event) = event_rx.recv().await { | 1257 | while let Some(relay_event) = event_rx.recv().await { |
| 1255 | match relay_event { | 1258 | match relay_event { |
| 1256 | RelayEvent::Event(event) => { | 1259 | RelayEvent::Event(event) => { |
| @@ -1297,6 +1300,7 @@ impl SyncManager { | |||
| 1297 | relay_url: relay_url_clone.clone(), | 1300 | relay_url: relay_url_clone.clone(), |
| 1298 | }) | 1301 | }) |
| 1299 | .await; | 1302 | .await; |
| 1303 | disconnect_sent = true; | ||
| 1300 | break; | 1304 | break; |
| 1301 | } | 1305 | } |
| 1302 | RelayEvent::Shutdown => { | 1306 | RelayEvent::Shutdown => { |
| @@ -1307,10 +1311,25 @@ impl SyncManager { | |||
| 1307 | relay_url: relay_url_clone.clone(), | 1311 | relay_url: relay_url_clone.clone(), |
| 1308 | }) | 1312 | }) |
| 1309 | .await; | 1313 | .await; |
| 1314 | disconnect_sent = true; | ||
| 1310 | break; | 1315 | break; |
| 1311 | } | 1316 | } |
| 1312 | } | 1317 | } |
| 1313 | } | 1318 | } |
| 1319 | |||
| 1320 | // If the event channel closed without a Closed/Shutdown event | ||
| 1321 | // (e.g., connection dropped unexpectedly), still notify SyncManager | ||
| 1322 | if !disconnect_sent { | ||
| 1323 | tracing::info!( | ||
| 1324 | relay = %relay_url_clone, | ||
| 1325 | "Event channel closed, notifying SyncManager of disconnect" | ||
| 1326 | ); | ||
| 1327 | let _ = disconnect_tx | ||
| 1328 | .send(DisconnectNotification { | ||
| 1329 | relay_url: relay_url_clone.clone(), | ||
| 1330 | }) | ||
| 1331 | .await; | ||
| 1332 | } | ||
| 1314 | }); | 1333 | }); |
| 1315 | 1334 | ||
| 1316 | tracing::info!( | 1335 | tracing::info!( |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 9a580d2..63e4247 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -124,48 +124,83 @@ impl RelayConnection { | |||
| 124 | /// # Arguments | 124 | /// # Arguments |
| 125 | /// * `event_sender` - Channel to send relay events through | 125 | /// * `event_sender` - Channel to send relay events through |
| 126 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { | 126 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { |
| 127 | use std::time::Duration; | ||
| 128 | use tokio::time::interval; | ||
| 129 | |||
| 127 | let mut notifications = self.client.notifications(); | 130 | let mut notifications = self.client.notifications(); |
| 128 | let url = self.url.clone(); | 131 | let url = self.url.clone(); |
| 132 | |||
| 133 | // Check connection status every second to detect dead connections | ||
| 134 | let mut check_interval = interval(Duration::from_secs(1)); | ||
| 129 | 135 | ||
| 130 | tracing::debug!(relay = %url, "Starting event loop"); | 136 | tracing::debug!(relay = %url, "Starting event loop"); |
| 131 | 137 | ||
| 132 | while let Ok(notification) = notifications.recv().await { | 138 | loop { |
| 133 | match notification { | 139 | tokio::select! { |
| 134 | RelayPoolNotification::Event { event, .. } => { | 140 | // Check for new notifications |
| 135 | tracing::trace!(relay = %url, event_id = %event.id, "Received event"); | 141 | notification_result = notifications.recv() => { |
| 136 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | 142 | match notification_result { |
| 137 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 143 | Ok(notification) => { |
| 138 | break; | 144 | match notification { |
| 139 | } | 145 | RelayPoolNotification::Event { event, .. } => { |
| 140 | } | 146 | tracing::trace!(relay = %url, event_id = %event.id, "Received event"); |
| 141 | RelayPoolNotification::Message { message, .. } => { | 147 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { |
| 142 | match message { | 148 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); |
| 143 | RelayMessage::EndOfStoredEvents(sub_id) => { | 149 | break; |
| 144 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); | 150 | } |
| 145 | // Convert Cow<SubscriptionId> to owned SubscriptionId | 151 | } |
| 146 | let owned_sub_id = sub_id.into_owned(); | 152 | RelayPoolNotification::Message { message, .. } => { |
| 147 | if event_sender | 153 | match message { |
| 148 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) | 154 | RelayMessage::EndOfStoredEvents(sub_id) => { |
| 149 | .await | 155 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); |
| 150 | .is_err() | 156 | // Convert Cow<SubscriptionId> to owned SubscriptionId |
| 151 | { | 157 | let owned_sub_id = sub_id.into_owned(); |
| 152 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 158 | if event_sender |
| 153 | break; | 159 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) |
| 160 | .await | ||
| 161 | .is_err() | ||
| 162 | { | ||
| 163 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | ||
| 164 | break; | ||
| 165 | } | ||
| 166 | } | ||
| 167 | RelayMessage::Closed { message: msg, .. } => { | ||
| 168 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | ||
| 169 | let _ = event_sender | ||
| 170 | .send(RelayEvent::Closed(msg.to_string())) | ||
| 171 | .await; | ||
| 172 | break; | ||
| 173 | } | ||
| 174 | _ => {} | ||
| 175 | } | ||
| 176 | } | ||
| 177 | RelayPoolNotification::Shutdown => { | ||
| 178 | tracing::info!(relay = %url, "Relay pool shutdown"); | ||
| 179 | let _ = event_sender.send(RelayEvent::Shutdown).await; | ||
| 180 | break; | ||
| 181 | } | ||
| 154 | } | 182 | } |
| 155 | } | 183 | } |
| 156 | RelayMessage::Closed { message: msg, .. } => { | 184 | Err(_) => { |
| 157 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | 185 | // Notification channel closed - connection lost |
| 158 | let _ = event_sender | 186 | tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); |
| 159 | .send(RelayEvent::Closed(msg.to_string())) | 187 | break; |
| 160 | .await; | ||
| 161 | } | 188 | } |
| 162 | _ => {} | ||
| 163 | } | 189 | } |
| 164 | } | 190 | } |
| 165 | RelayPoolNotification::Shutdown => { | 191 | // Periodic connection health check |
| 166 | tracing::info!(relay = %url, "Relay pool shutdown"); | 192 | _ = check_interval.tick() => { |
| 167 | let _ = event_sender.send(RelayEvent::Shutdown).await; | 193 | // Check if relay is still connected via nostr-sdk |
| 168 | break; | 194 | if let Ok(relay) = self.client.relay(&self.url).await { |
| 195 | if !relay.is_connected() { | ||
| 196 | tracing::info!(relay = %url, "Relay disconnected (detected by health check)"); | ||
| 197 | break; | ||
| 198 | } | ||
| 199 | } else { | ||
| 200 | // Relay not found in client - must be disconnected | ||
| 201 | tracing::info!(relay = %url, "Relay not found (detected by health check)"); | ||
| 202 | break; | ||
| 203 | } | ||
| 169 | } | 204 | } |
| 170 | } | 205 | } |
| 171 | } | 206 | } |
diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs index 775159b..3accd0f 100644 --- a/tests/sync/metrics.rs +++ b/tests/sync/metrics.rs | |||
| @@ -247,7 +247,12 @@ async fn test_startup_sync_event_count() { | |||
| 247 | // 6. Create 3 patch events (Layer 2) that reference the announcement | 247 | // 6. Create 3 patch events (Layer 2) that reference the announcement |
| 248 | let patches: Vec<_> = (0..3) | 248 | let patches: Vec<_> = (0..3) |
| 249 | .map(|i| { | 249 | .map(|i| { |
| 250 | create_event_referencing_repo(&keys, &repo_coord, KIND_PATCH, &format!("Test patch {}", i)) | 250 | create_event_referencing_repo( |
| 251 | &keys, | ||
| 252 | &repo_coord, | ||
| 253 | KIND_PATCH, | ||
| 254 | &format!("Test patch {}", i), | ||
| 255 | ) | ||
| 251 | }) | 256 | }) |
| 252 | .collect(); | 257 | .collect(); |
| 253 | println!("Created {} patches", patches.len()); | 258 | println!("Created {} patches", patches.len()); |
| @@ -320,8 +325,12 @@ async fn test_startup_sync_event_count() { | |||
| 320 | .kind(Kind::Custom(KIND_PATCH)) | 325 | .kind(Kind::Custom(KIND_PATCH)) |
| 321 | .author(keys.public_key()); | 326 | .author(keys.public_key()); |
| 322 | 327 | ||
| 323 | let patches_synced = | 328 | let patches_synced = crate::common::sync_helpers::wait_for_event_on_relay( |
| 324 | crate::common::sync_helpers::wait_for_event_on_relay(syncing_relay.url(), filter, Duration::from_secs(2)).await; | 329 | syncing_relay.url(), |
| 330 | filter, | ||
| 331 | Duration::from_secs(2), | ||
| 332 | ) | ||
| 333 | .await; | ||
| 325 | println!("Patches synced to syncing relay: {}", patches_synced); | 334 | println!("Patches synced to syncing relay: {}", patches_synced); |
| 326 | 335 | ||
| 327 | // Cleanup | 336 | // Cleanup |
| @@ -374,12 +383,15 @@ async fn test_connection_failure_increments_counter() { | |||
| 374 | 383 | ||
| 375 | // Wait for initial connection attempt to the unreachable bootstrap relay | 384 | // Wait for initial connection attempt to the unreachable bootstrap relay |
| 376 | tokio::time::sleep(Duration::from_secs(2)).await; | 385 | tokio::time::sleep(Duration::from_secs(2)).await; |
| 377 | 386 | ||
| 378 | let metrics = harness.get_metrics().await.unwrap(); | 387 | let metrics = harness.get_metrics().await.unwrap(); |
| 379 | 388 | ||
| 380 | // Failure counter should be recorded when connecting to unreachable relay | 389 | // Failure counter should be recorded when connecting to unreachable relay |
| 381 | let failures = metrics | 390 | let failures = metrics |
| 382 | .counter("ngit_sync_connection_attempts_total", &[("result", "failure")]) | 391 | .counter( |
| 392 | "ngit_sync_connection_attempts_total", | ||
| 393 | &[("result", "failure")], | ||
| 394 | ) | ||
| 383 | .unwrap_or(0); | 395 | .unwrap_or(0); |
| 384 | 396 | ||
| 385 | println!("Connection failures recorded: {}", failures); | 397 | println!("Connection failures recorded: {}", failures); |
| @@ -413,7 +425,9 @@ async fn test_live_sync_event_count() { | |||
| 413 | // Now add events - these should be "live" not "startup" | 425 | // Now add events - these should be "live" not "startup" |
| 414 | let keys = Keys::generate(); | 426 | let keys = Keys::generate(); |
| 415 | let events: Vec<_> = (0..2) | 427 | let events: Vec<_> = (0..2) |
| 416 | .map(|i| create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i))) | 428 | .map(|i| { |
| 429 | create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i)) | ||
| 430 | }) | ||
| 417 | .collect(); | 431 | .collect(); |
| 418 | harness.submit_events(0, &events).await.unwrap(); | 432 | harness.submit_events(0, &events).await.unwrap(); |
| 419 | 433 | ||
| @@ -434,11 +448,7 @@ async fn test_live_sync_event_count() { | |||
| 434 | /// | 448 | /// |
| 435 | /// This test validates that the ngit_sync_relay_connected gauge | 449 | /// This test validates that the ngit_sync_relay_connected gauge |
| 436 | /// correctly reflects the connection state of source relays. | 450 | /// correctly reflects the connection state of source relays. |
| 437 | /// | ||
| 438 | /// NOTE: This test may fail until sync metrics recording is fully wired up. | ||
| 439 | /// The test documents the expected behavior. | ||
| 440 | #[tokio::test] | 451 | #[tokio::test] |
| 441 | #[ignore] // Enable when relay connected status metrics are wired up | ||
| 442 | async fn test_relay_connected_status() { | 452 | async fn test_relay_connected_status() { |
| 443 | let mut harness = MetricsTestHarness::with_sources(1).await; | 453 | let mut harness = MetricsTestHarness::with_sources(1).await; |
| 444 | harness.start_syncing_relay(0).await; | 454 | harness.start_syncing_relay(0).await; |
| @@ -505,7 +515,10 @@ async fn test_health_state_degrades_on_failure() { | |||
| 505 | // Get the relay status (1=healthy, 2=degraded, 3=dead) | 515 | // Get the relay status (1=healthy, 2=degraded, 3=dead) |
| 506 | let status = later.gauge("ngit_sync_relay_status", &[]).unwrap_or(0); | 516 | let status = later.gauge("ngit_sync_relay_status", &[]).unwrap_or(0); |
| 507 | 517 | ||
| 508 | println!("Initial metrics: {:?}", initial.gauge("ngit_sync_relay_status", &[])); | 518 | println!( |
| 519 | "Initial metrics: {:?}", | ||
| 520 | initial.gauge("ngit_sync_relay_status", &[]) | ||
| 521 | ); | ||
| 509 | println!("Later status: {}", status); | 522 | println!("Later status: {}", status); |
| 510 | 523 | ||
| 511 | assert!( | 524 | assert!( |
| @@ -561,8 +574,14 @@ async fn test_multi_source_aggregate_counts() { | |||
| 561 | 574 | ||
| 562 | let metrics = harness.get_metrics().await.unwrap(); | 575 | let metrics = harness.get_metrics().await.unwrap(); |
| 563 | 576 | ||
| 564 | println!("After stop - Tracked total: {:?}", metrics.relays_tracked_total()); | 577 | println!( |
| 565 | println!("After stop - Connected total: {:?}", metrics.relays_connected_total()); | 578 | "After stop - Tracked total: {:?}", |
| 579 | metrics.relays_tracked_total() | ||
| 580 | ); | ||
| 581 | println!( | ||
| 582 | "After stop - Connected total: {:?}", | ||
| 583 | metrics.relays_connected_total() | ||
| 584 | ); | ||
| 566 | 585 | ||
| 567 | assert_eq!( | 586 | assert_eq!( |
| 568 | metrics.relays_tracked_total(), | 587 | metrics.relays_tracked_total(), |
| @@ -576,4 +595,4 @@ async fn test_multi_source_aggregate_counts() { | |||
| 576 | ); | 595 | ); |
| 577 | 596 | ||
| 578 | harness.stop_all().await; | 597 | harness.stop_all().await; |
| 579 | } \ No newline at end of file | 598 | } |