From c34492069abacae67482af4c8356241958a524f7 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 13:28:11 +0000 Subject: feat(sync): add Syncing connection status to track historic sync progress - Add ConnectionStatus::Syncing state between Connecting and Connected - Track historic_sync_completed and historic_sync_completed_at in RelayState - Auto-detect sync completion via check_and_complete_historic_sync() - Update metrics: ngit_sync_relay_connected now shows 0-3 (disconnected/connecting/syncing/connected) - Update Prometheus metric documentation with new status values - Add state machine diagram showing Syncing transition - Operators can now distinguish 'connected but catching up' vs 'fully synced' --- src/sync/mod.rs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 12 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 00668ac..e5b724d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -90,7 +90,9 @@ pub enum ConnectionStatus { Disconnected, /// Connection attempt in progress Connecting, - /// Successfully connected and subscribed + /// Successfully connected, historic sync in progress + Syncing, + /// Successfully connected, historic sync completed Connected, } @@ -112,6 +114,11 @@ pub struct RelayState { /// Whether announcement filter historic sync has completed for this relay /// Used to determine if we can use `since` filter on reconnect for Layer 1 pub announcements_synced: bool, + /// Whether initial historic sync has fully completed (all layers) + /// Used to transition from Syncing -> Connected status + pub historic_sync_completed: bool, + /// When historic sync completed (None if never completed or cleared on fresh_start) + pub historic_sync_completed_at: Option<Timestamp>, } impl Default for RelayState { @@ -124,6 +131,8 @@ impl Default for RelayState { last_connected: None, disconnected_at: None, announcements_synced: false, + historic_sync_completed: 
false, + historic_sync_completed_at: None, } } } @@ -145,6 +154,8 @@ impl RelayState { self.repos.clear(); self.root_events.clear(); self.announcements_synced = false; + self.historic_sync_completed = false; + self.historic_sync_completed_at = None; } } @@ -860,6 +871,55 @@ impl SyncManager { "Batch completed but no RelayState found for relay" ); } + + // Release lock before checking if historic sync is complete + drop(relay_index); + + // Check if all historic sync is complete (no more pending batches) + self.check_and_complete_historic_sync(relay_url).await; + } + + /// Check if historic sync is complete and transition to Connected status + /// + /// This method checks if there are any pending batches for the relay. + /// If no pending batches exist and the relay is in Syncing status, + /// it transitions to Connected and updates metrics. + /// + /// Called after each batch is confirmed to detect completion. + async fn check_and_complete_historic_sync(&self, relay_url: &str) { + // Check if there are any pending batches + let has_pending = { + let pending = self.pending_sync_index.read().await; + pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) + }; + + if has_pending { + // Still syncing, don't transition yet + return; + } + + // No pending batches - check if we should transition to Connected + let mut relay_index = self.relay_sync_index.write().await; + if let Some(state) = relay_index.get_mut(relay_url) { + if state.connection_status == ConnectionStatus::Syncing { + // Transition to Connected + state.connection_status = ConnectionStatus::Connected; + state.historic_sync_completed = true; + state.historic_sync_completed_at = Some(Timestamp::now()); + + tracing::info!( + relay = %relay_url, + repos_synced = state.repos.len(), + root_events_synced = state.root_events.len(), + "Historic sync complete - transitioned to Connected status" + ); + + // Update metrics + if let Some(ref metrics) = self.metrics { + 
metrics.record_connection_status(relay_url, ConnectionStatus::Connected); + } + } + } } /// Perform a daily sync for a specific relay @@ -1087,8 +1147,8 @@ impl SyncManager { ); return; } - Some(ConnectionStatus::Connected) => { - // Continue to subscribe + Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { + // Continue to subscribe - both Syncing and Connected can accept new filters } } @@ -1137,18 +1197,18 @@ impl SyncManager { index.get(relay_url).and_then(|s| s.last_connected) }; - // 2. Update state to Connected + // 2. Update state to Syncing (will transition to Connected after historic sync completes) { let mut index = self.relay_sync_index.write().await; let state = index.entry(relay_url.to_string()).or_default(); - state.connection_status = ConnectionStatus::Connected; + state.connection_status = ConnectionStatus::Syncing; state.last_connected = Some(Timestamp::now()); state.disconnected_at = None; } - // Update metrics + // Update metrics - record as syncing initially if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, true); + metrics.record_connection_status(relay_url, ConnectionStatus::Syncing); metrics.inc_connected_count(); } @@ -1454,7 +1514,8 @@ impl SyncManager { "Cleared sync state in fresh_start" ); } - if state.connection_status == ConnectionStatus::Connected { + // Only sync if we're connected (either Syncing or fully Connected) + if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { drop(index); self.sync_generic_filters(relay_url, None).await; // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) @@ -1632,13 +1693,18 @@ impl SyncManager { /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. /// On failure, updates state and health tracker. async fn try_connect_relay(&mut self, relay_url: &str) { - // 1. Mark attempting (optional, helpful for debugging) + // 1. 
Mark attempting and update metrics { let mut index = self.relay_sync_index.write().await; if let Some(state) = index.get_mut(relay_url) { state.connection_status = ConnectionStatus::Connecting; } } + + // Update metrics to show connecting status + if let Some(ref metrics) = self.metrics { + metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); + } // 2. Record attempt in health tracker self.health_tracker.record_attempt(relay_url); @@ -1688,8 +1754,8 @@ impl SyncManager { // 6. Update metrics if let Some(ref metrics) = self.metrics { metrics.record_connection_attempt(relay_url, false); - metrics - .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); + metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); } } } @@ -1806,7 +1872,7 @@ impl SyncManager { // Update metrics if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, false); + metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); metrics.dec_connected_count(); metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); } -- cgit v1.2.3