diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/algorithms.rs | 2 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 4 | ||||
| -rw-r--r-- | src/sync/mod.rs | 104 |
3 files changed, 94 insertions, 16 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index e083dc8..7536f41 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -405,6 +405,7 @@ mod tests { | |||
| 405 | requested_event_ids: None, | 405 | requested_event_ids: None, |
| 406 | received_event_ids: None, | 406 | received_event_ids: None, |
| 407 | retry_count: 0, | 407 | retry_count: 0, |
| 408 | failed: false, | ||
| 408 | }], | 409 | }], |
| 409 | ); | 410 | ); |
| 410 | 411 | ||
| @@ -520,6 +521,7 @@ mod tests { | |||
| 520 | requested_event_ids: None, | 521 | requested_event_ids: None, |
| 521 | received_event_ids: None, | 522 | received_event_ids: None, |
| 522 | retry_count: 0, | 523 | retry_count: 0, |
| 524 | failed: false, | ||
| 523 | }], | 525 | }], |
| 524 | ); | 526 | ); |
| 525 | 527 | ||
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index db7dd20..0f56911 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -53,7 +53,7 @@ impl SyncMetrics { | |||
| 53 | let relay_connected = IntGaugeVec::new( | 53 | let relay_connected = IntGaugeVec::new( |
| 54 | Opts::new( | 54 | Opts::new( |
| 55 | "ngit_sync_relay_connected", | 55 | "ngit_sync_relay_connected", |
| 56 | "Relay connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected)", | 56 | "Relay connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected, 4=connected_degraded)", |
| 57 | ), | 57 | ), |
| 58 | &["relay"], | 58 | &["relay"], |
| 59 | )?; | 59 | )?; |
| @@ -208,6 +208,7 @@ impl SyncMetrics { | |||
| 208 | /// - Connecting = 1 (connection attempt in progress) | 208 | /// - Connecting = 1 (connection attempt in progress) |
| 209 | /// - Syncing = 2 (connected, historic sync in progress) | 209 | /// - Syncing = 2 (connected, historic sync in progress) |
| 210 | /// - Connected = 3 (connected, historic sync complete) | 210 | /// - Connected = 3 (connected, historic sync complete) |
| 211 | /// - ConnectedDegraded = 4 (connected, historic sync failed but live sync active) | ||
| 211 | /// | 212 | /// |
| 212 | /// This is separate from health state and provides more granular connection lifecycle tracking. | 213 | /// This is separate from health state and provides more granular connection lifecycle tracking. |
| 213 | /// | 214 | /// |
| @@ -222,6 +223,7 @@ impl SyncMetrics { | |||
| 222 | ConnectionStatus::Connecting => 1, | 223 | ConnectionStatus::Connecting => 1, |
| 223 | ConnectionStatus::Syncing => 2, | 224 | ConnectionStatus::Syncing => 2, |
| 224 | ConnectionStatus::Connected => 3, | 225 | ConnectionStatus::Connected => 3, |
| 226 | ConnectionStatus::ConnectedDegraded => 4, | ||
| 225 | }; | 227 | }; |
| 226 | self.relay_connected | 228 | self.relay_connected |
| 227 | .with_label_values(&[relay]) | 229 | .with_label_values(&[relay]) |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e5b724d..2031ef4 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -94,6 +94,18 @@ pub enum ConnectionStatus { | |||
| 94 | Syncing, | 94 | Syncing, |
| 95 | /// Successfully connected, historic sync completed | 95 | /// Successfully connected, historic sync completed |
| 96 | Connected, | 96 | Connected, |
| 97 | /// Successfully connected, historic sync failed but live sync active | ||
| 98 | ConnectedDegraded, | ||
| 99 | } | ||
| 100 | |||
| 101 | impl ConnectionStatus { | ||
| 102 | /// Returns true if live sync is active (can accept new filters) | ||
| 103 | pub fn is_live_sync_active(&self) -> bool { | ||
| 104 | matches!( | ||
| 105 | self, | ||
| 106 | ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedDegraded | ||
| 107 | ) | ||
| 108 | } | ||
| 97 | } | 109 | } |
| 98 | 110 | ||
| 99 | /// Complete state for a single relay - combines sync needs with connection lifecycle | 111 | /// Complete state for a single relay - combines sync needs with connection lifecycle |
| @@ -119,6 +131,10 @@ pub struct RelayState { | |||
| 119 | pub historic_sync_completed: bool, | 131 | pub historic_sync_completed: bool, |
| 120 | /// When historic sync completed (None if never completed or cleared on fresh_start) | 132 | /// When historic sync completed (None if never completed or cleared on fresh_start) |
| 121 | pub historic_sync_completed_at: Option<Timestamp>, | 133 | pub historic_sync_completed_at: Option<Timestamp>, |
| 134 | /// Whether any batch failed during historic sync | ||
| 135 | /// Set to true when retry protection triggers or other failures occur | ||
| 136 | /// Used to transition to ConnectedDegraded instead of Connected | ||
| 137 | pub historic_sync_had_failures: bool, | ||
| 122 | } | 138 | } |
| 123 | 139 | ||
| 124 | impl Default for RelayState { | 140 | impl Default for RelayState { |
| @@ -133,6 +149,7 @@ impl Default for RelayState { | |||
| 133 | announcements_synced: false, | 149 | announcements_synced: false, |
| 134 | historic_sync_completed: false, | 150 | historic_sync_completed: false, |
| 135 | historic_sync_completed_at: None, | 151 | historic_sync_completed_at: None, |
| 152 | historic_sync_had_failures: false, | ||
| 136 | } | 153 | } |
| 137 | } | 154 | } |
| 138 | } | 155 | } |
| @@ -156,6 +173,7 @@ impl RelayState { | |||
| 156 | self.announcements_synced = false; | 173 | self.announcements_synced = false; |
| 157 | self.historic_sync_completed = false; | 174 | self.historic_sync_completed = false; |
| 158 | self.historic_sync_completed_at = None; | 175 | self.historic_sync_completed_at = None; |
| 176 | self.historic_sync_had_failures = false; | ||
| 159 | } | 177 | } |
| 160 | } | 178 | } |
| 161 | 179 | ||
| @@ -216,6 +234,9 @@ pub struct PendingBatch { | |||
| 216 | /// Number of retry attempts for missing events (Negentropy only) | 234 | /// Number of retry attempts for missing events (Negentropy only) |
| 217 | /// Used to prevent infinite retry loops when relay consistently fails | 235 | /// Used to prevent infinite retry loops when relay consistently fails |
| 218 | pub retry_count: usize, | 236 | pub retry_count: usize, |
| 237 | /// Whether this batch failed (completed with missing data) | ||
| 238 | /// Set to true when retry protection triggers or other failures occur | ||
| 239 | pub failed: bool, | ||
| 219 | } | 240 | } |
| 220 | 241 | ||
| 221 | /// Items included in a pending batch | 242 | /// Items included in a pending batch |
| @@ -682,9 +703,10 @@ impl SyncManager { | |||
| 682 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) | 703 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) |
| 683 | ); | 704 | ); |
| 684 | 705 | ||
| 685 | // Extract and complete batch with partial results | 706 | // Extract and complete batch with partial results, marking as failed |
| 686 | let batch_idx_for_completion = batch_idx; | 707 | let batch_idx_for_completion = batch_idx; |
| 687 | let completed_batch = batches.remove(batch_idx_for_completion); | 708 | let mut completed_batch = batches.remove(batch_idx_for_completion); |
| 709 | completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition | ||
| 688 | if batches.is_empty() { | 710 | if batches.is_empty() { |
| 689 | pending.remove(relay_url); | 711 | pending.remove(relay_url); |
| 690 | } | 712 | } |
| @@ -849,6 +871,16 @@ impl SyncManager { | |||
| 849 | ); | 871 | ); |
| 850 | } | 872 | } |
| 851 | 873 | ||
| 874 | // Track if this batch failed (for ConnectedDegraded transition) | ||
| 875 | if batch.failed { | ||
| 876 | state.historic_sync_had_failures = true; | ||
| 877 | tracing::warn!( | ||
| 878 | relay = %relay_url, | ||
| 879 | batch_id = batch_id, | ||
| 880 | "Batch failed - will transition to ConnectedDegraded instead of Connected" | ||
| 881 | ); | ||
| 882 | } | ||
| 883 | |||
| 852 | // DEBUG TRACING: Log the root events being confirmed | 884 | // DEBUG TRACING: Log the root events being confirmed |
| 853 | tracing::info!( | 885 | tracing::info!( |
| 854 | relay = %relay_url, | 886 | relay = %relay_url, |
| @@ -862,6 +894,7 @@ impl SyncManager { | |||
| 862 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | 894 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), |
| 863 | is_generic_filter = is_generic_filter, | 895 | is_generic_filter = is_generic_filter, |
| 864 | announcements_synced = state.announcements_synced, | 896 | announcements_synced = state.announcements_synced, |
| 897 | had_failures = state.historic_sync_had_failures, | ||
| 865 | "Batch confirmed - items moved from pending to confirmed" | 898 | "Batch confirmed - items moved from pending to confirmed" |
| 866 | ); | 899 | ); |
| 867 | } else { | 900 | } else { |
| @@ -881,13 +914,24 @@ impl SyncManager { | |||
| 881 | 914 | ||
| 882 | /// Check if historic sync is complete and transition to Connected status | 915 | /// Check if historic sync is complete and transition to Connected status |
| 883 | /// | 916 | /// |
| 884 | /// This method checks if there are any pending batches for the relay. | 917 | /// This method uses a double-check pattern to avoid race conditions with |
| 885 | /// If no pending batches exist and the relay is in Syncing status, | 918 | /// the self-subscriber's batching window. The sequence is: |
| 886 | /// it transitions to Connected and updates metrics. | 919 | /// |
| 920 | /// 1. First check: Are there pending batches? | ||
| 921 | /// 2. Wait for batch window + buffer (6 seconds) | ||
| 922 | /// 3. Second check: Are there still no pending batches? | ||
| 923 | /// 4. If still no pending batches, transition to Connected | ||
| 924 | /// | ||
| 925 | /// This ensures that events received just before the first check have time | ||
| 926 | /// to be batched and create Layer 2/3 filters before we mark sync complete. | ||
| 927 | /// | ||
| 928 | /// The 6-second delay is based on: | ||
| 929 | /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) | ||
| 930 | /// - Buffer for processing: 1 second | ||
| 887 | /// | 931 | /// |
| 888 | /// Called after each batch is confirmed to detect completion. | 932 | /// Called after each batch is confirmed to detect completion. |
| 889 | async fn check_and_complete_historic_sync(&self, relay_url: &str) { | 933 | async fn check_and_complete_historic_sync(&self, relay_url: &str) { |
| 890 | // Check if there are any pending batches | 934 | // First check: Are there any pending batches? |
| 891 | let has_pending = { | 935 | let has_pending = { |
| 892 | let pending = self.pending_sync_index.read().await; | 936 | let pending = self.pending_sync_index.read().await; |
| 893 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) | 937 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) |
| @@ -898,12 +942,33 @@ impl SyncManager { | |||
| 898 | return; | 942 | return; |
| 899 | } | 943 | } |
| 900 | 944 | ||
| 901 | // No pending batches - check if we should transition to Connected | 945 | // Wait for self-subscriber batch window + buffer to catch any in-flight events |
| 946 | // that might create new Layer 2/3 filters | ||
| 947 | tokio::time::sleep(Duration::from_millis(6000)).await; | ||
| 948 | |||
| 949 | // Second check: Are there still no pending batches? | ||
| 950 | let has_pending = { | ||
| 951 | let pending = self.pending_sync_index.read().await; | ||
| 952 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) | ||
| 953 | }; | ||
| 954 | |||
| 955 | if has_pending { | ||
| 956 | // New batches appeared during the wait - still syncing | ||
| 957 | return; | ||
| 958 | } | ||
| 959 | |||
| 960 | // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded | ||
| 902 | let mut relay_index = self.relay_sync_index.write().await; | 961 | let mut relay_index = self.relay_sync_index.write().await; |
| 903 | if let Some(state) = relay_index.get_mut(relay_url) { | 962 | if let Some(state) = relay_index.get_mut(relay_url) { |
| 904 | if state.connection_status == ConnectionStatus::Syncing { | 963 | if state.connection_status == ConnectionStatus::Syncing { |
| 905 | // Transition to Connected | 964 | // Check if any batches failed during historic sync |
| 906 | state.connection_status = ConnectionStatus::Connected; | 965 | let new_status = if state.historic_sync_had_failures { |
| 966 | ConnectionStatus::ConnectedDegraded | ||
| 967 | } else { | ||
| 968 | ConnectionStatus::Connected | ||
| 969 | }; | ||
| 970 | |||
| 971 | state.connection_status = new_status; | ||
| 907 | state.historic_sync_completed = true; | 972 | state.historic_sync_completed = true; |
| 908 | state.historic_sync_completed_at = Some(Timestamp::now()); | 973 | state.historic_sync_completed_at = Some(Timestamp::now()); |
| 909 | 974 | ||
| @@ -911,12 +976,15 @@ impl SyncManager { | |||
| 911 | relay = %relay_url, | 976 | relay = %relay_url, |
| 912 | repos_synced = state.repos.len(), | 977 | repos_synced = state.repos.len(), |
| 913 | root_events_synced = state.root_events.len(), | 978 | root_events_synced = state.root_events.len(), |
| 914 | "Historic sync complete - transitioned to Connected status" | 979 | had_failures = state.historic_sync_had_failures, |
| 980 | status = ?new_status, | ||
| 981 | "Historic sync complete - transitioned to {} status", | ||
| 982 | if state.historic_sync_had_failures { "ConnectedDegraded" } else { "Connected" } | ||
| 915 | ); | 983 | ); |
| 916 | 984 | ||
| 917 | // Update metrics | 985 | // Update metrics |
| 918 | if let Some(ref metrics) = self.metrics { | 986 | if let Some(ref metrics) = self.metrics { |
| 919 | metrics.record_connection_status(relay_url, ConnectionStatus::Connected); | 987 | metrics.record_connection_status(relay_url, new_status); |
| 920 | } | 988 | } |
| 921 | } | 989 | } |
| 922 | } | 990 | } |
| @@ -1147,8 +1215,8 @@ impl SyncManager { | |||
| 1147 | ); | 1215 | ); |
| 1148 | return; | 1216 | return; |
| 1149 | } | 1217 | } |
| 1150 | Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { | 1218 | Some(status) if status.is_live_sync_active() => { |
| 1151 | // Continue to subscribe - both Syncing and Connected can accept new filters | 1219 | // Continue to subscribe - live sync is active, can accept new filters |
| 1152 | } | 1220 | } |
| 1153 | } | 1221 | } |
| 1154 | 1222 | ||
| @@ -1514,8 +1582,8 @@ impl SyncManager { | |||
| 1514 | "Cleared sync state in fresh_start" | 1582 | "Cleared sync state in fresh_start" |
| 1515 | ); | 1583 | ); |
| 1516 | } | 1584 | } |
| 1517 | // Only sync if we're connected (either Syncing or fully Connected) | 1585 | // Only sync if we're connected (live sync active) |
| 1518 | if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { | 1586 | if state.connection_status.is_live_sync_active() { |
| 1519 | drop(index); | 1587 | drop(index); |
| 1520 | self.sync_generic_filters(relay_url, None).await; | 1588 | self.sync_generic_filters(relay_url, None).await; |
| 1521 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) | 1589 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) |
| @@ -2474,6 +2542,7 @@ impl SyncManager { | |||
| 2474 | requested_event_ids: None, // Will be set after negentropy diff | 2542 | requested_event_ids: None, // Will be set after negentropy diff |
| 2475 | received_event_ids: None, // Will be set after negentropy diff | 2543 | received_event_ids: None, // Will be set after negentropy diff |
| 2476 | retry_count: 0, | 2544 | retry_count: 0, |
| 2545 | failed: false, | ||
| 2477 | }; | 2546 | }; |
| 2478 | 2547 | ||
| 2479 | // Add to pending_sync_index | 2548 | // Add to pending_sync_index |
| @@ -2712,6 +2781,7 @@ impl SyncManager { | |||
| 2712 | requested_event_ids: None, // Not used for REQ+EOSE | 2781 | requested_event_ids: None, // Not used for REQ+EOSE |
| 2713 | received_event_ids: None, // Not used for REQ+EOSE | 2782 | received_event_ids: None, // Not used for REQ+EOSE |
| 2714 | retry_count: 0, // Not used for REQ+EOSE | 2783 | retry_count: 0, // Not used for REQ+EOSE |
| 2784 | failed: false, | ||
| 2715 | }; | 2785 | }; |
| 2716 | 2786 | ||
| 2717 | // Add to pending_sync_index | 2787 | // Add to pending_sync_index |
| @@ -2925,12 +2995,14 @@ mod tests { | |||
| 2925 | requested_event_ids: Some(HashSet::new()), | 2995 | requested_event_ids: Some(HashSet::new()), |
| 2926 | received_event_ids: Some(HashSet::new()), | 2996 | received_event_ids: Some(HashSet::new()), |
| 2927 | retry_count: 0, | 2997 | retry_count: 0, |
| 2998 | failed: false, | ||
| 2928 | }; | 2999 | }; |
| 2929 | 3000 | ||
| 2930 | assert!(batch.requested_event_ids.is_some()); | 3001 | assert!(batch.requested_event_ids.is_some()); |
| 2931 | assert!(batch.received_event_ids.is_some()); | 3002 | assert!(batch.received_event_ids.is_some()); |
| 2932 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); | 3003 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); |
| 2933 | assert_eq!(batch.retry_count, 0); | 3004 | assert_eq!(batch.retry_count, 0); |
| 3005 | assert!(!batch.failed); | ||
| 2934 | } | 3006 | } |
| 2935 | 3007 | ||
| 2936 | #[test] | 3008 | #[test] |
| @@ -2945,10 +3017,12 @@ mod tests { | |||
| 2945 | requested_event_ids: None, | 3017 | requested_event_ids: None, |
| 2946 | received_event_ids: None, | 3018 | received_event_ids: None, |
| 2947 | retry_count: 0, | 3019 | retry_count: 0, |
| 3020 | failed: false, | ||
| 2948 | }; | 3021 | }; |
| 2949 | 3022 | ||
| 2950 | assert!(batch.requested_event_ids.is_none()); | 3023 | assert!(batch.requested_event_ids.is_none()); |
| 2951 | assert!(batch.received_event_ids.is_none()); | 3024 | assert!(batch.received_event_ids.is_none()); |
| 2952 | assert_eq!(batch.sync_method, SyncMethod::ReqEose); | 3025 | assert_eq!(batch.sync_method, SyncMethod::ReqEose); |
| 3026 | assert!(!batch.failed); | ||
| 2953 | } | 3027 | } |
| 2954 | } | 3028 | } |