upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 12:36:51 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 12:36:51 +0000
commit83844a528365e657cd5f4d2cda51d72ced9900da (patch)
tree023052e0c7709341aa30ee7d40a62a26e2368cea
parentb0ea9aa56c90fe36604e56707498261d761b9a56 (diff)
fix: wire up relay disconnection detection for metrics
- Add periodic health check in RelayConnection::run_event_loop that polls nostr-sdk's relay.is_connected() every second to detect dead connections - When event channel closes without explicit Closed/Shutdown, send DisconnectNotification to SyncManager (fixes case where TCP drops silently) - Enable test_relay_connected_status test which validates the ngit_sync_relay_connected metric correctly reflects connection state The issue was that when a remote relay stops abruptly, nostr-sdk's notification receiver blocks indefinitely waiting for data. TCP disconnect detection without keepalive can take minutes. The health check polls nostr-sdk's internal relay status which detects disconnection promptly.
-rw-r--r--src/sync/mod.rs19
-rw-r--r--src/sync/relay_connection.rs99
-rw-r--r--tests/sync/metrics.rs47
3 files changed, 119 insertions, 46 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index c62b478..15c89e3 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1251,6 +1251,9 @@ impl SyncManager {
1251 let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task 1251 let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task
1252 let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task 1252 let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task
1253 tokio::spawn(async move { 1253 tokio::spawn(async move {
1254 // Track whether we've already sent a disconnect notification
1255 let mut disconnect_sent = false;
1256
1254 while let Some(relay_event) = event_rx.recv().await { 1257 while let Some(relay_event) = event_rx.recv().await {
1255 match relay_event { 1258 match relay_event {
1256 RelayEvent::Event(event) => { 1259 RelayEvent::Event(event) => {
@@ -1297,6 +1300,7 @@ impl SyncManager {
1297 relay_url: relay_url_clone.clone(), 1300 relay_url: relay_url_clone.clone(),
1298 }) 1301 })
1299 .await; 1302 .await;
1303 disconnect_sent = true;
1300 break; 1304 break;
1301 } 1305 }
1302 RelayEvent::Shutdown => { 1306 RelayEvent::Shutdown => {
@@ -1307,10 +1311,25 @@ impl SyncManager {
1307 relay_url: relay_url_clone.clone(), 1311 relay_url: relay_url_clone.clone(),
1308 }) 1312 })
1309 .await; 1313 .await;
1314 disconnect_sent = true;
1310 break; 1315 break;
1311 } 1316 }
1312 } 1317 }
1313 } 1318 }
1319
1320 // If the event channel closed without a Closed/Shutdown event
1321 // (e.g., connection dropped unexpectedly), still notify SyncManager
1322 if !disconnect_sent {
1323 tracing::info!(
1324 relay = %relay_url_clone,
1325 "Event channel closed, notifying SyncManager of disconnect"
1326 );
1327 let _ = disconnect_tx
1328 .send(DisconnectNotification {
1329 relay_url: relay_url_clone.clone(),
1330 })
1331 .await;
1332 }
1314 }); 1333 });
1315 1334
1316 tracing::info!( 1335 tracing::info!(
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 9a580d2..63e4247 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -124,48 +124,83 @@ impl RelayConnection {
124 /// # Arguments 124 /// # Arguments
125 /// * `event_sender` - Channel to send relay events through 125 /// * `event_sender` - Channel to send relay events through
126 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { 126 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
127 use std::time::Duration;
128 use tokio::time::interval;
129
127 let mut notifications = self.client.notifications(); 130 let mut notifications = self.client.notifications();
128 let url = self.url.clone(); 131 let url = self.url.clone();
132
133 // Check connection status every second to detect dead connections
134 let mut check_interval = interval(Duration::from_secs(1));
129 135
130 tracing::debug!(relay = %url, "Starting event loop"); 136 tracing::debug!(relay = %url, "Starting event loop");
131 137
132 while let Ok(notification) = notifications.recv().await { 138 loop {
133 match notification { 139 tokio::select! {
134 RelayPoolNotification::Event { event, .. } => { 140 // Check for new notifications
135 tracing::trace!(relay = %url, event_id = %event.id, "Received event"); 141 notification_result = notifications.recv() => {
136 if event_sender.send(RelayEvent::Event(*event)).await.is_err() { 142 match notification_result {
137 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 143 Ok(notification) => {
138 break; 144 match notification {
139 } 145 RelayPoolNotification::Event { event, .. } => {
140 } 146 tracing::trace!(relay = %url, event_id = %event.id, "Received event");
141 RelayPoolNotification::Message { message, .. } => { 147 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
142 match message { 148 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
143 RelayMessage::EndOfStoredEvents(sub_id) => { 149 break;
144 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); 150 }
145 // Convert Cow<SubscriptionId> to owned SubscriptionId 151 }
146 let owned_sub_id = sub_id.into_owned(); 152 RelayPoolNotification::Message { message, .. } => {
147 if event_sender 153 match message {
148 .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) 154 RelayMessage::EndOfStoredEvents(sub_id) => {
149 .await 155 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE");
150 .is_err() 156 // Convert Cow<SubscriptionId> to owned SubscriptionId
151 { 157 let owned_sub_id = sub_id.into_owned();
152 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 158 if event_sender
153 break; 159 .send(RelayEvent::EndOfStoredEvents(owned_sub_id))
160 .await
161 .is_err()
162 {
163 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
164 break;
165 }
166 }
167 RelayMessage::Closed { message: msg, .. } => {
168 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
169 let _ = event_sender
170 .send(RelayEvent::Closed(msg.to_string()))
171 .await;
172 break;
173 }
174 _ => {}
175 }
176 }
177 RelayPoolNotification::Shutdown => {
178 tracing::info!(relay = %url, "Relay pool shutdown");
179 let _ = event_sender.send(RelayEvent::Shutdown).await;
180 break;
181 }
154 } 182 }
155 } 183 }
156 RelayMessage::Closed { message: msg, .. } => { 184 Err(_) => {
157 tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); 185 // Notification channel closed - connection lost
158 let _ = event_sender 186 tracing::debug!(relay = %url, "Notification channel error, stopping event loop");
159 .send(RelayEvent::Closed(msg.to_string())) 187 break;
160 .await;
161 } 188 }
162 _ => {}
163 } 189 }
164 } 190 }
165 RelayPoolNotification::Shutdown => { 191 // Periodic connection health check
166 tracing::info!(relay = %url, "Relay pool shutdown"); 192 _ = check_interval.tick() => {
167 let _ = event_sender.send(RelayEvent::Shutdown).await; 193 // Check if relay is still connected via nostr-sdk
168 break; 194 if let Ok(relay) = self.client.relay(&self.url).await {
195 if !relay.is_connected() {
196 tracing::info!(relay = %url, "Relay disconnected (detected by health check)");
197 break;
198 }
199 } else {
200 // Relay not found in client - must be disconnected
201 tracing::info!(relay = %url, "Relay not found (detected by health check)");
202 break;
203 }
169 } 204 }
170 } 205 }
171 } 206 }
diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs
index 775159b..3accd0f 100644
--- a/tests/sync/metrics.rs
+++ b/tests/sync/metrics.rs
@@ -247,7 +247,12 @@ async fn test_startup_sync_event_count() {
247 // 6. Create 3 patch events (Layer 2) that reference the announcement 247 // 6. Create 3 patch events (Layer 2) that reference the announcement
248 let patches: Vec<_> = (0..3) 248 let patches: Vec<_> = (0..3)
249 .map(|i| { 249 .map(|i| {
250 create_event_referencing_repo(&keys, &repo_coord, KIND_PATCH, &format!("Test patch {}", i)) 250 create_event_referencing_repo(
251 &keys,
252 &repo_coord,
253 KIND_PATCH,
254 &format!("Test patch {}", i),
255 )
251 }) 256 })
252 .collect(); 257 .collect();
253 println!("Created {} patches", patches.len()); 258 println!("Created {} patches", patches.len());
@@ -320,8 +325,12 @@ async fn test_startup_sync_event_count() {
320 .kind(Kind::Custom(KIND_PATCH)) 325 .kind(Kind::Custom(KIND_PATCH))
321 .author(keys.public_key()); 326 .author(keys.public_key());
322 327
323 let patches_synced = 328 let patches_synced = crate::common::sync_helpers::wait_for_event_on_relay(
324 crate::common::sync_helpers::wait_for_event_on_relay(syncing_relay.url(), filter, Duration::from_secs(2)).await; 329 syncing_relay.url(),
330 filter,
331 Duration::from_secs(2),
332 )
333 .await;
325 println!("Patches synced to syncing relay: {}", patches_synced); 334 println!("Patches synced to syncing relay: {}", patches_synced);
326 335
327 // Cleanup 336 // Cleanup
@@ -374,12 +383,15 @@ async fn test_connection_failure_increments_counter() {
374 383
375 // Wait for initial connection attempt to the unreachable bootstrap relay 384 // Wait for initial connection attempt to the unreachable bootstrap relay
376 tokio::time::sleep(Duration::from_secs(2)).await; 385 tokio::time::sleep(Duration::from_secs(2)).await;
377 386
378 let metrics = harness.get_metrics().await.unwrap(); 387 let metrics = harness.get_metrics().await.unwrap();
379 388
380 // Failure counter should be recorded when connecting to unreachable relay 389 // Failure counter should be recorded when connecting to unreachable relay
381 let failures = metrics 390 let failures = metrics
382 .counter("ngit_sync_connection_attempts_total", &[("result", "failure")]) 391 .counter(
392 "ngit_sync_connection_attempts_total",
393 &[("result", "failure")],
394 )
383 .unwrap_or(0); 395 .unwrap_or(0);
384 396
385 println!("Connection failures recorded: {}", failures); 397 println!("Connection failures recorded: {}", failures);
@@ -413,7 +425,9 @@ async fn test_live_sync_event_count() {
413 // Now add events - these should be "live" not "startup" 425 // Now add events - these should be "live" not "startup"
414 let keys = Keys::generate(); 426 let keys = Keys::generate();
415 let events: Vec<_> = (0..2) 427 let events: Vec<_> = (0..2)
416 .map(|i| create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i))) 428 .map(|i| {
429 create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i))
430 })
417 .collect(); 431 .collect();
418 harness.submit_events(0, &events).await.unwrap(); 432 harness.submit_events(0, &events).await.unwrap();
419 433
@@ -434,11 +448,7 @@ async fn test_live_sync_event_count() {
434/// 448///
435/// This test validates that the ngit_sync_relay_connected gauge 449/// This test validates that the ngit_sync_relay_connected gauge
436/// correctly reflects the connection state of source relays. 450/// correctly reflects the connection state of source relays.
437///
438/// NOTE: This test may fail until sync metrics recording is fully wired up.
439/// The test documents the expected behavior.
440#[tokio::test] 451#[tokio::test]
441#[ignore] // Enable when relay connected status metrics are wired up
442async fn test_relay_connected_status() { 452async fn test_relay_connected_status() {
443 let mut harness = MetricsTestHarness::with_sources(1).await; 453 let mut harness = MetricsTestHarness::with_sources(1).await;
444 harness.start_syncing_relay(0).await; 454 harness.start_syncing_relay(0).await;
@@ -505,7 +515,10 @@ async fn test_health_state_degrades_on_failure() {
505 // Get the relay status (1=healthy, 2=degraded, 3=dead) 515 // Get the relay status (1=healthy, 2=degraded, 3=dead)
506 let status = later.gauge("ngit_sync_relay_status", &[]).unwrap_or(0); 516 let status = later.gauge("ngit_sync_relay_status", &[]).unwrap_or(0);
507 517
508 println!("Initial metrics: {:?}", initial.gauge("ngit_sync_relay_status", &[])); 518 println!(
519 "Initial metrics: {:?}",
520 initial.gauge("ngit_sync_relay_status", &[])
521 );
509 println!("Later status: {}", status); 522 println!("Later status: {}", status);
510 523
511 assert!( 524 assert!(
@@ -561,8 +574,14 @@ async fn test_multi_source_aggregate_counts() {
561 574
562 let metrics = harness.get_metrics().await.unwrap(); 575 let metrics = harness.get_metrics().await.unwrap();
563 576
564 println!("After stop - Tracked total: {:?}", metrics.relays_tracked_total()); 577 println!(
565 println!("After stop - Connected total: {:?}", metrics.relays_connected_total()); 578 "After stop - Tracked total: {:?}",
579 metrics.relays_tracked_total()
580 );
581 println!(
582 "After stop - Connected total: {:?}",
583 metrics.relays_connected_total()
584 );
566 585
567 assert_eq!( 586 assert_eq!(
568 metrics.relays_tracked_total(), 587 metrics.relays_tracked_total(),
@@ -576,4 +595,4 @@ async fn test_multi_source_aggregate_counts() {
576 ); 595 );
577 596
578 harness.stop_all().await; 597 harness.stop_all().await;
579} \ No newline at end of file 598}