upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/algorithms.rs2
-rw-r--r--src/sync/metrics.rs4
-rw-r--r--src/sync/mod.rs104
3 files changed, 94 insertions, 16 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index e083dc8..7536f41 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -405,6 +405,7 @@ mod tests {
405 requested_event_ids: None, 405 requested_event_ids: None,
406 received_event_ids: None, 406 received_event_ids: None,
407 retry_count: 0, 407 retry_count: 0,
408 failed: false,
408 }], 409 }],
409 ); 410 );
410 411
@@ -520,6 +521,7 @@ mod tests {
520 requested_event_ids: None, 521 requested_event_ids: None,
521 received_event_ids: None, 522 received_event_ids: None,
522 retry_count: 0, 523 retry_count: 0,
524 failed: false,
523 }], 525 }],
524 ); 526 );
525 527
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index db7dd20..0f56911 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -53,7 +53,7 @@ impl SyncMetrics {
53 let relay_connected = IntGaugeVec::new( 53 let relay_connected = IntGaugeVec::new(
54 Opts::new( 54 Opts::new(
55 "ngit_sync_relay_connected", 55 "ngit_sync_relay_connected",
56 "Relay connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected)", 56 "Relay connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected, 4=connected_degraded)",
57 ), 57 ),
58 &["relay"], 58 &["relay"],
59 )?; 59 )?;
@@ -208,6 +208,7 @@ impl SyncMetrics {
208 /// - Connecting = 1 (connection attempt in progress) 208 /// - Connecting = 1 (connection attempt in progress)
209 /// - Syncing = 2 (connected, historic sync in progress) 209 /// - Syncing = 2 (connected, historic sync in progress)
210 /// - Connected = 3 (connected, historic sync complete) 210 /// - Connected = 3 (connected, historic sync complete)
211 /// - ConnectedDegraded = 4 (connected, historic sync failed but live sync active)
211 /// 212 ///
212 /// This is separate from health state and provides more granular connection lifecycle tracking. 213 /// This is separate from health state and provides more granular connection lifecycle tracking.
213 /// 214 ///
@@ -222,6 +223,7 @@ impl SyncMetrics {
222 ConnectionStatus::Connecting => 1, 223 ConnectionStatus::Connecting => 1,
223 ConnectionStatus::Syncing => 2, 224 ConnectionStatus::Syncing => 2,
224 ConnectionStatus::Connected => 3, 225 ConnectionStatus::Connected => 3,
226 ConnectionStatus::ConnectedDegraded => 4,
225 }; 227 };
226 self.relay_connected 228 self.relay_connected
227 .with_label_values(&[relay]) 229 .with_label_values(&[relay])
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e5b724d..2031ef4 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -94,6 +94,18 @@ pub enum ConnectionStatus {
94 Syncing, 94 Syncing,
95 /// Successfully connected, historic sync completed 95 /// Successfully connected, historic sync completed
96 Connected, 96 Connected,
97 /// Successfully connected, historic sync failed but live sync active
98 ConnectedDegraded,
99}
100
101impl ConnectionStatus {
102 /// Returns true if live sync is active (can accept new filters)
103 pub fn is_live_sync_active(&self) -> bool {
104 matches!(
105 self,
106 ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedDegraded
107 )
108 }
97} 109}
98 110
99/// Complete state for a single relay - combines sync needs with connection lifecycle 111/// Complete state for a single relay - combines sync needs with connection lifecycle
@@ -119,6 +131,10 @@ pub struct RelayState {
119 pub historic_sync_completed: bool, 131 pub historic_sync_completed: bool,
120 /// When historic sync completed (None if never completed or cleared on fresh_start) 132 /// When historic sync completed (None if never completed or cleared on fresh_start)
121 pub historic_sync_completed_at: Option<Timestamp>, 133 pub historic_sync_completed_at: Option<Timestamp>,
134 /// Whether any batch failed during historic sync
135 /// Set to true when retry protection triggers or other failures occur
136 /// Used to transition to ConnectedDegraded instead of Connected
137 pub historic_sync_had_failures: bool,
122} 138}
123 139
124impl Default for RelayState { 140impl Default for RelayState {
@@ -133,6 +149,7 @@ impl Default for RelayState {
133 announcements_synced: false, 149 announcements_synced: false,
134 historic_sync_completed: false, 150 historic_sync_completed: false,
135 historic_sync_completed_at: None, 151 historic_sync_completed_at: None,
152 historic_sync_had_failures: false,
136 } 153 }
137 } 154 }
138} 155}
@@ -156,6 +173,7 @@ impl RelayState {
156 self.announcements_synced = false; 173 self.announcements_synced = false;
157 self.historic_sync_completed = false; 174 self.historic_sync_completed = false;
158 self.historic_sync_completed_at = None; 175 self.historic_sync_completed_at = None;
176 self.historic_sync_had_failures = false;
159 } 177 }
160} 178}
161 179
@@ -216,6 +234,9 @@ pub struct PendingBatch {
216 /// Number of retry attempts for missing events (Negentropy only) 234 /// Number of retry attempts for missing events (Negentropy only)
217 /// Used to prevent infinite retry loops when relay consistently fails 235 /// Used to prevent infinite retry loops when relay consistently fails
218 pub retry_count: usize, 236 pub retry_count: usize,
237 /// Whether this batch failed (completed with missing data)
238 /// Set to true when retry protection triggers or other failures occur
239 pub failed: bool,
219} 240}
220 241
221/// Items included in a pending batch 242/// Items included in a pending batch
@@ -682,9 +703,10 @@ impl SyncManager {
682 // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) 703 // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total)
683 ); 704 );
684 705
685 // Extract and complete batch with partial results 706 // Extract and complete batch with partial results, marking as failed
686 let batch_idx_for_completion = batch_idx; 707 let batch_idx_for_completion = batch_idx;
687 let completed_batch = batches.remove(batch_idx_for_completion); 708 let mut completed_batch = batches.remove(batch_idx_for_completion);
709 completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition
688 if batches.is_empty() { 710 if batches.is_empty() {
689 pending.remove(relay_url); 711 pending.remove(relay_url);
690 } 712 }
@@ -849,6 +871,16 @@ impl SyncManager {
849 ); 871 );
850 } 872 }
851 873
874 // Track if this batch failed (for ConnectedDegraded transition)
875 if batch.failed {
876 state.historic_sync_had_failures = true;
877 tracing::warn!(
878 relay = %relay_url,
879 batch_id = batch_id,
880 "Batch failed - will transition to ConnectedDegraded instead of Connected"
881 );
882 }
883
852 // DEBUG TRACING: Log the root events being confirmed 884 // DEBUG TRACING: Log the root events being confirmed
853 tracing::info!( 885 tracing::info!(
854 relay = %relay_url, 886 relay = %relay_url,
@@ -862,6 +894,7 @@ impl SyncManager {
862 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), 894 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
863 is_generic_filter = is_generic_filter, 895 is_generic_filter = is_generic_filter,
864 announcements_synced = state.announcements_synced, 896 announcements_synced = state.announcements_synced,
897 had_failures = state.historic_sync_had_failures,
865 "Batch confirmed - items moved from pending to confirmed" 898 "Batch confirmed - items moved from pending to confirmed"
866 ); 899 );
867 } else { 900 } else {
@@ -881,13 +914,24 @@ impl SyncManager {
881 914
882 /// Check if historic sync is complete and transition to Connected status 915 /// Check if historic sync is complete and transition to Connected status
883 /// 916 ///
884 /// This method checks if there are any pending batches for the relay. 917 /// This method uses a double-check pattern to avoid race conditions with
885 /// If no pending batches exist and the relay is in Syncing status, 918 /// the self-subscriber's batching window. The sequence is:
886 /// it transitions to Connected and updates metrics. 919 ///
920 /// 1. First check: Are there pending batches?
921 /// 2. Wait for batch window + buffer (6 seconds)
922 /// 3. Second check: Are there still no pending batches?
923 /// 4. If still no pending batches, transition to Connected
924 ///
925 /// This ensures that events received just before the first check have time
926 /// to be batched and create Layer 2/3 filters before we mark sync complete.
927 ///
928 /// The 6-second delay is based on:
929 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS)
930 /// - Buffer for processing: 1 second
887 /// 931 ///
888 /// Called after each batch is confirmed to detect completion. 932 /// Called after each batch is confirmed to detect completion.
889 async fn check_and_complete_historic_sync(&self, relay_url: &str) { 933 async fn check_and_complete_historic_sync(&self, relay_url: &str) {
890 // Check if there are any pending batches 934 // First check: Are there any pending batches?
891 let has_pending = { 935 let has_pending = {
892 let pending = self.pending_sync_index.read().await; 936 let pending = self.pending_sync_index.read().await;
893 pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) 937 pending.get(relay_url).map_or(false, |batches| !batches.is_empty())
@@ -898,12 +942,33 @@ impl SyncManager {
898 return; 942 return;
899 } 943 }
900 944
901 // No pending batches - check if we should transition to Connected 945 // Wait for self-subscriber batch window + buffer to catch any in-flight events
946 // that might create new Layer 2/3 filters
947 tokio::time::sleep(Duration::from_millis(6000)).await;
948
949 // Second check: Are there still no pending batches?
950 let has_pending = {
951 let pending = self.pending_sync_index.read().await;
952 pending.get(relay_url).map_or(false, |batches| !batches.is_empty())
953 };
954
955 if has_pending {
956 // New batches appeared during the wait - still syncing
957 return;
958 }
959
960 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded
902 let mut relay_index = self.relay_sync_index.write().await; 961 let mut relay_index = self.relay_sync_index.write().await;
903 if let Some(state) = relay_index.get_mut(relay_url) { 962 if let Some(state) = relay_index.get_mut(relay_url) {
904 if state.connection_status == ConnectionStatus::Syncing { 963 if state.connection_status == ConnectionStatus::Syncing {
905 // Transition to Connected 964 // Check if any batches failed during historic sync
906 state.connection_status = ConnectionStatus::Connected; 965 let new_status = if state.historic_sync_had_failures {
966 ConnectionStatus::ConnectedDegraded
967 } else {
968 ConnectionStatus::Connected
969 };
970
971 state.connection_status = new_status;
907 state.historic_sync_completed = true; 972 state.historic_sync_completed = true;
908 state.historic_sync_completed_at = Some(Timestamp::now()); 973 state.historic_sync_completed_at = Some(Timestamp::now());
909 974
@@ -911,12 +976,15 @@ impl SyncManager {
911 relay = %relay_url, 976 relay = %relay_url,
912 repos_synced = state.repos.len(), 977 repos_synced = state.repos.len(),
913 root_events_synced = state.root_events.len(), 978 root_events_synced = state.root_events.len(),
914 "Historic sync complete - transitioned to Connected status" 979 had_failures = state.historic_sync_had_failures,
980 status = ?new_status,
981 "Historic sync complete - transitioned to {} status",
982 if state.historic_sync_had_failures { "ConnectedDegraded" } else { "Connected" }
915 ); 983 );
916 984
917 // Update metrics 985 // Update metrics
918 if let Some(ref metrics) = self.metrics { 986 if let Some(ref metrics) = self.metrics {
919 metrics.record_connection_status(relay_url, ConnectionStatus::Connected); 987 metrics.record_connection_status(relay_url, new_status);
920 } 988 }
921 } 989 }
922 } 990 }
@@ -1147,8 +1215,8 @@ impl SyncManager {
1147 ); 1215 );
1148 return; 1216 return;
1149 } 1217 }
1150 Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { 1218 Some(status) if status.is_live_sync_active() => {
1151 // Continue to subscribe - both Syncing and Connected can accept new filters 1219 // Continue to subscribe - live sync is active, can accept new filters
1152 } 1220 }
1153 } 1221 }
1154 1222
@@ -1514,8 +1582,8 @@ impl SyncManager {
1514 "Cleared sync state in fresh_start" 1582 "Cleared sync state in fresh_start"
1515 ); 1583 );
1516 } 1584 }
1517 // Only sync if we're connected (either Syncing or fully Connected) 1585 // Only sync if we're connected (live sync active)
1518 if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { 1586 if state.connection_status.is_live_sync_active() {
1519 drop(index); 1587 drop(index);
1520 self.sync_generic_filters(relay_url, None).await; 1588 self.sync_generic_filters(relay_url, None).await;
1521 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) 1589 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE)
@@ -2474,6 +2542,7 @@ impl SyncManager {
2474 requested_event_ids: None, // Will be set after negentropy diff 2542 requested_event_ids: None, // Will be set after negentropy diff
2475 received_event_ids: None, // Will be set after negentropy diff 2543 received_event_ids: None, // Will be set after negentropy diff
2476 retry_count: 0, 2544 retry_count: 0,
2545 failed: false,
2477 }; 2546 };
2478 2547
2479 // Add to pending_sync_index 2548 // Add to pending_sync_index
@@ -2712,6 +2781,7 @@ impl SyncManager {
2712 requested_event_ids: None, // Not used for REQ+EOSE 2781 requested_event_ids: None, // Not used for REQ+EOSE
2713 received_event_ids: None, // Not used for REQ+EOSE 2782 received_event_ids: None, // Not used for REQ+EOSE
2714 retry_count: 0, // Not used for REQ+EOSE 2783 retry_count: 0, // Not used for REQ+EOSE
2784 failed: false,
2715 }; 2785 };
2716 2786
2717 // Add to pending_sync_index 2787 // Add to pending_sync_index
@@ -2925,12 +2995,14 @@ mod tests {
2925 requested_event_ids: Some(HashSet::new()), 2995 requested_event_ids: Some(HashSet::new()),
2926 received_event_ids: Some(HashSet::new()), 2996 received_event_ids: Some(HashSet::new()),
2927 retry_count: 0, 2997 retry_count: 0,
2998 failed: false,
2928 }; 2999 };
2929 3000
2930 assert!(batch.requested_event_ids.is_some()); 3001 assert!(batch.requested_event_ids.is_some());
2931 assert!(batch.received_event_ids.is_some()); 3002 assert!(batch.received_event_ids.is_some());
2932 assert_eq!(batch.sync_method, SyncMethod::Negentropy); 3003 assert_eq!(batch.sync_method, SyncMethod::Negentropy);
2933 assert_eq!(batch.retry_count, 0); 3004 assert_eq!(batch.retry_count, 0);
3005 assert!(!batch.failed);
2934 } 3006 }
2935 3007
2936 #[test] 3008 #[test]
@@ -2945,10 +3017,12 @@ mod tests {
2945 requested_event_ids: None, 3017 requested_event_ids: None,
2946 received_event_ids: None, 3018 received_event_ids: None,
2947 retry_count: 0, 3019 retry_count: 0,
3020 failed: false,
2948 }; 3021 };
2949 3022
2950 assert!(batch.requested_event_ids.is_none()); 3023 assert!(batch.requested_event_ids.is_none());
2951 assert!(batch.received_event_ids.is_none()); 3024 assert!(batch.received_event_ids.is_none());
2952 assert_eq!(batch.sync_method, SyncMethod::ReqEose); 3025 assert_eq!(batch.sync_method, SyncMethod::ReqEose);
3026 assert!(!batch.failed);
2953 } 3027 }
2954} 3028}