upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-09 13:28:11 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-09 13:28:11 +0000
commit: c34492069abacae67482af4c8356241958a524f7 (patch)
tree: fd9b8ca3c26a96742bad4e9e359a20fc37c998aa /src/sync/mod.rs
parent: eb10e85f199266affd3bca0a3d4cd934f74f3e7f (diff)
feat(sync): add Syncing connection status to track historic sync progress
- Add ConnectionStatus::Syncing state between Connecting and Connected
- Track historic_sync_completed and historic_sync_completed_at in RelayState
- Auto-detect sync completion via check_and_complete_historic_sync()
- Update metrics: ngit_sync_relay_connected now shows 0-3 (disconnected/connecting/syncing/connected)
- Update Prometheus metric documentation with new status values
- Add state machine diagram showing Syncing transition
- Operators can now distinguish 'connected but catching up' vs 'fully synced'
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r-- src/sync/mod.rs 90
1 file changed, 78 insertions, 12 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 00668ac..e5b724d 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -90,7 +90,9 @@ pub enum ConnectionStatus {
90 Disconnected, 90 Disconnected,
91 /// Connection attempt in progress 91 /// Connection attempt in progress
92 Connecting, 92 Connecting,
93 /// Successfully connected and subscribed 93 /// Successfully connected, historic sync in progress
94 Syncing,
95 /// Successfully connected, historic sync completed
94 Connected, 96 Connected,
95} 97}
96 98
@@ -112,6 +114,11 @@ pub struct RelayState {
112 /// Whether announcement filter historic sync has completed for this relay 114 /// Whether announcement filter historic sync has completed for this relay
113 /// Used to determine if we can use `since` filter on reconnect for Layer 1 115 /// Used to determine if we can use `since` filter on reconnect for Layer 1
114 pub announcements_synced: bool, 116 pub announcements_synced: bool,
117 /// Whether initial historic sync has fully completed (all layers)
118 /// Used to transition from Syncing -> Connected status
119 pub historic_sync_completed: bool,
120 /// When historic sync completed (None if never completed or cleared on fresh_start)
121 pub historic_sync_completed_at: Option<Timestamp>,
115} 122}
116 123
117impl Default for RelayState { 124impl Default for RelayState {
@@ -124,6 +131,8 @@ impl Default for RelayState {
124 last_connected: None, 131 last_connected: None,
125 disconnected_at: None, 132 disconnected_at: None,
126 announcements_synced: false, 133 announcements_synced: false,
134 historic_sync_completed: false,
135 historic_sync_completed_at: None,
127 } 136 }
128 } 137 }
129} 138}
@@ -145,6 +154,8 @@ impl RelayState {
145 self.repos.clear(); 154 self.repos.clear();
146 self.root_events.clear(); 155 self.root_events.clear();
147 self.announcements_synced = false; 156 self.announcements_synced = false;
157 self.historic_sync_completed = false;
158 self.historic_sync_completed_at = None;
148 } 159 }
149} 160}
150 161
@@ -860,6 +871,55 @@ impl SyncManager {
860 "Batch completed but no RelayState found for relay" 871 "Batch completed but no RelayState found for relay"
861 ); 872 );
862 } 873 }
874
875 // Release lock before checking if historic sync is complete
876 drop(relay_index);
877
878 // Check if all historic sync is complete (no more pending batches)
879 self.check_and_complete_historic_sync(relay_url).await;
880 }
881
882 /// Check if historic sync is complete and transition to Connected status
883 ///
884 /// This method checks if there are any pending batches for the relay.
885 /// If no pending batches exist and the relay is in Syncing status,
886 /// it transitions to Connected and updates metrics.
887 ///
888 /// Called after each batch is confirmed to detect completion.
889 async fn check_and_complete_historic_sync(&self, relay_url: &str) {
890 // Check if there are any pending batches
891 let has_pending = {
892 let pending = self.pending_sync_index.read().await;
893 pending.get(relay_url).map_or(false, |batches| !batches.is_empty())
894 };
895
896 if has_pending {
897 // Still syncing, don't transition yet
898 return;
899 }
900
901 // No pending batches - check if we should transition to Connected
902 let mut relay_index = self.relay_sync_index.write().await;
903 if let Some(state) = relay_index.get_mut(relay_url) {
904 if state.connection_status == ConnectionStatus::Syncing {
905 // Transition to Connected
906 state.connection_status = ConnectionStatus::Connected;
907 state.historic_sync_completed = true;
908 state.historic_sync_completed_at = Some(Timestamp::now());
909
910 tracing::info!(
911 relay = %relay_url,
912 repos_synced = state.repos.len(),
913 root_events_synced = state.root_events.len(),
914 "Historic sync complete - transitioned to Connected status"
915 );
916
917 // Update metrics
918 if let Some(ref metrics) = self.metrics {
919 metrics.record_connection_status(relay_url, ConnectionStatus::Connected);
920 }
921 }
922 }
863 } 923 }
864 924
865 /// Perform a daily sync for a specific relay 925 /// Perform a daily sync for a specific relay
@@ -1087,8 +1147,8 @@ impl SyncManager {
1087 ); 1147 );
1088 return; 1148 return;
1089 } 1149 }
1090 Some(ConnectionStatus::Connected) => { 1150 Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => {
1091 // Continue to subscribe 1151 // Continue to subscribe - both Syncing and Connected can accept new filters
1092 } 1152 }
1093 } 1153 }
1094 1154
@@ -1137,18 +1197,18 @@ impl SyncManager {
1137 index.get(relay_url).and_then(|s| s.last_connected) 1197 index.get(relay_url).and_then(|s| s.last_connected)
1138 }; 1198 };
1139 1199
1140 // 2. Update state to Connected 1200 // 2. Update state to Syncing (will transition to Connected after historic sync completes)
1141 { 1201 {
1142 let mut index = self.relay_sync_index.write().await; 1202 let mut index = self.relay_sync_index.write().await;
1143 let state = index.entry(relay_url.to_string()).or_default(); 1203 let state = index.entry(relay_url.to_string()).or_default();
1144 state.connection_status = ConnectionStatus::Connected; 1204 state.connection_status = ConnectionStatus::Syncing;
1145 state.last_connected = Some(Timestamp::now()); 1205 state.last_connected = Some(Timestamp::now());
1146 state.disconnected_at = None; 1206 state.disconnected_at = None;
1147 } 1207 }
1148 1208
1149 // Update metrics 1209 // Update metrics - record as syncing initially
1150 if let Some(ref metrics) = self.metrics { 1210 if let Some(ref metrics) = self.metrics {
1151 metrics.set_relay_connected(relay_url, true); 1211 metrics.record_connection_status(relay_url, ConnectionStatus::Syncing);
1152 metrics.inc_connected_count(); 1212 metrics.inc_connected_count();
1153 } 1213 }
1154 1214
@@ -1454,7 +1514,8 @@ impl SyncManager {
1454 "Cleared sync state in fresh_start" 1514 "Cleared sync state in fresh_start"
1455 ); 1515 );
1456 } 1516 }
1457 if state.connection_status == ConnectionStatus::Connected { 1517 // Only sync if we're connected (either Syncing or fully Connected)
1518 if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) {
1458 drop(index); 1519 drop(index);
1459 self.sync_generic_filters(relay_url, None).await; 1520 self.sync_generic_filters(relay_url, None).await;
1460 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) 1521 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE)
@@ -1632,13 +1693,18 @@ impl SyncManager {
1632 /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. 1693 /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect.
1633 /// On failure, updates state and health tracker. 1694 /// On failure, updates state and health tracker.
1634 async fn try_connect_relay(&mut self, relay_url: &str) { 1695 async fn try_connect_relay(&mut self, relay_url: &str) {
1635 // 1. Mark attempting (optional, helpful for debugging) 1696 // 1. Mark attempting and update metrics
1636 { 1697 {
1637 let mut index = self.relay_sync_index.write().await; 1698 let mut index = self.relay_sync_index.write().await;
1638 if let Some(state) = index.get_mut(relay_url) { 1699 if let Some(state) = index.get_mut(relay_url) {
1639 state.connection_status = ConnectionStatus::Connecting; 1700 state.connection_status = ConnectionStatus::Connecting;
1640 } 1701 }
1641 } 1702 }
1703
1704 // Update metrics to show connecting status
1705 if let Some(ref metrics) = self.metrics {
1706 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting);
1707 }
1642 1708
1643 // 2. Record attempt in health tracker 1709 // 2. Record attempt in health tracker
1644 self.health_tracker.record_attempt(relay_url); 1710 self.health_tracker.record_attempt(relay_url);
@@ -1688,8 +1754,8 @@ impl SyncManager {
1688 // 6. Update metrics 1754 // 6. Update metrics
1689 if let Some(ref metrics) = self.metrics { 1755 if let Some(ref metrics) = self.metrics {
1690 metrics.record_connection_attempt(relay_url, false); 1756 metrics.record_connection_attempt(relay_url, false);
1691 metrics 1757 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
1692 .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1758 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1693 } 1759 }
1694 } 1760 }
1695 } 1761 }
@@ -1806,7 +1872,7 @@ impl SyncManager {
1806 1872
1807 // Update metrics 1873 // Update metrics
1808 if let Some(ref metrics) = self.metrics { 1874 if let Some(ref metrics) = self.metrics {
1809 metrics.set_relay_connected(relay_url, false); 1875 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
1810 metrics.dec_connected_count(); 1876 metrics.dec_connected_count();
1811 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1877 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1812 } 1878 }