upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 14:12:24 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 14:12:24 +0000
commit93a1684f068603b354ba3c05957a25459c73de05 (patch)
tree324e6d0e2a6a34fd4804ef94133cd35233081bb9 /src/sync/mod.rs
parentc34492069abacae67482af4c8356241958a524f7 (diff)
feat(sync): add ConnectedDegraded status for failed historic sync
- Add ConnectionStatus::ConnectedDegraded (status=4 in metrics) - Track batch failures via PendingBatch.failed field - Track relay-level failures via RelayState.historic_sync_had_failures - Transition to ConnectedDegraded when any batch fails during historic sync - Add is_live_sync_active() helper for cleaner match patterns - Update state machine diagram with ConnectedDegraded transitions - Update metrics docs with status=4 and example queries Fixes issue where relays with failed negentropy retries would incorrectly transition to Connected status despite missing data. Now operators can distinguish 'fully synced' vs 'degraded (partial data)'.
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs104
1 files changed, 89 insertions, 15 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e5b724d..2031ef4 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -94,6 +94,18 @@ pub enum ConnectionStatus {
94 Syncing, 94 Syncing,
95 /// Successfully connected, historic sync completed 95 /// Successfully connected, historic sync completed
96 Connected, 96 Connected,
97 /// Successfully connected, historic sync failed but live sync active
98 ConnectedDegraded,
99}
100
101impl ConnectionStatus {
102 /// Returns true if live sync is active (can accept new filters)
103 pub fn is_live_sync_active(&self) -> bool {
104 matches!(
105 self,
106 ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedDegraded
107 )
108 }
97} 109}
98 110
99/// Complete state for a single relay - combines sync needs with connection lifecycle 111/// Complete state for a single relay - combines sync needs with connection lifecycle
@@ -119,6 +131,10 @@ pub struct RelayState {
119 pub historic_sync_completed: bool, 131 pub historic_sync_completed: bool,
120 /// When historic sync completed (None if never completed or cleared on fresh_start) 132 /// When historic sync completed (None if never completed or cleared on fresh_start)
121 pub historic_sync_completed_at: Option<Timestamp>, 133 pub historic_sync_completed_at: Option<Timestamp>,
134 /// Whether any batch failed during historic sync
135 /// Set to true when retry protection triggers or other failures occur
136 /// Used to transition to ConnectedDegraded instead of Connected
137 pub historic_sync_had_failures: bool,
122} 138}
123 139
124impl Default for RelayState { 140impl Default for RelayState {
@@ -133,6 +149,7 @@ impl Default for RelayState {
133 announcements_synced: false, 149 announcements_synced: false,
134 historic_sync_completed: false, 150 historic_sync_completed: false,
135 historic_sync_completed_at: None, 151 historic_sync_completed_at: None,
152 historic_sync_had_failures: false,
136 } 153 }
137 } 154 }
138} 155}
@@ -156,6 +173,7 @@ impl RelayState {
156 self.announcements_synced = false; 173 self.announcements_synced = false;
157 self.historic_sync_completed = false; 174 self.historic_sync_completed = false;
158 self.historic_sync_completed_at = None; 175 self.historic_sync_completed_at = None;
176 self.historic_sync_had_failures = false;
159 } 177 }
160} 178}
161 179
@@ -216,6 +234,9 @@ pub struct PendingBatch {
216 /// Number of retry attempts for missing events (Negentropy only) 234 /// Number of retry attempts for missing events (Negentropy only)
217 /// Used to prevent infinite retry loops when relay consistently fails 235 /// Used to prevent infinite retry loops when relay consistently fails
218 pub retry_count: usize, 236 pub retry_count: usize,
237 /// Whether this batch failed (completed with missing data)
238 /// Set to true when retry protection triggers or other failures occur
239 pub failed: bool,
219} 240}
220 241
221/// Items included in a pending batch 242/// Items included in a pending batch
@@ -682,9 +703,10 @@ impl SyncManager {
682 // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) 703 // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total)
683 ); 704 );
684 705
685 // Extract and complete batch with partial results 706 // Extract and complete batch with partial results, marking as failed
686 let batch_idx_for_completion = batch_idx; 707 let batch_idx_for_completion = batch_idx;
687 let completed_batch = batches.remove(batch_idx_for_completion); 708 let mut completed_batch = batches.remove(batch_idx_for_completion);
709 completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition
688 if batches.is_empty() { 710 if batches.is_empty() {
689 pending.remove(relay_url); 711 pending.remove(relay_url);
690 } 712 }
@@ -849,6 +871,16 @@ impl SyncManager {
849 ); 871 );
850 } 872 }
851 873
874 // Track if this batch failed (for ConnectedDegraded transition)
875 if batch.failed {
876 state.historic_sync_had_failures = true;
877 tracing::warn!(
878 relay = %relay_url,
879 batch_id = batch_id,
880 "Batch failed - will transition to ConnectedDegraded instead of Connected"
881 );
882 }
883
852 // DEBUG TRACING: Log the root events being confirmed 884 // DEBUG TRACING: Log the root events being confirmed
853 tracing::info!( 885 tracing::info!(
854 relay = %relay_url, 886 relay = %relay_url,
@@ -862,6 +894,7 @@ impl SyncManager {
862 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), 894 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
863 is_generic_filter = is_generic_filter, 895 is_generic_filter = is_generic_filter,
864 announcements_synced = state.announcements_synced, 896 announcements_synced = state.announcements_synced,
897 had_failures = state.historic_sync_had_failures,
865 "Batch confirmed - items moved from pending to confirmed" 898 "Batch confirmed - items moved from pending to confirmed"
866 ); 899 );
867 } else { 900 } else {
@@ -881,13 +914,24 @@ impl SyncManager {
881 914
882 /// Check if historic sync is complete and transition to Connected status 915 /// Check if historic sync is complete and transition to Connected status
883 /// 916 ///
884 /// This method checks if there are any pending batches for the relay. 917 /// This method uses a double-check pattern to avoid race conditions with
885 /// If no pending batches exist and the relay is in Syncing status, 918 /// the self-subscriber's batching window. The sequence is:
886 /// it transitions to Connected and updates metrics. 919 ///
920 /// 1. First check: Are there pending batches?
921 /// 2. Wait for batch window + buffer (6 seconds)
922 /// 3. Second check: Are there still no pending batches?
923 /// 4. If still no pending batches, transition to Connected
924 ///
925 /// This ensures that events received just before the first check have time
926 /// to be batched and create Layer 2/3 filters before we mark sync complete.
927 ///
928 /// The 6-second delay is based on:
929 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS)
930 /// - Buffer for processing: 1 second
887 /// 931 ///
888 /// Called after each batch is confirmed to detect completion. 932 /// Called after each batch is confirmed to detect completion.
889 async fn check_and_complete_historic_sync(&self, relay_url: &str) { 933 async fn check_and_complete_historic_sync(&self, relay_url: &str) {
890 // Check if there are any pending batches 934 // First check: Are there any pending batches?
891 let has_pending = { 935 let has_pending = {
892 let pending = self.pending_sync_index.read().await; 936 let pending = self.pending_sync_index.read().await;
893 pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) 937 pending.get(relay_url).map_or(false, |batches| !batches.is_empty())
@@ -898,12 +942,33 @@ impl SyncManager {
898 return; 942 return;
899 } 943 }
900 944
901 // No pending batches - check if we should transition to Connected 945 // Wait for self-subscriber batch window + buffer to catch any in-flight events
946 // that might create new Layer 2/3 filters
947 tokio::time::sleep(Duration::from_millis(6000)).await;
948
949 // Second check: Are there still no pending batches?
950 let has_pending = {
951 let pending = self.pending_sync_index.read().await;
952 pending.get(relay_url).map_or(false, |batches| !batches.is_empty())
953 };
954
955 if has_pending {
956 // New batches appeared during the wait - still syncing
957 return;
958 }
959
960 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded
902 let mut relay_index = self.relay_sync_index.write().await; 961 let mut relay_index = self.relay_sync_index.write().await;
903 if let Some(state) = relay_index.get_mut(relay_url) { 962 if let Some(state) = relay_index.get_mut(relay_url) {
904 if state.connection_status == ConnectionStatus::Syncing { 963 if state.connection_status == ConnectionStatus::Syncing {
905 // Transition to Connected 964 // Check if any batches failed during historic sync
906 state.connection_status = ConnectionStatus::Connected; 965 let new_status = if state.historic_sync_had_failures {
966 ConnectionStatus::ConnectedDegraded
967 } else {
968 ConnectionStatus::Connected
969 };
970
971 state.connection_status = new_status;
907 state.historic_sync_completed = true; 972 state.historic_sync_completed = true;
908 state.historic_sync_completed_at = Some(Timestamp::now()); 973 state.historic_sync_completed_at = Some(Timestamp::now());
909 974
@@ -911,12 +976,15 @@ impl SyncManager {
911 relay = %relay_url, 976 relay = %relay_url,
912 repos_synced = state.repos.len(), 977 repos_synced = state.repos.len(),
913 root_events_synced = state.root_events.len(), 978 root_events_synced = state.root_events.len(),
914 "Historic sync complete - transitioned to Connected status" 979 had_failures = state.historic_sync_had_failures,
980 status = ?new_status,
981 "Historic sync complete - transitioned to {} status",
982 if state.historic_sync_had_failures { "ConnectedDegraded" } else { "Connected" }
915 ); 983 );
916 984
917 // Update metrics 985 // Update metrics
918 if let Some(ref metrics) = self.metrics { 986 if let Some(ref metrics) = self.metrics {
919 metrics.record_connection_status(relay_url, ConnectionStatus::Connected); 987 metrics.record_connection_status(relay_url, new_status);
920 } 988 }
921 } 989 }
922 } 990 }
@@ -1147,8 +1215,8 @@ impl SyncManager {
1147 ); 1215 );
1148 return; 1216 return;
1149 } 1217 }
1150 Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { 1218 Some(status) if status.is_live_sync_active() => {
1151 // Continue to subscribe - both Syncing and Connected can accept new filters 1219 // Continue to subscribe - live sync is active, can accept new filters
1152 } 1220 }
1153 } 1221 }
1154 1222
@@ -1514,8 +1582,8 @@ impl SyncManager {
1514 "Cleared sync state in fresh_start" 1582 "Cleared sync state in fresh_start"
1515 ); 1583 );
1516 } 1584 }
1517 // Only sync if we're connected (either Syncing or fully Connected) 1585 // Only sync if we're connected (live sync active)
1518 if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { 1586 if state.connection_status.is_live_sync_active() {
1519 drop(index); 1587 drop(index);
1520 self.sync_generic_filters(relay_url, None).await; 1588 self.sync_generic_filters(relay_url, None).await;
1521 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) 1589 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE)
@@ -2474,6 +2542,7 @@ impl SyncManager {
2474 requested_event_ids: None, // Will be set after negentropy diff 2542 requested_event_ids: None, // Will be set after negentropy diff
2475 received_event_ids: None, // Will be set after negentropy diff 2543 received_event_ids: None, // Will be set after negentropy diff
2476 retry_count: 0, 2544 retry_count: 0,
2545 failed: false,
2477 }; 2546 };
2478 2547
2479 // Add to pending_sync_index 2548 // Add to pending_sync_index
@@ -2712,6 +2781,7 @@ impl SyncManager {
2712 requested_event_ids: None, // Not used for REQ+EOSE 2781 requested_event_ids: None, // Not used for REQ+EOSE
2713 received_event_ids: None, // Not used for REQ+EOSE 2782 received_event_ids: None, // Not used for REQ+EOSE
2714 retry_count: 0, // Not used for REQ+EOSE 2783 retry_count: 0, // Not used for REQ+EOSE
2784 failed: false,
2715 }; 2785 };
2716 2786
2717 // Add to pending_sync_index 2787 // Add to pending_sync_index
@@ -2925,12 +2995,14 @@ mod tests {
2925 requested_event_ids: Some(HashSet::new()), 2995 requested_event_ids: Some(HashSet::new()),
2926 received_event_ids: Some(HashSet::new()), 2996 received_event_ids: Some(HashSet::new()),
2927 retry_count: 0, 2997 retry_count: 0,
2998 failed: false,
2928 }; 2999 };
2929 3000
2930 assert!(batch.requested_event_ids.is_some()); 3001 assert!(batch.requested_event_ids.is_some());
2931 assert!(batch.received_event_ids.is_some()); 3002 assert!(batch.received_event_ids.is_some());
2932 assert_eq!(batch.sync_method, SyncMethod::Negentropy); 3003 assert_eq!(batch.sync_method, SyncMethod::Negentropy);
2933 assert_eq!(batch.retry_count, 0); 3004 assert_eq!(batch.retry_count, 0);
3005 assert!(!batch.failed);
2934 } 3006 }
2935 3007
2936 #[test] 3008 #[test]
@@ -2945,10 +3017,12 @@ mod tests {
2945 requested_event_ids: None, 3017 requested_event_ids: None,
2946 received_event_ids: None, 3018 received_event_ids: None,
2947 retry_count: 0, 3019 retry_count: 0,
3020 failed: false,
2948 }; 3021 };
2949 3022
2950 assert!(batch.requested_event_ids.is_none()); 3023 assert!(batch.requested_event_ids.is_none());
2951 assert!(batch.received_event_ids.is_none()); 3024 assert!(batch.received_event_ids.is_none());
2952 assert_eq!(batch.sync_method, SyncMethod::ReqEose); 3025 assert_eq!(batch.sync_method, SyncMethod::ReqEose);
3026 assert!(!batch.failed);
2953 } 3027 }
2954} 3028}