diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 14:12:24 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 14:12:24 +0000 |
| commit | 93a1684f068603b354ba3c05957a25459c73de05 (patch) | |
| tree | 324e6d0e2a6a34fd4804ef94133cd35233081bb9 /src/sync/mod.rs | |
| parent | c34492069abacae67482af4c8356241958a524f7 (diff) | |
feat(sync): add ConnectedDegraded status for failed historic sync
- Add ConnectionStatus::ConnectedDegraded (status=4 in metrics)
- Track batch failures via PendingBatch.failed field
- Track relay-level failures via RelayState.historic_sync_had_failures
- Transition to ConnectedDegraded when any batch fails during historic sync
- Add is_live_sync_active() helper for cleaner match patterns
- Update state machine diagram with ConnectedDegraded transitions
- Update metrics docs with status=4 and example queries
Fixes an issue where relays whose negentropy retries failed would
incorrectly transition to Connected status despite missing data.
Operators can now distinguish 'fully synced' from 'degraded (partial data)'.
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 104 |
1 files changed, 89 insertions, 15 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e5b724d..2031ef4 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -94,6 +94,18 @@ pub enum ConnectionStatus { | |||
| 94 | Syncing, | 94 | Syncing, |
| 95 | /// Successfully connected, historic sync completed | 95 | /// Successfully connected, historic sync completed |
| 96 | Connected, | 96 | Connected, |
| 97 | /// Successfully connected and live sync active, but historic sync finished with failed batches (partial data) | ||
| 98 | ConnectedDegraded, | ||
| 99 | } | ||
| 100 | |||
| 101 | impl ConnectionStatus { | ||
| 102 | /// Returns true if live sync is active (can accept new filters) | ||
| 103 | pub fn is_live_sync_active(&self) -> bool { | ||
| 104 | matches!( | ||
| 105 | self, | ||
| 106 | ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedDegraded | ||
| 107 | ) | ||
| 108 | } | ||
| 97 | } | 109 | } |
| 98 | 110 | ||
| 99 | /// Complete state for a single relay - combines sync needs with connection lifecycle | 111 | /// Complete state for a single relay - combines sync needs with connection lifecycle |
| @@ -119,6 +131,10 @@ pub struct RelayState { | |||
| 119 | pub historic_sync_completed: bool, | 131 | pub historic_sync_completed: bool, |
| 120 | /// When historic sync completed (None if never completed or cleared on fresh_start) | 132 | /// When historic sync completed (None if never completed or cleared on fresh_start) |
| 121 | pub historic_sync_completed_at: Option<Timestamp>, | 133 | pub historic_sync_completed_at: Option<Timestamp>, |
| 134 | /// Whether any batch failed during historic sync | ||
| 135 | /// Set to true when retry protection triggers or other failures occur | ||
| 136 | /// Used to transition to ConnectedDegraded instead of Connected | ||
| 137 | pub historic_sync_had_failures: bool, | ||
| 122 | } | 138 | } |
| 123 | 139 | ||
| 124 | impl Default for RelayState { | 140 | impl Default for RelayState { |
| @@ -133,6 +149,7 @@ impl Default for RelayState { | |||
| 133 | announcements_synced: false, | 149 | announcements_synced: false, |
| 134 | historic_sync_completed: false, | 150 | historic_sync_completed: false, |
| 135 | historic_sync_completed_at: None, | 151 | historic_sync_completed_at: None, |
| 152 | historic_sync_had_failures: false, | ||
| 136 | } | 153 | } |
| 137 | } | 154 | } |
| 138 | } | 155 | } |
| @@ -156,6 +173,7 @@ impl RelayState { | |||
| 156 | self.announcements_synced = false; | 173 | self.announcements_synced = false; |
| 157 | self.historic_sync_completed = false; | 174 | self.historic_sync_completed = false; |
| 158 | self.historic_sync_completed_at = None; | 175 | self.historic_sync_completed_at = None; |
| 176 | self.historic_sync_had_failures = false; | ||
| 159 | } | 177 | } |
| 160 | } | 178 | } |
| 161 | 179 | ||
| @@ -216,6 +234,9 @@ pub struct PendingBatch { | |||
| 216 | /// Number of retry attempts for missing events (Negentropy only) | 234 | /// Number of retry attempts for missing events (Negentropy only) |
| 217 | /// Used to prevent infinite retry loops when relay consistently fails | 235 | /// Used to prevent infinite retry loops when relay consistently fails |
| 218 | pub retry_count: usize, | 236 | pub retry_count: usize, |
| 237 | /// Whether this batch failed (completed with missing data) | ||
| 238 | /// Set to true when retry protection triggers or other failures occur | ||
| 239 | pub failed: bool, | ||
| 219 | } | 240 | } |
| 220 | 241 | ||
| 221 | /// Items included in a pending batch | 242 | /// Items included in a pending batch |
| @@ -682,9 +703,10 @@ impl SyncManager { | |||
| 682 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) | 703 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) |
| 683 | ); | 704 | ); |
| 684 | 705 | ||
| 685 | // Extract and complete batch with partial results | 706 | // Extract and complete batch with partial results, marking as failed |
| 686 | let batch_idx_for_completion = batch_idx; | 707 | let batch_idx_for_completion = batch_idx; |
| 687 | let completed_batch = batches.remove(batch_idx_for_completion); | 708 | let mut completed_batch = batches.remove(batch_idx_for_completion); |
| 709 | completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition | ||
| 688 | if batches.is_empty() { | 710 | if batches.is_empty() { |
| 689 | pending.remove(relay_url); | 711 | pending.remove(relay_url); |
| 690 | } | 712 | } |
| @@ -849,6 +871,16 @@ impl SyncManager { | |||
| 849 | ); | 871 | ); |
| 850 | } | 872 | } |
| 851 | 873 | ||
| 874 | // Track if this batch failed (for ConnectedDegraded transition) | ||
| 875 | if batch.failed { | ||
| 876 | state.historic_sync_had_failures = true; | ||
| 877 | tracing::warn!( | ||
| 878 | relay = %relay_url, | ||
| 879 | batch_id = batch_id, | ||
| 880 | "Batch failed - will transition to ConnectedDegraded instead of Connected" | ||
| 881 | ); | ||
| 882 | } | ||
| 883 | |||
| 852 | // DEBUG TRACING: Log the root events being confirmed | 884 | // DEBUG TRACING: Log the root events being confirmed |
| 853 | tracing::info!( | 885 | tracing::info!( |
| 854 | relay = %relay_url, | 886 | relay = %relay_url, |
| @@ -862,6 +894,7 @@ impl SyncManager { | |||
| 862 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | 894 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), |
| 863 | is_generic_filter = is_generic_filter, | 895 | is_generic_filter = is_generic_filter, |
| 864 | announcements_synced = state.announcements_synced, | 896 | announcements_synced = state.announcements_synced, |
| 897 | had_failures = state.historic_sync_had_failures, | ||
| 865 | "Batch confirmed - items moved from pending to confirmed" | 898 | "Batch confirmed - items moved from pending to confirmed" |
| 866 | ); | 899 | ); |
| 867 | } else { | 900 | } else { |
| @@ -881,13 +914,24 @@ impl SyncManager { | |||
| 881 | 914 | ||
| 882 | /// Check if historic sync is complete and transition to Connected status | 915 | /// Check if historic sync is complete and transition to Connected status |
| 883 | /// | 916 | /// |
| 884 | /// This method checks if there are any pending batches for the relay. | 917 | /// This method uses a double-check pattern to avoid race conditions with |
| 885 | /// If no pending batches exist and the relay is in Syncing status, | 918 | /// the self-subscriber's batching window. The sequence is: |
| 886 | /// it transitions to Connected and updates metrics. | 919 | /// |
| 920 | /// 1. First check: Are there pending batches? | ||
| 921 | /// 2. Wait for batch window + buffer (6 seconds) | ||
| 922 | /// 3. Second check: Are there still no pending batches? | ||
| 923 | /// 4. If still no pending batches, transition to Connected (or ConnectedDegraded if any batch failed) | ||
| 924 | /// | ||
| 925 | /// This ensures that events received just before the first check have time | ||
| 926 | /// to be batched and create Layer 2/3 filters before we mark sync complete. | ||
| 927 | /// | ||
| 928 | /// The 6-second delay is based on: | ||
| 929 | /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS; note the wait below is a fixed 6000 ms and does not track that setting) | ||
| 930 | /// - Buffer for processing: 1 second | ||
| 887 | /// | 931 | /// |
| 888 | /// Called after each batch is confirmed to detect completion. | 932 | /// Called after each batch is confirmed to detect completion. |
| 889 | async fn check_and_complete_historic_sync(&self, relay_url: &str) { | 933 | async fn check_and_complete_historic_sync(&self, relay_url: &str) { |
| 890 | // Check if there are any pending batches | 934 | // First check: Are there any pending batches? |
| 891 | let has_pending = { | 935 | let has_pending = { |
| 892 | let pending = self.pending_sync_index.read().await; | 936 | let pending = self.pending_sync_index.read().await; |
| 893 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) | 937 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) |
| @@ -898,12 +942,33 @@ impl SyncManager { | |||
| 898 | return; | 942 | return; |
| 899 | } | 943 | } |
| 900 | 944 | ||
| 901 | // No pending batches - check if we should transition to Connected | 945 | // Wait for self-subscriber batch window + buffer to catch any in-flight events |
| 946 | // that might create new Layer 2/3 filters | ||
| 947 | tokio::time::sleep(Duration::from_millis(6000)).await; | ||
| 948 | |||
| 949 | // Second check: Are there still no pending batches? | ||
| 950 | let has_pending = { | ||
| 951 | let pending = self.pending_sync_index.read().await; | ||
| 952 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) | ||
| 953 | }; | ||
| 954 | |||
| 955 | if has_pending { | ||
| 956 | // New batches appeared during the wait - still syncing | ||
| 957 | return; | ||
| 958 | } | ||
| 959 | |||
| 960 | // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded | ||
| 902 | let mut relay_index = self.relay_sync_index.write().await; | 961 | let mut relay_index = self.relay_sync_index.write().await; |
| 903 | if let Some(state) = relay_index.get_mut(relay_url) { | 962 | if let Some(state) = relay_index.get_mut(relay_url) { |
| 904 | if state.connection_status == ConnectionStatus::Syncing { | 963 | if state.connection_status == ConnectionStatus::Syncing { |
| 905 | // Transition to Connected | 964 | // Check if any batches failed during historic sync |
| 906 | state.connection_status = ConnectionStatus::Connected; | 965 | let new_status = if state.historic_sync_had_failures { |
| 966 | ConnectionStatus::ConnectedDegraded | ||
| 967 | } else { | ||
| 968 | ConnectionStatus::Connected | ||
| 969 | }; | ||
| 970 | |||
| 971 | state.connection_status = new_status; | ||
| 907 | state.historic_sync_completed = true; | 972 | state.historic_sync_completed = true; |
| 908 | state.historic_sync_completed_at = Some(Timestamp::now()); | 973 | state.historic_sync_completed_at = Some(Timestamp::now()); |
| 909 | 974 | ||
| @@ -911,12 +976,15 @@ impl SyncManager { | |||
| 911 | relay = %relay_url, | 976 | relay = %relay_url, |
| 912 | repos_synced = state.repos.len(), | 977 | repos_synced = state.repos.len(), |
| 913 | root_events_synced = state.root_events.len(), | 978 | root_events_synced = state.root_events.len(), |
| 914 | "Historic sync complete - transitioned to Connected status" | 979 | had_failures = state.historic_sync_had_failures, |
| 980 | status = ?new_status, | ||
| 981 | "Historic sync complete - transitioned to {} status", | ||
| 982 | if state.historic_sync_had_failures { "ConnectedDegraded" } else { "Connected" } | ||
| 915 | ); | 983 | ); |
| 916 | 984 | ||
| 917 | // Update metrics | 985 | // Update metrics |
| 918 | if let Some(ref metrics) = self.metrics { | 986 | if let Some(ref metrics) = self.metrics { |
| 919 | metrics.record_connection_status(relay_url, ConnectionStatus::Connected); | 987 | metrics.record_connection_status(relay_url, new_status); |
| 920 | } | 988 | } |
| 921 | } | 989 | } |
| 922 | } | 990 | } |
| @@ -1147,8 +1215,8 @@ impl SyncManager { | |||
| 1147 | ); | 1215 | ); |
| 1148 | return; | 1216 | return; |
| 1149 | } | 1217 | } |
| 1150 | Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { | 1218 | Some(status) if status.is_live_sync_active() => { |
| 1151 | // Continue to subscribe - both Syncing and Connected can accept new filters | 1219 | // Continue to subscribe - live sync is active, can accept new filters |
| 1152 | } | 1220 | } |
| 1153 | } | 1221 | } |
| 1154 | 1222 | ||
| @@ -1514,8 +1582,8 @@ impl SyncManager { | |||
| 1514 | "Cleared sync state in fresh_start" | 1582 | "Cleared sync state in fresh_start" |
| 1515 | ); | 1583 | ); |
| 1516 | } | 1584 | } |
| 1517 | // Only sync if we're connected (either Syncing or fully Connected) | 1585 | // Only sync if we're connected (live sync active) |
| 1518 | if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { | 1586 | if state.connection_status.is_live_sync_active() { |
| 1519 | drop(index); | 1587 | drop(index); |
| 1520 | self.sync_generic_filters(relay_url, None).await; | 1588 | self.sync_generic_filters(relay_url, None).await; |
| 1521 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) | 1589 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) |
| @@ -2474,6 +2542,7 @@ impl SyncManager { | |||
| 2474 | requested_event_ids: None, // Will be set after negentropy diff | 2542 | requested_event_ids: None, // Will be set after negentropy diff |
| 2475 | received_event_ids: None, // Will be set after negentropy diff | 2543 | received_event_ids: None, // Will be set after negentropy diff |
| 2476 | retry_count: 0, | 2544 | retry_count: 0, |
| 2545 | failed: false, | ||
| 2477 | }; | 2546 | }; |
| 2478 | 2547 | ||
| 2479 | // Add to pending_sync_index | 2548 | // Add to pending_sync_index |
| @@ -2712,6 +2781,7 @@ impl SyncManager { | |||
| 2712 | requested_event_ids: None, // Not used for REQ+EOSE | 2781 | requested_event_ids: None, // Not used for REQ+EOSE |
| 2713 | received_event_ids: None, // Not used for REQ+EOSE | 2782 | received_event_ids: None, // Not used for REQ+EOSE |
| 2714 | retry_count: 0, // Not used for REQ+EOSE | 2783 | retry_count: 0, // Not used for REQ+EOSE |
| 2784 | failed: false, | ||
| 2715 | }; | 2785 | }; |
| 2716 | 2786 | ||
| 2717 | // Add to pending_sync_index | 2787 | // Add to pending_sync_index |
| @@ -2925,12 +2995,14 @@ mod tests { | |||
| 2925 | requested_event_ids: Some(HashSet::new()), | 2995 | requested_event_ids: Some(HashSet::new()), |
| 2926 | received_event_ids: Some(HashSet::new()), | 2996 | received_event_ids: Some(HashSet::new()), |
| 2927 | retry_count: 0, | 2997 | retry_count: 0, |
| 2998 | failed: false, | ||
| 2928 | }; | 2999 | }; |
| 2929 | 3000 | ||
| 2930 | assert!(batch.requested_event_ids.is_some()); | 3001 | assert!(batch.requested_event_ids.is_some()); |
| 2931 | assert!(batch.received_event_ids.is_some()); | 3002 | assert!(batch.received_event_ids.is_some()); |
| 2932 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); | 3003 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); |
| 2933 | assert_eq!(batch.retry_count, 0); | 3004 | assert_eq!(batch.retry_count, 0); |
| 3005 | assert!(!batch.failed); | ||
| 2934 | } | 3006 | } |
| 2935 | 3007 | ||
| 2936 | #[test] | 3008 | #[test] |
| @@ -2945,10 +3017,12 @@ mod tests { | |||
| 2945 | requested_event_ids: None, | 3017 | requested_event_ids: None, |
| 2946 | received_event_ids: None, | 3018 | received_event_ids: None, |
| 2947 | retry_count: 0, | 3019 | retry_count: 0, |
| 3020 | failed: false, | ||
| 2948 | }; | 3021 | }; |
| 2949 | 3022 | ||
| 2950 | assert!(batch.requested_event_ids.is_none()); | 3023 | assert!(batch.requested_event_ids.is_none()); |
| 2951 | assert!(batch.received_event_ids.is_none()); | 3024 | assert!(batch.received_event_ids.is_none()); |
| 2952 | assert_eq!(batch.sync_method, SyncMethod::ReqEose); | 3025 | assert_eq!(batch.sync_method, SyncMethod::ReqEose); |
| 3026 | assert!(!batch.failed); | ||
| 2953 | } | 3027 | } |
| 2954 | } | 3028 | } |