diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 11:19:38 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 11:19:38 +0000 |
| commit | c82684092c7b4f81e49833b0888500fcb9851218 (patch) | |
| tree | 6853940a5bb975aa17e1f91be988e3b3303ff39f | |
| parent | 532a7d0d5d8461bad0fc799aacb5eea0135f79f3 (diff) | |
fix(sync): improve metrics recording and connection failure detection
Changes:
- Fix connection attempt metrics: record success/failure based on actual
connection result instead of pre-emptively recording failure
- Add health tracker integration on connection failure: call
record_failure() and record_health_state() in error path
- Add connection verification in relay_connection.rs: wait 500ms after
connect() then verify is_connected() to detect silent failures
- Add configurable disconnect check interval via
NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS env var
- Update TestRelay with fast test settings: startup_delay=0, jitter=0,
disconnect_check_interval=1s
- Add debug output to metrics tests for investigation
Note: Tests may still fail because of the 5-second base backoff in the health tracker.
A follow-up task will add an NGIT_SYNC_BASE_BACKOFF_SECS config parameter
to allow faster test cycles.
Related: metrics-wiring-plan.md Tasks 1 & 2
| -rw-r--r-- | src/config.rs | 6 | ||||
| -rw-r--r-- | src/sync/mod.rs | 76 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 18 | ||||
| -rw-r--r-- | tests/common/relay.rs | 3 | ||||
| -rw-r--r-- | tests/sync/metrics.rs | 34 |
5 files changed, 107 insertions, 30 deletions
diff --git a/src/config.rs b/src/config.rs index 69a160a..5e74471 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -109,6 +109,11 @@ pub struct Config { | |||
| 109 | /// Set to 0 to disable jitter (useful for testing) | 109 | /// Set to 0 to disable jitter (useful for testing) |
| 110 | #[arg(long, env = "NGIT_SYNC_STARTUP_JITTER_MS", default_value_t = 10_000)] | 110 | #[arg(long, env = "NGIT_SYNC_STARTUP_JITTER_MS", default_value_t = 10_000)] |
| 111 | pub sync_startup_jitter_ms: u64, | 111 | pub sync_startup_jitter_ms: u64, |
| 112 | |||
| 113 | /// Interval in seconds for checking disconnected relays and attempting reconnection (default: 60) | ||
| 114 | /// Set to lower value for faster reconnection testing | ||
| 115 | #[arg(long, env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", default_value_t = 60)] | ||
| 116 | pub sync_disconnect_check_interval_secs: u64, | ||
| 112 | } | 117 | } |
| 113 | 118 | ||
| 114 | impl Config { | 119 | impl Config { |
| @@ -170,6 +175,7 @@ impl Config { | |||
| 170 | sync_reconnect_delay_secs: 10, | 175 | sync_reconnect_delay_secs: 10, |
| 171 | sync_reconnect_lookback_days: 3, | 176 | sync_reconnect_lookback_days: 3, |
| 172 | sync_startup_jitter_ms: 10_000, | 177 | sync_startup_jitter_ms: 10_000, |
| 178 | sync_disconnect_check_interval_secs: 60, | ||
| 173 | } | 179 | } |
| 174 | } | 180 | } |
| 175 | } | 181 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 5039c04..16ad833 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -264,23 +264,30 @@ async fn run_daily_timer( | |||
| 264 | // Disconnect Checker (Phase 8) | 264 | // Disconnect Checker (Phase 8) |
| 265 | // ============================================================================= | 265 | // ============================================================================= |
| 266 | 266 | ||
| 267 | /// Check interval for empty relay cleanup in seconds | ||
| 268 | const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60; | ||
| 269 | |||
| 270 | /// Run the disconnect checker for periodic cleanup of empty relays | 267 | /// Run the disconnect checker for periodic cleanup of empty relays |
| 271 | /// | 268 | /// |
| 272 | /// This function runs in a loop, checking every 60 seconds for relays | 269 | /// This function runs in a loop, checking at the configured interval for relays |
| 273 | /// that have no repos or root events to sync. Non-bootstrap relays | 270 | /// that have no repos or root events to sync. Non-bootstrap relays |
| 274 | /// that are empty will be disconnected to free up resources. | 271 | /// that are empty will be disconnected to free up resources. |
| 275 | /// | 272 | /// |
| 276 | /// Bootstrap relays are never disconnected, even if empty. | 273 | /// Bootstrap relays are never disconnected, even if empty. |
| 274 | /// | ||
| 275 | /// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` | ||
| 276 | /// (default: 60 seconds). Set to a lower value for faster reconnection testing. | ||
| 277 | async fn run_disconnect_checker( | 277 | async fn run_disconnect_checker( |
| 278 | sync_manager: Arc<Mutex<SyncManager>>, | 278 | sync_manager: Arc<Mutex<SyncManager>>, |
| 279 | mut shutdown_rx: broadcast::Receiver<()>, | 279 | mut shutdown_rx: broadcast::Receiver<()>, |
| 280 | check_interval_secs: u64, | ||
| 280 | ) { | 281 | ) { |
| 282 | let interval = Duration::from_secs(check_interval_secs); | ||
| 283 | tracing::info!( | ||
| 284 | interval_secs = check_interval_secs, | ||
| 285 | "Disconnect checker started with configured interval" | ||
| 286 | ); | ||
| 287 | |||
| 281 | loop { | 288 | loop { |
| 282 | tokio::select! { | 289 | tokio::select! { |
| 283 | _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { | 290 | _ = tokio::time::sleep(interval) => { |
| 284 | tracing::debug!("Disconnect checker running"); | 291 | tracing::debug!("Disconnect checker running"); |
| 285 | 292 | ||
| 286 | let mut manager = sync_manager.lock().await; | 293 | let mut manager = sync_manager.lock().await; |
| @@ -609,21 +616,24 @@ impl SyncManager { | |||
| 609 | self.spawn_relay_connection(bootstrap_url.clone()).await; | 616 | self.spawn_relay_connection(bootstrap_url.clone()).await; |
| 610 | } | 617 | } |
| 611 | 618 | ||
| 612 | // 7. Wrap self in Arc<Mutex> for sharing with timer task | 619 | // 7. Capture config values before moving self into Arc |
| 620 | let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; | ||
| 621 | |||
| 622 | // 8. Wrap self in Arc<Mutex> for sharing with timer task | ||
| 613 | let sync_manager = Arc::new(Mutex::new(self)); | 623 | let sync_manager = Arc::new(Mutex::new(self)); |
| 614 | 624 | ||
| 615 | // 8. Spawn daily timer task with shutdown receiver | 625 | // 9. Spawn daily timer task with shutdown receiver |
| 616 | let timer_manager = Arc::clone(&sync_manager); | 626 | let timer_manager = Arc::clone(&sync_manager); |
| 617 | let timer_shutdown = shutdown_tx.subscribe(); | 627 | let timer_shutdown = shutdown_tx.subscribe(); |
| 618 | tokio::spawn(async move { | 628 | tokio::spawn(async move { |
| 619 | run_daily_timer(timer_manager, timer_shutdown).await; | 629 | run_daily_timer(timer_manager, timer_shutdown).await; |
| 620 | }); | 630 | }); |
| 621 | 631 | ||
| 622 | // 9. Spawn disconnect checker task with shutdown receiver | 632 | // 10. Spawn disconnect checker task with shutdown receiver |
| 623 | let checker_manager = Arc::clone(&sync_manager); | 633 | let checker_manager = Arc::clone(&sync_manager); |
| 624 | let checker_shutdown = shutdown_tx.subscribe(); | 634 | let checker_shutdown = shutdown_tx.subscribe(); |
| 625 | tokio::spawn(async move { | 635 | tokio::spawn(async move { |
| 626 | run_disconnect_checker(checker_manager, checker_shutdown).await; | 636 | run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await; |
| 627 | }); | 637 | }); |
| 628 | 638 | ||
| 629 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 639 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -1174,27 +1184,39 @@ impl SyncManager { | |||
| 1174 | // Create relay connection | 1184 | // Create relay connection |
| 1175 | let connection = RelayConnection::new(relay_url.clone()); | 1185 | let connection = RelayConnection::new(relay_url.clone()); |
| 1176 | 1186 | ||
| 1177 | // Record connection attempt | ||
| 1178 | if let Some(ref metrics) = self.metrics { | ||
| 1179 | metrics.record_connection_attempt(&relay_url, false); | ||
| 1180 | } | ||
| 1181 | |||
| 1182 | // Connect and subscribe to Layer 1 | 1187 | // Connect and subscribe to Layer 1 |
| 1183 | if let Err(e) = connection.connect_and_subscribe(None).await { | 1188 | match connection.connect_and_subscribe(None).await { |
| 1184 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | 1189 | Ok(_) => { |
| 1185 | // Update state to disconnected on failure | 1190 | // Record successful connection attempt |
| 1186 | { | 1191 | if let Some(ref metrics) = self.metrics { |
| 1187 | let mut index = relay_sync_index.write().await; | 1192 | metrics.record_connection_attempt(&relay_url, true); |
| 1188 | if let Some(state) = index.get_mut(&relay_url) { | ||
| 1189 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1190 | } | 1193 | } |
| 1191 | } | 1194 | } |
| 1192 | return; | 1195 | Err(e) => { |
| 1193 | } | 1196 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); |
| 1194 | 1197 | ||
| 1195 | // If successful, update connection attempt metric to success | 1198 | // Record failed connection attempt |
| 1196 | if let Some(ref metrics) = self.metrics { | 1199 | if let Some(ref metrics) = self.metrics { |
| 1197 | metrics.record_connection_attempt(&relay_url, true); | 1200 | metrics.record_connection_attempt(&relay_url, false); |
| 1201 | } | ||
| 1202 | |||
| 1203 | // Record failure in health tracker | ||
| 1204 | self.health_tracker.record_failure(&relay_url); | ||
| 1205 | |||
| 1206 | // Record health state in metrics | ||
| 1207 | if let Some(ref metrics) = self.metrics { | ||
| 1208 | metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); | ||
| 1209 | } | ||
| 1210 | |||
| 1211 | // Update state to disconnected on failure | ||
| 1212 | { | ||
| 1213 | let mut index = relay_sync_index.write().await; | ||
| 1214 | if let Some(state) = index.get_mut(&relay_url) { | ||
| 1215 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1216 | } | ||
| 1217 | } | ||
| 1218 | return; | ||
| 1219 | } | ||
| 1198 | } | 1220 | } |
| 1199 | 1221 | ||
| 1200 | // Mark as connected in relay sync index | 1222 | // Mark as connected in relay sync index |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 09c9887..d69e112 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -55,7 +55,8 @@ impl RelayConnection { | |||
| 55 | /// This method: | 55 | /// This method: |
| 56 | /// 1. Adds the relay to the client | 56 | /// 1. Adds the relay to the client |
| 57 | /// 2. Establishes the WebSocket connection | 57 | /// 2. Establishes the WebSocket connection |
| 58 | /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618) | 58 | /// 3. Verifies connection was established |
| 59 | /// 4. Subscribes to Layer 1 filter (kinds 30617 + 30618) | ||
| 59 | /// | 60 | /// |
| 60 | /// # Arguments | 61 | /// # Arguments |
| 61 | /// * `since` - Optional timestamp for incremental sync on reconnect | 62 | /// * `since` - Optional timestamp for incremental sync on reconnect |
| @@ -76,6 +77,21 @@ impl RelayConnection { | |||
| 76 | // Establish connection | 77 | // Establish connection |
| 77 | self.client.connect().await; | 78 | self.client.connect().await; |
| 78 | 79 | ||
| 80 | // Wait briefly for connection to establish and check status | ||
| 81 | // nostr-sdk's connect() is async and may not immediately reflect failure | ||
| 82 | tokio::time::sleep(std::time::Duration::from_millis(500)).await; | ||
| 83 | |||
| 84 | // Check if relay is actually connected | ||
| 85 | let relay = self.client.relay(&self.url).await | ||
| 86 | .map_err(|e| format!("Failed to get relay handle for {}: {}", self.url, e))?; | ||
| 87 | |||
| 88 | if !relay.is_connected() { | ||
| 89 | return Err(format!( | ||
| 90 | "Failed to connect to relay {}: connection not established after timeout", | ||
| 91 | self.url | ||
| 92 | )); | ||
| 93 | } | ||
| 94 | |||
| 79 | // Subscribe to Layer 1 (announcements) | 95 | // Subscribe to Layer 1 (announcements) |
| 80 | let filter = build_announcement_filter(since); | 96 | let filter = build_announcement_filter(since); |
| 81 | let output = self | 97 | let output = self |
diff --git a/tests/common/relay.rs b/tests/common/relay.rs index d954e34..b11dde3 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs | |||
| @@ -94,6 +94,9 @@ impl TestRelay { | |||
| 94 | .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation | 94 | .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation |
| 95 | .env("NGIT_OWNER_NPUB", &test_npub) | 95 | .env("NGIT_OWNER_NPUB", &test_npub) |
| 96 | .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) | 96 | .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) |
| 97 | .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests | ||
| 98 | .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests | ||
| 99 | .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests | ||
| 97 | .env("RUST_LOG", "info") // Enable INFO logging for diagnostics | 100 | .env("RUST_LOG", "info") // Enable INFO logging for diagnostics |
| 98 | .stdout(Stdio::null()) // Disable stderr for cleaner test output | 101 | .stdout(Stdio::null()) // Disable stderr for cleaner test output |
| 99 | // .stdout(Stdio::inherit()) // Show stdout for diagnostics | 102 | // .stdout(Stdio::inherit()) // Show stdout for diagnostics |
diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs index 82d681e..e11fe58 100644 --- a/tests/sync/metrics.rs +++ b/tests/sync/metrics.rs | |||
| @@ -368,17 +368,48 @@ async fn test_startup_sync_event_count() { | |||
| 368 | /// NOTE: This test may fail until sync metrics recording is fully wired up. | 368 | /// NOTE: This test may fail until sync metrics recording is fully wired up. |
| 369 | /// The test documents the expected behavior. | 369 | /// The test documents the expected behavior. |
| 370 | #[tokio::test] | 370 | #[tokio::test] |
| 371 | #[ignore] // Enable when metrics recording is implemented | ||
| 372 | async fn test_connection_failure_increments_counter() { | 371 | async fn test_connection_failure_increments_counter() { |
| 373 | let mut harness = MetricsTestHarness::with_sources(0).await; // No sources | 372 | let mut harness = MetricsTestHarness::with_sources(0).await; // No sources |
| 374 | harness.start_syncing_relay_to_nowhere().await; | 373 | harness.start_syncing_relay_to_nowhere().await; |
| 375 | 374 | ||
| 376 | // Wait for initial connection attempts | 375 | // Wait for initial connection attempts |
| 377 | tokio::time::sleep(Duration::from_secs(2)).await; | 376 | tokio::time::sleep(Duration::from_secs(2)).await; |
| 377 | |||
| 378 | // Fetch raw metrics to debug | ||
| 379 | let syncing_url = harness.syncing_relay_url().expect("Syncing relay should be started"); | ||
| 380 | let raw_1 = fetch_metrics(syncing_url) | ||
| 381 | .await | ||
| 382 | .expect("Failed to fetch metrics"); | ||
| 383 | |||
| 384 | // Print all sync-related metrics | ||
| 385 | println!("\n=== RAW METRICS (t1) ==="); | ||
| 386 | for line in raw_1.lines() { | ||
| 387 | if line.contains("sync") || line.contains("connection") { | ||
| 388 | println!("{}", line); | ||
| 389 | } | ||
| 390 | } | ||
| 391 | println!("========================\n"); | ||
| 392 | |||
| 378 | let metrics_1 = harness.get_metrics().await.unwrap(); | 393 | let metrics_1 = harness.get_metrics().await.unwrap(); |
| 379 | 394 | ||
| 380 | // Wait for more attempts | 395 | // Wait for more attempts |
| 381 | tokio::time::sleep(Duration::from_secs(2)).await; | 396 | tokio::time::sleep(Duration::from_secs(2)).await; |
| 397 | |||
| 398 | // Fetch raw metrics again | ||
| 399 | let syncing_url = harness.syncing_relay_url().expect("Syncing relay should be started"); | ||
| 400 | let raw_2 = fetch_metrics(syncing_url) | ||
| 401 | .await | ||
| 402 | .expect("Failed to fetch metrics"); | ||
| 403 | |||
| 404 | // Print all sync-related metrics | ||
| 405 | println!("\n=== RAW METRICS (t2) ==="); | ||
| 406 | for line in raw_2.lines() { | ||
| 407 | if line.contains("sync") || line.contains("connection") { | ||
| 408 | println!("{}", line); | ||
| 409 | } | ||
| 410 | } | ||
| 411 | println!("========================\n"); | ||
| 412 | |||
| 382 | let metrics_2 = harness.get_metrics().await.unwrap(); | 413 | let metrics_2 = harness.get_metrics().await.unwrap(); |
| 383 | 414 | ||
| 384 | // Failure counter should have increased | 415 | // Failure counter should have increased |
| @@ -496,7 +527,6 @@ async fn test_relay_connected_status() { | |||
| 496 | /// NOTE: This test may fail until sync metrics recording is fully wired up. | 527 | /// NOTE: This test may fail until sync metrics recording is fully wired up. |
| 497 | /// The test documents the expected behavior. | 528 | /// The test documents the expected behavior. |
| 498 | #[tokio::test] | 529 | #[tokio::test] |
| 499 | #[ignore] // Ignored until sync metrics are fully wired up | ||
| 500 | async fn test_health_state_degrades_on_failure() { | 530 | async fn test_health_state_degrades_on_failure() { |
| 501 | use crate::common::sync_helpers::MetricsTestHarness; | 531 | use crate::common::sync_helpers::MetricsTestHarness; |
| 502 | 532 | ||