diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 11:19:38 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 11:19:38 +0000 |
| commit | c82684092c7b4f81e49833b0888500fcb9851218 (patch) | |
| tree | 6853940a5bb975aa17e1f91be988e3b3303ff39f /src/sync | |
| parent | 532a7d0d5d8461bad0fc799aacb5eea0135f79f3 (diff) | |
fix(sync): improve metrics recording and connection failure detection
Changes:
- Fix connection attempt metrics: record success/failure based on actual
connection result instead of pre-emptively recording failure
- Add health tracker integration on connection failure: call
record_failure() and record_health_state() in error path
- Add connection verification in relay_connection.rs: wait 500ms after
connect() then verify is_connected() to detect silent failures
- Add configurable disconnect check interval via
NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS env var
- Update TestRelay with fast test settings: startup_delay=0, jitter=0,
disconnect_check_interval=1s
- Add debug output to metrics tests for investigation
Note: Tests may still fail because of the 5-second base backoff in the health tracker.
A follow-up task will add an NGIT_SYNC_BASE_BACKOFF_SECS config parameter
to allow faster test cycles.
Related: metrics-wiring-plan.md Tasks 1 & 2
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 76 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 18 |
2 files changed, 66 insertions, 28 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 5039c04..16ad833 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -264,23 +264,30 @@ async fn run_daily_timer( | |||
| 264 | // Disconnect Checker (Phase 8) | 264 | // Disconnect Checker (Phase 8) |
| 265 | // ============================================================================= | 265 | // ============================================================================= |
| 266 | 266 | ||
| 267 | /// Check interval for empty relay cleanup in seconds | ||
| 268 | const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60; | ||
| 269 | |||
| 270 | /// Run the disconnect checker for periodic cleanup of empty relays | 267 | /// Run the disconnect checker for periodic cleanup of empty relays |
| 271 | /// | 268 | /// |
| 272 | /// This function runs in a loop, checking every 60 seconds for relays | 269 | /// This function runs in a loop, checking at the configured interval for relays |
| 273 | /// that have no repos or root events to sync. Non-bootstrap relays | 270 | /// that have no repos or root events to sync. Non-bootstrap relays |
| 274 | /// that are empty will be disconnected to free up resources. | 271 | /// that are empty will be disconnected to free up resources. |
| 275 | /// | 272 | /// |
| 276 | /// Bootstrap relays are never disconnected, even if empty. | 273 | /// Bootstrap relays are never disconnected, even if empty. |
| 274 | /// | ||
| 275 | /// The check interval is configurable via `NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS` | ||
| 276 | /// (default: 60 seconds). Set to a lower value for faster reconnection testing. | ||
| 277 | async fn run_disconnect_checker( | 277 | async fn run_disconnect_checker( |
| 278 | sync_manager: Arc<Mutex<SyncManager>>, | 278 | sync_manager: Arc<Mutex<SyncManager>>, |
| 279 | mut shutdown_rx: broadcast::Receiver<()>, | 279 | mut shutdown_rx: broadcast::Receiver<()>, |
| 280 | check_interval_secs: u64, | ||
| 280 | ) { | 281 | ) { |
| 282 | let interval = Duration::from_secs(check_interval_secs); | ||
| 283 | tracing::info!( | ||
| 284 | interval_secs = check_interval_secs, | ||
| 285 | "Disconnect checker started with configured interval" | ||
| 286 | ); | ||
| 287 | |||
| 281 | loop { | 288 | loop { |
| 282 | tokio::select! { | 289 | tokio::select! { |
| 283 | _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { | 290 | _ = tokio::time::sleep(interval) => { |
| 284 | tracing::debug!("Disconnect checker running"); | 291 | tracing::debug!("Disconnect checker running"); |
| 285 | 292 | ||
| 286 | let mut manager = sync_manager.lock().await; | 293 | let mut manager = sync_manager.lock().await; |
| @@ -609,21 +616,24 @@ impl SyncManager { | |||
| 609 | self.spawn_relay_connection(bootstrap_url.clone()).await; | 616 | self.spawn_relay_connection(bootstrap_url.clone()).await; |
| 610 | } | 617 | } |
| 611 | 618 | ||
| 612 | // 7. Wrap self in Arc<Mutex> for sharing with timer task | 619 | // 7. Capture config values before moving self into Arc |
| 620 | let disconnect_check_interval_secs = self.config.sync_disconnect_check_interval_secs; | ||
| 621 | |||
| 622 | // 8. Wrap self in Arc<Mutex> for sharing with timer task | ||
| 613 | let sync_manager = Arc::new(Mutex::new(self)); | 623 | let sync_manager = Arc::new(Mutex::new(self)); |
| 614 | 624 | ||
| 615 | // 8. Spawn daily timer task with shutdown receiver | 625 | // 9. Spawn daily timer task with shutdown receiver |
| 616 | let timer_manager = Arc::clone(&sync_manager); | 626 | let timer_manager = Arc::clone(&sync_manager); |
| 617 | let timer_shutdown = shutdown_tx.subscribe(); | 627 | let timer_shutdown = shutdown_tx.subscribe(); |
| 618 | tokio::spawn(async move { | 628 | tokio::spawn(async move { |
| 619 | run_daily_timer(timer_manager, timer_shutdown).await; | 629 | run_daily_timer(timer_manager, timer_shutdown).await; |
| 620 | }); | 630 | }); |
| 621 | 631 | ||
| 622 | // 9. Spawn disconnect checker task with shutdown receiver | 632 | // 10. Spawn disconnect checker task with shutdown receiver |
| 623 | let checker_manager = Arc::clone(&sync_manager); | 633 | let checker_manager = Arc::clone(&sync_manager); |
| 624 | let checker_shutdown = shutdown_tx.subscribe(); | 634 | let checker_shutdown = shutdown_tx.subscribe(); |
| 625 | tokio::spawn(async move { | 635 | tokio::spawn(async move { |
| 626 | run_disconnect_checker(checker_manager, checker_shutdown).await; | 636 | run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await; |
| 627 | }); | 637 | }); |
| 628 | 638 | ||
| 629 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 639 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -1174,27 +1184,39 @@ impl SyncManager { | |||
| 1174 | // Create relay connection | 1184 | // Create relay connection |
| 1175 | let connection = RelayConnection::new(relay_url.clone()); | 1185 | let connection = RelayConnection::new(relay_url.clone()); |
| 1176 | 1186 | ||
| 1177 | // Record connection attempt | ||
| 1178 | if let Some(ref metrics) = self.metrics { | ||
| 1179 | metrics.record_connection_attempt(&relay_url, false); | ||
| 1180 | } | ||
| 1181 | |||
| 1182 | // Connect and subscribe to Layer 1 | 1187 | // Connect and subscribe to Layer 1 |
| 1183 | if let Err(e) = connection.connect_and_subscribe(None).await { | 1188 | match connection.connect_and_subscribe(None).await { |
| 1184 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | 1189 | Ok(_) => { |
| 1185 | // Update state to disconnected on failure | 1190 | // Record successful connection attempt |
| 1186 | { | 1191 | if let Some(ref metrics) = self.metrics { |
| 1187 | let mut index = relay_sync_index.write().await; | 1192 | metrics.record_connection_attempt(&relay_url, true); |
| 1188 | if let Some(state) = index.get_mut(&relay_url) { | ||
| 1189 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1190 | } | 1193 | } |
| 1191 | } | 1194 | } |
| 1192 | return; | 1195 | Err(e) => { |
| 1193 | } | 1196 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); |
| 1194 | 1197 | ||
| 1195 | // If successful, update connection attempt metric to success | 1198 | // Record failed connection attempt |
| 1196 | if let Some(ref metrics) = self.metrics { | 1199 | if let Some(ref metrics) = self.metrics { |
| 1197 | metrics.record_connection_attempt(&relay_url, true); | 1200 | metrics.record_connection_attempt(&relay_url, false); |
| 1201 | } | ||
| 1202 | |||
| 1203 | // Record failure in health tracker | ||
| 1204 | self.health_tracker.record_failure(&relay_url); | ||
| 1205 | |||
| 1206 | // Record health state in metrics | ||
| 1207 | if let Some(ref metrics) = self.metrics { | ||
| 1208 | metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); | ||
| 1209 | } | ||
| 1210 | |||
| 1211 | // Update state to disconnected on failure | ||
| 1212 | { | ||
| 1213 | let mut index = relay_sync_index.write().await; | ||
| 1214 | if let Some(state) = index.get_mut(&relay_url) { | ||
| 1215 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1216 | } | ||
| 1217 | } | ||
| 1218 | return; | ||
| 1219 | } | ||
| 1198 | } | 1220 | } |
| 1199 | 1221 | ||
| 1200 | // Mark as connected in relay sync index | 1222 | // Mark as connected in relay sync index |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 09c9887..d69e112 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -55,7 +55,8 @@ impl RelayConnection { | |||
| 55 | /// This method: | 55 | /// This method: |
| 56 | /// 1. Adds the relay to the client | 56 | /// 1. Adds the relay to the client |
| 57 | /// 2. Establishes the WebSocket connection | 57 | /// 2. Establishes the WebSocket connection |
| 58 | /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618) | 58 | /// 3. Verifies connection was established |
| 59 | /// 4. Subscribes to Layer 1 filter (kinds 30617 + 30618) | ||
| 59 | /// | 60 | /// |
| 60 | /// # Arguments | 61 | /// # Arguments |
| 61 | /// * `since` - Optional timestamp for incremental sync on reconnect | 62 | /// * `since` - Optional timestamp for incremental sync on reconnect |
| @@ -76,6 +77,21 @@ impl RelayConnection { | |||
| 76 | // Establish connection | 77 | // Establish connection |
| 77 | self.client.connect().await; | 78 | self.client.connect().await; |
| 78 | 79 | ||
| 80 | // Wait briefly for connection to establish and check status | ||
| 81 | // nostr-sdk's connect() is async and may not immediately reflect failure | ||
| 82 | tokio::time::sleep(std::time::Duration::from_millis(500)).await; | ||
| 83 | |||
| 84 | // Check if relay is actually connected | ||
| 85 | let relay = self.client.relay(&self.url).await | ||
| 86 | .map_err(|e| format!("Failed to get relay handle for {}: {}", self.url, e))?; | ||
| 87 | |||
| 88 | if !relay.is_connected() { | ||
| 89 | return Err(format!( | ||
| 90 | "Failed to connect to relay {}: connection not established after timeout", | ||
| 91 | self.url | ||
| 92 | )); | ||
| 93 | } | ||
| 94 | |||
| 79 | // Subscribe to Layer 1 (announcements) | 95 | // Subscribe to Layer 1 (announcements) |
| 80 | let filter = build_announcement_filter(since); | 96 | let filter = build_announcement_filter(since); |
| 81 | let output = self | 97 | let output = self |