From c34492069abacae67482af4c8356241958a524f7 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 13:28:11 +0000 Subject: feat(sync): add Syncing connection status to track historic sync progress - Add ConnectionStatus::Syncing state between Connecting and Connected - Track historic_sync_completed and historic_sync_completed_at in RelayState - Auto-detect sync completion via check_and_complete_historic_sync() - Update metrics: ngit_sync_relay_connected now shows 0-3 (disconnected/connecting/syncing/connected) - Update Prometheus metric documentation with new status values - Add state machine diagram showing Syncing transition - Operators can now distinguish 'connected but catching up' vs 'fully synced' --- src/sync/metrics.rs | 29 ++++++++++++++++- src/sync/mod.rs | 90 ++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 106 insertions(+), 13 deletions(-) (limited to 'src/sync') diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 453a79c..db7dd20 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs @@ -53,7 +53,7 @@ impl SyncMetrics { let relay_connected = IntGaugeVec::new( Opts::new( "ngit_sync_relay_connected", - "Relay connection status (1=connected, 0=disconnected)", + "Relay connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected)", ), &["relay"], )?; @@ -201,6 +201,33 @@ impl SyncMetrics { .set(state_value); } + /// Record relay connection status change. + /// + /// Maps connection status to numeric values for Prometheus: + /// - Disconnected = 0 (not connected) + /// - Connecting = 1 (connection attempt in progress) + /// - Syncing = 2 (connected, historic sync in progress) + /// - Connected = 3 (connected, historic sync complete) + /// + /// This is separate from health state and provides more granular connection lifecycle tracking. + /// + /// # Arguments + /// + /// * `relay` - The relay URL + /// * `status` - The current connection status + pub fn record_connection_status(&self, relay: &str, status: super::ConnectionStatus) { + use super::ConnectionStatus; + let status_value = match status { + ConnectionStatus::Disconnected => 0, + ConnectionStatus::Connecting => 1, + ConnectionStatus::Syncing => 2, + ConnectionStatus::Connected => 3, + }; + self.relay_connected + .with_label_values(&[relay]) + .set(status_value); + } + /// Record relay failure count. /// /// # Arguments diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 00668ac..e5b724d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -90,7 +90,9 @@ pub enum ConnectionStatus { Disconnected, /// Connection attempt in progress Connecting, - /// Successfully connected and subscribed + /// Successfully connected, historic sync in progress + Syncing, + /// Successfully connected, historic sync completed Connected, } @@ -112,6 +114,11 @@ pub struct RelayState { /// Whether announcement filter historic sync has completed for this relay /// Used to determine if we can use `since` filter on reconnect for Layer 1 pub announcements_synced: bool, + /// Whether initial historic sync has fully completed (all layers) + /// Used to transition from Syncing -> Connected status + pub historic_sync_completed: bool, + /// When historic sync completed (None if never completed or cleared on fresh_start) + pub historic_sync_completed_at: Option, } impl Default for RelayState { @@ -124,6 +131,8 @@ impl Default for RelayState { last_connected: None, disconnected_at: None, announcements_synced: false, + historic_sync_completed: false, + historic_sync_completed_at: None, } } } @@ -145,6 +154,8 @@ impl RelayState { self.repos.clear(); self.root_events.clear(); self.announcements_synced = false; + self.historic_sync_completed = false; + self.historic_sync_completed_at = None; } } @@ -860,6 +871,55 @@ impl SyncManager { "Batch completed but no RelayState found for relay" ); } + + // Release lock before checking if historic sync is complete + drop(relay_index); + + // Check if all historic sync is complete (no more pending batches) + self.check_and_complete_historic_sync(relay_url).await; + } + + /// Check if historic sync is complete and transition to Connected status + /// + /// This method checks if there are any pending batches for the relay. + /// If no pending batches exist and the relay is in Syncing status, + /// it transitions to Connected and updates metrics. + /// + /// Called after each batch is confirmed to detect completion. + async fn check_and_complete_historic_sync(&self, relay_url: &str) { + // Check if there are any pending batches + let has_pending = { + let pending = self.pending_sync_index.read().await; + pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) + }; + + if has_pending { + // Still syncing, don't transition yet + return; + } + + // No pending batches - check if we should transition to Connected + let mut relay_index = self.relay_sync_index.write().await; + if let Some(state) = relay_index.get_mut(relay_url) { + if state.connection_status == ConnectionStatus::Syncing { + // Transition to Connected + state.connection_status = ConnectionStatus::Connected; + state.historic_sync_completed = true; + state.historic_sync_completed_at = Some(Timestamp::now()); + + tracing::info!( + relay = %relay_url, + repos_synced = state.repos.len(), + root_events_synced = state.root_events.len(), + "Historic sync complete - transitioned to Connected status" + ); + + // Update metrics + if let Some(ref metrics) = self.metrics { + metrics.record_connection_status(relay_url, ConnectionStatus::Connected); + } + } + } } /// Perform a daily sync for a specific relay @@ -1087,8 +1147,8 @@ impl SyncManager { ); return; } - Some(ConnectionStatus::Connected) => { - // Continue to subscribe + Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { + // Continue to subscribe - both Syncing and Connected can accept new filters } } @@ -1137,18 +1197,18 @@ impl SyncManager { index.get(relay_url).and_then(|s| s.last_connected) }; - // 2. Update state to Connected + // 2. Update state to Syncing (will transition to Connected after historic sync completes) { let mut index = self.relay_sync_index.write().await; let state = index.entry(relay_url.to_string()).or_default(); - state.connection_status = ConnectionStatus::Connected; + state.connection_status = ConnectionStatus::Syncing; state.last_connected = Some(Timestamp::now()); state.disconnected_at = None; } - // Update metrics + // Update metrics - record as syncing initially if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, true); + metrics.record_connection_status(relay_url, ConnectionStatus::Syncing); metrics.inc_connected_count(); } @@ -1454,7 +1514,8 @@ impl SyncManager { "Cleared sync state in fresh_start" ); } - if state.connection_status == ConnectionStatus::Connected { + // Only sync if we're connected (either Syncing or fully Connected) + if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { drop(index); self.sync_generic_filters(relay_url, None).await; // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) @@ -1632,13 +1693,18 @@ impl SyncManager { /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. /// On failure, updates state and health tracker. async fn try_connect_relay(&mut self, relay_url: &str) { - // 1. Mark attempting (optional, helpful for debugging) + // 1. Mark attempting and update metrics { let mut index = self.relay_sync_index.write().await; if let Some(state) = index.get_mut(relay_url) { state.connection_status = ConnectionStatus::Connecting; } } + + // Update metrics to show connecting status + if let Some(ref metrics) = self.metrics { + metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); + } // 2. Record attempt in health tracker self.health_tracker.record_attempt(relay_url); @@ -1688,8 +1754,8 @@ impl SyncManager { // 6. Update metrics if let Some(ref metrics) = self.metrics { metrics.record_connection_attempt(relay_url, false); - metrics - .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); + metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); } } } @@ -1806,7 +1872,7 @@ impl SyncManager { // Update metrics if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, false); + metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); metrics.dec_connected_count(); metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); } -- cgit v1.2.3