upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2025-12-11 11:19:38 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2025-12-11 11:19:38 +0000
commit: c82684092c7b4f81e49833b0888500fcb9851218 (patch)
tree: 6853940a5bb975aa17e1f91be988e3b3303ff39f
parent: 532a7d0d5d8461bad0fc799aacb5eea0135f79f3 (diff)
fix(sync): improve metrics recording and connection failure detection
Changes:
- Fix connection attempt metrics: record success/failure based on actual connection result instead of pre-emptively recording failure
- Add health tracker integration on connection failure: call record_failure() and record_health_state() in error path
- Add connection verification in relay_connection.rs: wait 500ms after connect() then verify is_connected() to detect silent failures
- Add configurable disconnect check interval via NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS env var
- Update TestRelay with fast test settings: startup_delay=0, jitter=0, disconnect_check_interval=1s
- Add debug output to metrics tests for investigation

Note: Tests may still fail due to 5-second base backoff in health tracker. A follow-up task will add NGIT_SYNC_BASE_BACKOFF_SECS config parameter to allow faster test cycles.

Related: metrics-wiring-plan.md Tasks 1 & 2
-rw-r--r-- src/config.rs 6
-rw-r--r-- src/sync/mod.rs 76
-rw-r--r-- src/sync/relay_connection.rs 18
-rw-r--r-- tests/common/relay.rs 3
-rw-r--r-- tests/sync/metrics.rs 34
5 files changed, 107 insertions, 30 deletions
diff --git a/src/config.rs b/src/config.rs
index 69a160a..5e74471 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -109,6 +109,11 @@ pub struct Config {
109 /// Set to 0 to disable jitter (useful for testing) 109 /// Set to 0 to disable jitter (useful for testing)
110 #[arg(long, env = "NGIT_SYNC_STARTUP_JITTER_MS", default_value_t = 10_000)] 110 #[arg(long, env = "NGIT_SYNC_STARTUP_JITTER_MS", default_value_t = 10_000)]
111 pub sync_startup_jitter_ms: u64, 111 pub sync_startup_jitter_ms: u64,
112
113 /// Interval in seconds for checking disconnected relays and attempting reconnection (default: 60)
114 /// Set to lower value for faster reconnection testing
115 #[arg(long, env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", default_value_t = 60)]
116 pub sync_disconnect_check_interval_secs: u64,
112} 117}
113 118
114impl Config { 119impl Config {
@@ -170,6 +175,7 @@ impl Config {
170 sync_reconnect_delay_secs: 10, 175 sync_reconnect_delay_secs: 10,
171 sync_reconnect_lookback_days: 3, 176 sync_reconnect_lookback_days: 3,
172 sync_startup_jitter_ms: 10_000, 177 sync_startup_jitter_ms: 10_000,
178 sync_disconnect_check_interval_secs: 60,
173 } 179 }
174 } 180 }
175} 181}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 5039c04..16ad833 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -264,23 +264,30 @@ async fn run_daily_timer(
264// Disconnect Checker (Phase 8) 264// Disconnect Checker (Phase 8)
265// ============================================================================= 265// =============================================================================
266 266
267/// Check interval for empty relay cleanup in seconds
268const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60;
269
270/// Run the disconnect checker for periodic cleanup of empty relays 267/// Run the disconnect checker for periodic cleanup of empty relays
271/// 268///
272/// This function runs in a loop, checking every 60 seconds for relays 269/// This function runs in a loop, checking at the configured interval for relays
273/// that have no repos or root events to sync. Non-bootstrap relays 270/// that have no repos or root events to sync. Non-bootstrap relays
274/// that are empty will be disconnected to free up resources. 271/// that are empty will be disconnected to free up resources.
275/// 272///
276/// Bootstrap relays are never disconnected, even if empty. 273/// Bootstrap relays are never disconnected, even if empty.
274///
275/// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS`
276/// (default: 60 seconds). Set to a lower value for faster reconnection testing.
277async fn run_disconnect_checker( 277async fn run_disconnect_checker(
278 sync_manager: Arc<Mutex<SyncManager>>, 278 sync_manager: Arc<Mutex<SyncManager>>,
279 mut shutdown_rx: broadcast::Receiver<()>, 279 mut shutdown_rx: broadcast::Receiver<()>,
280 check_interval_secs: u64,
280) { 281) {
282 let interval = Duration::from_secs(check_interval_secs);
283 tracing::info!(
284 interval_secs = check_interval_secs,
285 "Disconnect checker started with configured interval"
286 );
287
281 loop { 288 loop {
282 tokio::select! { 289 tokio::select! {
283 _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { 290 _ = tokio::time::sleep(interval) => {
284 tracing::debug!("Disconnect checker running"); 291 tracing::debug!("Disconnect checker running");
285 292
286 let mut manager = sync_manager.lock().await; 293 let mut manager = sync_manager.lock().await;
@@ -609,21 +616,24 @@ impl SyncManager {
609 self.spawn_relay_connection(bootstrap_url.clone()).await; 616 self.spawn_relay_connection(bootstrap_url.clone()).await;
610 } 617 }
611 618
612 // 7. Wrap self in Arc<Mutex> for sharing with timer task 619 // 7. Capture config values before moving self into Arc
620 let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs;
621
622 // 8. Wrap self in Arc<Mutex> for sharing with timer task
613 let sync_manager = Arc::new(Mutex::new(self)); 623 let sync_manager = Arc::new(Mutex::new(self));
614 624
615 // 8. Spawn daily timer task with shutdown receiver 625 // 9. Spawn daily timer task with shutdown receiver
616 let timer_manager = Arc::clone(&sync_manager); 626 let timer_manager = Arc::clone(&sync_manager);
617 let timer_shutdown = shutdown_tx.subscribe(); 627 let timer_shutdown = shutdown_tx.subscribe();
618 tokio::spawn(async move { 628 tokio::spawn(async move {
619 run_daily_timer(timer_manager, timer_shutdown).await; 629 run_daily_timer(timer_manager, timer_shutdown).await;
620 }); 630 });
621 631
622 // 9. Spawn disconnect checker task with shutdown receiver 632 // 10. Spawn disconnect checker task with shutdown receiver
623 let checker_manager = Arc::clone(&sync_manager); 633 let checker_manager = Arc::clone(&sync_manager);
624 let checker_shutdown = shutdown_tx.subscribe(); 634 let checker_shutdown = shutdown_tx.subscribe();
625 tokio::spawn(async move { 635 tokio::spawn(async move {
626 run_disconnect_checker(checker_manager, checker_shutdown).await; 636 run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await;
627 }); 637 });
628 638
629 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 639 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -1174,27 +1184,39 @@ impl SyncManager {
1174 // Create relay connection 1184 // Create relay connection
1175 let connection = RelayConnection::new(relay_url.clone()); 1185 let connection = RelayConnection::new(relay_url.clone());
1176 1186
1177 // Record connection attempt
1178 if let Some(ref metrics) = self.metrics {
1179 metrics.record_connection_attempt(&relay_url, false);
1180 }
1181
1182 // Connect and subscribe to Layer 1 1187 // Connect and subscribe to Layer 1
1183 if let Err(e) = connection.connect_and_subscribe(None).await { 1188 match connection.connect_and_subscribe(None).await {
1184 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); 1189 Ok(_) => {
1185 // Update state to disconnected on failure 1190 // Record successful connection attempt
1186 { 1191 if let Some(ref metrics) = self.metrics {
1187 let mut index = relay_sync_index.write().await; 1192 metrics.record_connection_attempt(&relay_url, true);
1188 if let Some(state) = index.get_mut(&relay_url) {
1189 state.connection_status = ConnectionStatus::Disconnected;
1190 } 1193 }
1191 } 1194 }
1192 return; 1195 Err(e) => {
1193 } 1196 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
1194 1197
1195 // If successful, update connection attempt metric to success 1198 // Record failed connection attempt
1196 if let Some(ref metrics) = self.metrics { 1199 if let Some(ref metrics) = self.metrics {
1197 metrics.record_connection_attempt(&relay_url, true); 1200 metrics.record_connection_attempt(&relay_url, false);
1201 }
1202
1203 // Record failure in health tracker
1204 self.health_tracker.record_failure(&relay_url);
1205
1206 // Record health state in metrics
1207 if let Some(ref metrics) = self.metrics {
1208 metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url));
1209 }
1210
1211 // Update state to disconnected on failure
1212 {
1213 let mut index = relay_sync_index.write().await;
1214 if let Some(state) = index.get_mut(&relay_url) {
1215 state.connection_status = ConnectionStatus::Disconnected;
1216 }
1217 }
1218 return;
1219 }
1198 } 1220 }
1199 1221
1200 // Mark as connected in relay sync index 1222 // Mark as connected in relay sync index
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 09c9887..d69e112 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -55,7 +55,8 @@ impl RelayConnection {
55 /// This method: 55 /// This method:
56 /// 1. Adds the relay to the client 56 /// 1. Adds the relay to the client
57 /// 2. Establishes the WebSocket connection 57 /// 2. Establishes the WebSocket connection
58 /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618) 58 /// 3. Verifies connection was established
59 /// 4. Subscribes to Layer 1 filter (kinds 30617 + 30618)
59 /// 60 ///
60 /// # Arguments 61 /// # Arguments
61 /// * `since` - Optional timestamp for incremental sync on reconnect 62 /// * `since` - Optional timestamp for incremental sync on reconnect
@@ -76,6 +77,21 @@ impl RelayConnection {
76 // Establish connection 77 // Establish connection
77 self.client.connect().await; 78 self.client.connect().await;
78 79
80 // Wait briefly for connection to establish and check status
81 // nostr-sdk's connect() is async and may not immediately reflect failure
82 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
83
84 // Check if relay is actually connected
85 let relay = self.client.relay(&self.url).await
86 .map_err(|e| format!("Failed to get relay handle for {}: {}", self.url, e))?;
87
88 if !relay.is_connected() {
89 return Err(format!(
90 "Failed to connect to relay {}: connection not established after timeout",
91 self.url
92 ));
93 }
94
79 // Subscribe to Layer 1 (announcements) 95 // Subscribe to Layer 1 (announcements)
80 let filter = build_announcement_filter(since); 96 let filter = build_announcement_filter(since);
81 let output = self 97 let output = self
diff --git a/tests/common/relay.rs b/tests/common/relay.rs
index d954e34..b11dde3 100644
--- a/tests/common/relay.rs
+++ b/tests/common/relay.rs
@@ -94,6 +94,9 @@ impl TestRelay {
94 .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation 94 .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation
95 .env("NGIT_OWNER_NPUB", &test_npub) 95 .env("NGIT_OWNER_NPUB", &test_npub)
96 .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) 96 .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default)
97 .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests
98 .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests
99 .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests
97 .env("RUST_LOG", "info") // Enable INFO logging for diagnostics 100 .env("RUST_LOG", "info") // Enable INFO logging for diagnostics
98 .stdout(Stdio::null()) // Disable stderr for cleaner test output 101 .stdout(Stdio::null()) // Disable stderr for cleaner test output
99 // .stdout(Stdio::inherit()) // Show stdout for diagnostics 102 // .stdout(Stdio::inherit()) // Show stdout for diagnostics
diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs
index 82d681e..e11fe58 100644
--- a/tests/sync/metrics.rs
+++ b/tests/sync/metrics.rs
@@ -368,17 +368,48 @@ async fn test_startup_sync_event_count() {
368/// NOTE: This test may fail until sync metrics recording is fully wired up. 368/// NOTE: This test may fail until sync metrics recording is fully wired up.
369/// The test documents the expected behavior. 369/// The test documents the expected behavior.
370#[tokio::test] 370#[tokio::test]
371#[ignore] // Enable when metrics recording is implemented
372async fn test_connection_failure_increments_counter() { 371async fn test_connection_failure_increments_counter() {
373 let mut harness = MetricsTestHarness::with_sources(0).await; // No sources 372 let mut harness = MetricsTestHarness::with_sources(0).await; // No sources
374 harness.start_syncing_relay_to_nowhere().await; 373 harness.start_syncing_relay_to_nowhere().await;
375 374
376 // Wait for initial connection attempts 375 // Wait for initial connection attempts
377 tokio::time::sleep(Duration::from_secs(2)).await; 376 tokio::time::sleep(Duration::from_secs(2)).await;
377
378 // Fetch raw metrics to debug
379 let syncing_url = harness.syncing_relay_url().expect("Syncing relay should be started");
380 let raw_1 = fetch_metrics(syncing_url)
381 .await
382 .expect("Failed to fetch metrics");
383
384 // Print all sync-related metrics
385 println!("\n=== RAW METRICS (t1) ===");
386 for line in raw_1.lines() {
387 if line.contains("sync") || line.contains("connection") {
388 println!("{}", line);
389 }
390 }
391 println!("========================\n");
392
378 let metrics_1 = harness.get_metrics().await.unwrap(); 393 let metrics_1 = harness.get_metrics().await.unwrap();
379 394
380 // Wait for more attempts 395 // Wait for more attempts
381 tokio::time::sleep(Duration::from_secs(2)).await; 396 tokio::time::sleep(Duration::from_secs(2)).await;
397
398 // Fetch raw metrics again
399 let syncing_url = harness.syncing_relay_url().expect("Syncing relay should be started");
400 let raw_2 = fetch_metrics(syncing_url)
401 .await
402 .expect("Failed to fetch metrics");
403
404 // Print all sync-related metrics
405 println!("\n=== RAW METRICS (t2) ===");
406 for line in raw_2.lines() {
407 if line.contains("sync") || line.contains("connection") {
408 println!("{}", line);
409 }
410 }
411 println!("========================\n");
412
382 let metrics_2 = harness.get_metrics().await.unwrap(); 413 let metrics_2 = harness.get_metrics().await.unwrap();
383 414
384 // Failure counter should have increased 415 // Failure counter should have increased
@@ -496,7 +527,6 @@ async fn test_relay_connected_status() {
496/// NOTE: This test may fail until sync metrics recording is fully wired up. 527/// NOTE: This test may fail until sync metrics recording is fully wired up.
497/// The test documents the expected behavior. 528/// The test documents the expected behavior.
498#[tokio::test] 529#[tokio::test]
499#[ignore] // Ignored until sync metrics are fully wired up
500async fn test_health_state_degrades_on_failure() { 530async fn test_health_state_degrades_on_failure() {
501 use crate::common::sync_helpers::MetricsTestHarness; 531 use crate::common::sync_helpers::MetricsTestHarness;
502 532