diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 13:28:11 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 13:28:11 +0000 |
| commit | c34492069abacae67482af4c8356241958a524f7 (patch) | |
| tree | fd9b8ca3c26a96742bad4e9e359a20fc37c998aa /src/sync/mod.rs | |
| parent | eb10e85f199266affd3bca0a3d4cd934f74f3e7f (diff) | |
feat(sync): add Syncing connection status to track historic sync progress
- Add ConnectionStatus::Syncing state between Connecting and Connected
- Track historic_sync_completed and historic_sync_completed_at in RelayState
- Auto-detect sync completion via check_and_complete_historic_sync()
- Update metrics: ngit_sync_relay_connected now shows 0-3 (disconnected/connecting/syncing/connected)
- Update Prometheus metric documentation with new status values
- Add state machine diagram showing Syncing transition
- Operators can now distinguish 'connected but catching up' vs 'fully synced'
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 90 |
1 files changed, 78 insertions, 12 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 00668ac..e5b724d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -90,7 +90,9 @@ pub enum ConnectionStatus { | |||
| 90 | Disconnected, | 90 | Disconnected, |
| 91 | /// Connection attempt in progress | 91 | /// Connection attempt in progress |
| 92 | Connecting, | 92 | Connecting, |
| 93 | /// Successfully connected and subscribed | 93 | /// Successfully connected, historic sync in progress |
| 94 | Syncing, | ||
| 95 | /// Successfully connected, historic sync completed | ||
| 94 | Connected, | 96 | Connected, |
| 95 | } | 97 | } |
| 96 | 98 | ||
| @@ -112,6 +114,11 @@ pub struct RelayState { | |||
| 112 | /// Whether announcement filter historic sync has completed for this relay | 114 | /// Whether announcement filter historic sync has completed for this relay |
| 113 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 | 115 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 |
| 114 | pub announcements_synced: bool, | 116 | pub announcements_synced: bool, |
| 117 | /// Whether initial historic sync has fully completed (all layers) | ||
| 118 | /// Used to transition from Syncing -> Connected status | ||
| 119 | pub historic_sync_completed: bool, | ||
| 120 | /// When historic sync completed (None if never completed or cleared on fresh_start) | ||
| 121 | pub historic_sync_completed_at: Option<Timestamp>, | ||
| 115 | } | 122 | } |
| 116 | 123 | ||
| 117 | impl Default for RelayState { | 124 | impl Default for RelayState { |
| @@ -124,6 +131,8 @@ impl Default for RelayState { | |||
| 124 | last_connected: None, | 131 | last_connected: None, |
| 125 | disconnected_at: None, | 132 | disconnected_at: None, |
| 126 | announcements_synced: false, | 133 | announcements_synced: false, |
| 134 | historic_sync_completed: false, | ||
| 135 | historic_sync_completed_at: None, | ||
| 127 | } | 136 | } |
| 128 | } | 137 | } |
| 129 | } | 138 | } |
| @@ -145,6 +154,8 @@ impl RelayState { | |||
| 145 | self.repos.clear(); | 154 | self.repos.clear(); |
| 146 | self.root_events.clear(); | 155 | self.root_events.clear(); |
| 147 | self.announcements_synced = false; | 156 | self.announcements_synced = false; |
| 157 | self.historic_sync_completed = false; | ||
| 158 | self.historic_sync_completed_at = None; | ||
| 148 | } | 159 | } |
| 149 | } | 160 | } |
| 150 | 161 | ||
| @@ -860,6 +871,55 @@ impl SyncManager { | |||
| 860 | "Batch completed but no RelayState found for relay" | 871 | "Batch completed but no RelayState found for relay" |
| 861 | ); | 872 | ); |
| 862 | } | 873 | } |
| 874 | |||
| 875 | // Release lock before checking if historic sync is complete | ||
| 876 | drop(relay_index); | ||
| 877 | |||
| 878 | // Check if all historic sync is complete (no more pending batches) | ||
| 879 | self.check_and_complete_historic_sync(relay_url).await; | ||
| 880 | } | ||
| 881 | |||
| 882 | /// Check if historic sync is complete and transition to Connected status | ||
| 883 | /// | ||
| 884 | /// This method checks if there are any pending batches for the relay. | ||
| 885 | /// If no pending batches exist and the relay is in Syncing status, | ||
| 886 | /// it transitions to Connected and updates metrics. | ||
| 887 | /// | ||
| 888 | /// Called after each batch is confirmed to detect completion. | ||
| 889 | async fn check_and_complete_historic_sync(&self, relay_url: &str) { | ||
| 890 | // Check if there are any pending batches | ||
| 891 | let has_pending = { | ||
| 892 | let pending = self.pending_sync_index.read().await; | ||
| 893 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) | ||
| 894 | }; | ||
| 895 | |||
| 896 | if has_pending { | ||
| 897 | // Still syncing, don't transition yet | ||
| 898 | return; | ||
| 899 | } | ||
| 900 | |||
| 901 | // No pending batches - check if we should transition to Connected | ||
| 902 | let mut relay_index = self.relay_sync_index.write().await; | ||
| 903 | if let Some(state) = relay_index.get_mut(relay_url) { | ||
| 904 | if state.connection_status == ConnectionStatus::Syncing { | ||
| 905 | // Transition to Connected | ||
| 906 | state.connection_status = ConnectionStatus::Connected; | ||
| 907 | state.historic_sync_completed = true; | ||
| 908 | state.historic_sync_completed_at = Some(Timestamp::now()); | ||
| 909 | |||
| 910 | tracing::info!( | ||
| 911 | relay = %relay_url, | ||
| 912 | repos_synced = state.repos.len(), | ||
| 913 | root_events_synced = state.root_events.len(), | ||
| 914 | "Historic sync complete - transitioned to Connected status" | ||
| 915 | ); | ||
| 916 | |||
| 917 | // Update metrics | ||
| 918 | if let Some(ref metrics) = self.metrics { | ||
| 919 | metrics.record_connection_status(relay_url, ConnectionStatus::Connected); | ||
| 920 | } | ||
| 921 | } | ||
| 922 | } | ||
| 863 | } | 923 | } |
| 864 | 924 | ||
| 865 | /// Perform a daily sync for a specific relay | 925 | /// Perform a daily sync for a specific relay |
| @@ -1087,8 +1147,8 @@ impl SyncManager { | |||
| 1087 | ); | 1147 | ); |
| 1088 | return; | 1148 | return; |
| 1089 | } | 1149 | } |
| 1090 | Some(ConnectionStatus::Connected) => { | 1150 | Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { |
| 1091 | // Continue to subscribe | 1151 | // Continue to subscribe - both Syncing and Connected can accept new filters |
| 1092 | } | 1152 | } |
| 1093 | } | 1153 | } |
| 1094 | 1154 | ||
| @@ -1137,18 +1197,18 @@ impl SyncManager { | |||
| 1137 | index.get(relay_url).and_then(|s| s.last_connected) | 1197 | index.get(relay_url).and_then(|s| s.last_connected) |
| 1138 | }; | 1198 | }; |
| 1139 | 1199 | ||
| 1140 | // 2. Update state to Connected | 1200 | // 2. Update state to Syncing (will transition to Connected after historic sync completes) |
| 1141 | { | 1201 | { |
| 1142 | let mut index = self.relay_sync_index.write().await; | 1202 | let mut index = self.relay_sync_index.write().await; |
| 1143 | let state = index.entry(relay_url.to_string()).or_default(); | 1203 | let state = index.entry(relay_url.to_string()).or_default(); |
| 1144 | state.connection_status = ConnectionStatus::Connected; | 1204 | state.connection_status = ConnectionStatus::Syncing; |
| 1145 | state.last_connected = Some(Timestamp::now()); | 1205 | state.last_connected = Some(Timestamp::now()); |
| 1146 | state.disconnected_at = None; | 1206 | state.disconnected_at = None; |
| 1147 | } | 1207 | } |
| 1148 | 1208 | ||
| 1149 | // Update metrics | 1209 | // Update metrics - record as syncing initially |
| 1150 | if let Some(ref metrics) = self.metrics { | 1210 | if let Some(ref metrics) = self.metrics { |
| 1151 | metrics.set_relay_connected(relay_url, true); | 1211 | metrics.record_connection_status(relay_url, ConnectionStatus::Syncing); |
| 1152 | metrics.inc_connected_count(); | 1212 | metrics.inc_connected_count(); |
| 1153 | } | 1213 | } |
| 1154 | 1214 | ||
| @@ -1454,7 +1514,8 @@ impl SyncManager { | |||
| 1454 | "Cleared sync state in fresh_start" | 1514 | "Cleared sync state in fresh_start" |
| 1455 | ); | 1515 | ); |
| 1456 | } | 1516 | } |
| 1457 | if state.connection_status == ConnectionStatus::Connected { | 1517 | // Only sync if we're connected (either Syncing or fully Connected) |
| 1518 | if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { | ||
| 1458 | drop(index); | 1519 | drop(index); |
| 1459 | self.sync_generic_filters(relay_url, None).await; | 1520 | self.sync_generic_filters(relay_url, None).await; |
| 1460 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) | 1521 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) |
| @@ -1632,13 +1693,18 @@ impl SyncManager { | |||
| 1632 | /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. | 1693 | /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. |
| 1633 | /// On failure, updates state and health tracker. | 1694 | /// On failure, updates state and health tracker. |
| 1634 | async fn try_connect_relay(&mut self, relay_url: &str) { | 1695 | async fn try_connect_relay(&mut self, relay_url: &str) { |
| 1635 | // 1. Mark attempting (optional, helpful for debugging) | 1696 | // 1. Mark attempting and update metrics |
| 1636 | { | 1697 | { |
| 1637 | let mut index = self.relay_sync_index.write().await; | 1698 | let mut index = self.relay_sync_index.write().await; |
| 1638 | if let Some(state) = index.get_mut(relay_url) { | 1699 | if let Some(state) = index.get_mut(relay_url) { |
| 1639 | state.connection_status = ConnectionStatus::Connecting; | 1700 | state.connection_status = ConnectionStatus::Connecting; |
| 1640 | } | 1701 | } |
| 1641 | } | 1702 | } |
| 1703 | |||
| 1704 | // Update metrics to show connecting status | ||
| 1705 | if let Some(ref metrics) = self.metrics { | ||
| 1706 | metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); | ||
| 1707 | } | ||
| 1642 | 1708 | ||
| 1643 | // 2. Record attempt in health tracker | 1709 | // 2. Record attempt in health tracker |
| 1644 | self.health_tracker.record_attempt(relay_url); | 1710 | self.health_tracker.record_attempt(relay_url); |
| @@ -1688,8 +1754,8 @@ impl SyncManager { | |||
| 1688 | // 6. Update metrics | 1754 | // 6. Update metrics |
| 1689 | if let Some(ref metrics) = self.metrics { | 1755 | if let Some(ref metrics) = self.metrics { |
| 1690 | metrics.record_connection_attempt(relay_url, false); | 1756 | metrics.record_connection_attempt(relay_url, false); |
| 1691 | metrics | 1757 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 1692 | .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1758 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); |
| 1693 | } | 1759 | } |
| 1694 | } | 1760 | } |
| 1695 | } | 1761 | } |
| @@ -1806,7 +1872,7 @@ impl SyncManager { | |||
| 1806 | 1872 | ||
| 1807 | // Update metrics | 1873 | // Update metrics |
| 1808 | if let Some(ref metrics) = self.metrics { | 1874 | if let Some(ref metrics) = self.metrics { |
| 1809 | metrics.set_relay_connected(relay_url, false); | 1875 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 1810 | metrics.dec_connected_count(); | 1876 | metrics.dec_connected_count(); |
| 1811 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1877 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); |
| 1812 | } | 1878 | } |