diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 13:28:11 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 13:28:11 +0000 |
| commit | c34492069abacae67482af4c8356241958a524f7 (patch) | |
| tree | fd9b8ca3c26a96742bad4e9e359a20fc37c998aa | |
| parent | eb10e85f199266affd3bca0a3d4cd934f74f3e7f (diff) | |
feat(sync): add Syncing connection status to track historic sync progress
- Add ConnectionStatus::Syncing state between Connecting and Connected
- Track historic_sync_completed and historic_sync_completed_at in RelayState
- Auto-detect sync completion via check_and_complete_historic_sync()
- Update metrics: ngit_sync_relay_connected now shows 0-3 (disconnected/connecting/syncing/connected)
- Update Prometheus metric documentation with new status values
- Add state machine diagram showing Syncing transition
- Operators can now distinguish 'connected but catching up' vs 'fully synced'
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync.md | 54 | ||||
| -rw-r--r-- | docs/explanation/monitoring.md | 83 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 29 | ||||
| -rw-r--r-- | src/sync/mod.rs | 90 |
4 files changed, 200 insertions, 56 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index 461bde7..e1fb367 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md | |||
| @@ -75,7 +75,9 @@ pub enum ConnectionStatus { | |||
| 75 | Disconnected, | 75 | Disconnected, |
| 76 | /// Connection attempt in progress | 76 | /// Connection attempt in progress |
| 77 | Connecting, | 77 | Connecting, |
| 78 | /// Successfully connected and subscribed | 78 | /// Successfully connected, historic sync in progress |
| 79 | Syncing, | ||
| 80 | /// Successfully connected, historic sync completed | ||
| 79 | Connected, | 81 | Connected, |
| 80 | } | 82 | } |
| 81 | 83 | ||
| @@ -97,6 +99,11 @@ pub struct RelayState { | |||
| 97 | /// Whether announcement filter historic sync has completed for this relay | 99 | /// Whether announcement filter historic sync has completed for this relay |
| 98 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 | 100 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 |
| 99 | pub announcements_synced: bool, | 101 | pub announcements_synced: bool, |
| 102 | /// Whether initial historic sync has fully completed (all layers) | ||
| 103 | /// Used to transition from Syncing -> Connected status | ||
| 104 | pub historic_sync_completed: bool, | ||
| 105 | /// When historic sync completed (None if never completed or cleared on fresh_start) | ||
| 106 | pub historic_sync_completed_at: Option<Timestamp>, | ||
| 100 | } | 107 | } |
| 101 | 108 | ||
| 102 | impl RelayState { | 109 | impl RelayState { |
| @@ -198,25 +205,52 @@ When a relay doesn't support NIP-77 Negentropy, historic sync falls back to trad | |||
| 198 | stateDiagram-v2 | 205 | stateDiagram-v2 |
| 199 | [*] --> Disconnected: discover relay → register_relay() | 206 | [*] --> Disconnected: discover relay → register_relay() |
| 200 | Disconnected --> Connecting: retry_disconnected_relays → try_connect_relay | 207 | Disconnected --> Connecting: retry_disconnected_relays → try_connect_relay |
| 201 | Connecting --> Connected: success → handle_connect_or_reconnect | 208 | Connecting --> Syncing: success → handle_connect_or_reconnect |
| 202 | Connecting --> Disconnected: failure + record in health tracker | 209 | Connecting --> Disconnected: failure + record in health tracker |
| 210 | Syncing --> Connected: all historic batches complete → check_and_complete_historic_sync | ||
| 211 | Syncing --> Disconnected: connection lost → handle_disconnect | ||
| 203 | Connected --> Disconnected: connection lost → handle_disconnect | 212 | Connected --> Disconnected: connection lost → handle_disconnect |
| 204 | Connected --> [*]: intentional disconnect via check_disconnects | 213 | Connected --> [*]: intentional disconnect via check_disconnects |
| 205 | 214 | ||
| 206 | note right of Disconnected: disconnected_at set for 15min rule<br/>RelayConnection kept in HashMap | 215 | note right of Disconnected: disconnected_at set for 15min rule<br/>RelayConnection kept in HashMap |
| 207 | note right of Connected: last_connected tracked for since filter<br/>Event loop spawned here | ||
| 208 | note right of Connecting: connection attempt with timeout | 216 | note right of Connecting: connection attempt with timeout |
| 217 | note right of Syncing: historic sync in progress<br/>event loop spawned here | ||
| 218 | note right of Connected: historic sync complete<br/>last_connected tracked for since filter | ||
| 209 | ``` | 219 | ``` |
| 210 | 220 | ||
| 211 | ### Connection Flow Methods | 221 | ### Connection Flow Methods |
| 212 | 222 | ||
| 213 | | Method | Purpose | When Called | Actions | | 223 | | Method | Purpose | When Called | Actions | |
| 214 | | ------------------------------- | ------------------------- | --------------------------------- | --------------------------------------------------------------- | | 224 | | ----------------------------------- | ---------------------------- | --------------------------------- | --------------------------------------------------------------- | |
| 215 | | `register_relay()` | Initialize relay tracking | Discovery via RepoSyncIndex | Creates RelayConnection, stores in HashMap, returns immediately | | 225 | | `register_relay()` | Initialize relay tracking | Discovery via RepoSyncIndex | Creates RelayConnection, stores in HashMap, returns immediately | |
| 216 | | `try_connect_relay()` | Attempt connection | Health tracker allows retry | Calls connection.connect(), sends notification on success | | 226 | | `try_connect_relay()` | Attempt connection | Health tracker allows retry | Calls connection.connect(), sends notification on success | |
| 217 | | `handle_connect_or_reconnect()` | Setup after connection | ConnectNotification received | Spawns event loop, updates state, decides sync strategy | | 227 | | `handle_connect_or_reconnect()` | Setup after connection | ConnectNotification received | Spawns event loop, sets Syncing, decides sync strategy | |
| 218 | | `handle_disconnect()` | Cleanup after disconnect | DisconnectNotification received | Updates state, clears pending, KEEPS RelayConnection | | 228 | | `check_and_complete_historic_sync()` | Detect sync completion | After each batch confirmation | Transitions Syncing → Connected when no pending batches | |
| 219 | | `retry_disconnected_relays()` | Periodic reconnection | Every 2s (health & metrics timer) | For each ready relay: try_connect_relay() | | 229 | | `handle_disconnect()` | Cleanup after disconnect | DisconnectNotification received | Updates state, clears pending, KEEPS RelayConnection | |
| 230 | | `retry_disconnected_relays()` | Periodic reconnection | Every 2s (health & metrics timer) | For each ready relay: try_connect_relay() | | ||
| 231 | |||
| 232 | ### Historic Sync Completion | ||
| 233 | |||
| 234 | When a relay first connects, it enters the **Syncing** state and begins historic sync: | ||
| 235 | |||
| 236 | 1. **Layer 1 (Announcements)**: Generic filter for all repository announcements | ||
| 237 | 2. **Layer 2 (Repo Events)**: Filters for events tagging discovered repositories | ||
| 238 | 3. **Layer 3 (Root Events)**: Filters for events tagging discovered PRs/Issues/Patches | ||
| 239 | |||
| 240 | Each layer creates one or more `PendingBatch` entries tracked in `PendingSyncIndex`. As EOSE messages arrive: | ||
| 241 | |||
| 242 | - `handle_eose()` confirms each batch via `confirm_batch()` | ||
| 243 | - `confirm_batch()` moves items to confirmed state and calls `check_and_complete_historic_sync()` | ||
| 244 | - `check_and_complete_historic_sync()` checks if `PendingSyncIndex` is empty for this relay | ||
| 245 | - When empty: transitions `Syncing` → `Connected`, sets `historic_sync_completed = true` | ||
| 246 | |||
| 247 | **Metrics tracking**: The `ngit_sync_relay_connected` metric shows: | ||
| 248 | - `0` = Disconnected | ||
| 249 | - `1` = Connecting | ||
| 250 | - `2` = Syncing (historic sync in progress) | ||
| 251 | - `3` = Connected (historic sync complete, live sync active) | ||
| 252 | |||
| 253 | This allows operators to monitor sync progress and distinguish between "connected but still catching up" vs "fully synced and live". | ||
| 220 | 254 | ||
| 221 | ### Event Loop Lifecycle | 255 | ### Event Loop Lifecycle |
| 222 | 256 | ||
diff --git a/docs/explanation/monitoring.md b/docs/explanation/monitoring.md index 9368bf4..d2d20c0 100644 --- a/docs/explanation/monitoring.md +++ b/docs/explanation/monitoring.md | |||
| @@ -98,54 +98,64 @@ When GRASP-02 proactive sync is implemented, the following metrics will be added | |||
| 98 | 98 | ||
| 99 | | Metric | Type | Labels | Description | | 99 | | Metric | Type | Labels | Description | |
| 100 | |--------|------|--------|-------------| | 100 | |--------|------|--------|-------------| |
| 101 | | `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not | | 101 | | `ngit_sync_relay_connected` | Gauge | relay | Connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected) | |
| 102 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes | | 102 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes | |
| 103 | | `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise | | 103 | | `ngit_sync_relay_status` | Gauge | relay | Health status (1=healthy, 2=disconnected, 3=degraded, 4=dead, 5=rate_limited) | |
| 104 | | `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count | | 104 | | `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count | |
| 105 | | `ngit_sync_events_total` | Counter | source | Events received by source type | | 105 | | `ngit_sync_events_synced_total` | Counter | - | Events synced (newly saved events only) | |
| 106 | | `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup | | ||
| 107 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | | 106 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | |
| 108 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count | | 107 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count | |
| 109 | | `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead | | 108 | | `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead | |
| 110 | 109 | ||
| 111 | ### Event Sources | 110 | ### Connection Status Values |
| 112 | 111 | ||
| 113 | The `source` label on `ngit_sync_events_total` tracks how events were received: | 112 | The `ngit_sync_relay_connected` metric tracks the connection lifecycle: |
| 114 | 113 | ||
| 115 | - `direct` - Submitted directly to our relay by a user | 114 | - `0` = **Disconnected** - Not currently connected |
| 116 | - `live_sync` - Received via live WebSocket subscription (expected path) | 115 | - `1` = **Connecting** - Connection attempt in progress |
| 117 | - `catchup` - Found during negentropy catchup after reconnect | 116 | - `2` = **Syncing** - Connected, historic sync in progress |
| 118 | - `daily_catchup` - Found during daily reconciliation | 117 | - `3` = **Connected** - Connected, historic sync complete, live sync active |
| 119 | 118 | ||
| 120 | **Catchup events indicate sync failures** - these should have been received via live sync. High catchup rates suggest connectivity issues or filter mismatches. | 119 | This allows operators to distinguish between "connected but still catching up" (Syncing) vs "fully synced and live" (Connected). |
| 121 | 120 | ||
| 122 | ### Relay Health States | 121 | ### Relay Health States |
| 123 | 122 | ||
| 124 | The `status` label on `ngit_sync_relay_status` tracks relay health: | 123 | The `ngit_sync_relay_status` metric tracks relay health: |
| 125 | 124 | ||
| 126 | - `healthy` - Normal operation, connections working | 125 | - `1` = **Healthy** - Connected and stable |
| 127 | - `backoff` - Exponential backoff after failures (5s → 10s → ... → 1h) | 126 | - `2` = **Disconnected** - Not connected, but no issues detected |
| 128 | - `dead` - 24h of continuous failures, daily retry only | 127 | - `3` = **Degraded** - Connection problems or unstable after recovery |
| 128 | - `4` = **Dead** - 24h+ of continuous failures | ||
| 129 | - `5` = **RateLimited** - Rate limit cooldown active (65s) | ||
| 129 | 130 | ||
| 130 | ### Example Grafana Queries | 131 | ### Example Grafana Queries |
| 131 | 132 | ||
| 132 | ```promql | 133 | ```promql |
| 133 | # Relay health overview - count by status | 134 | # Relay connection status overview - count by status |
| 134 | sum by (status) (ngit_sync_relay_status == 1) | 135 | sum by (relay) (ngit_sync_relay_connected == 0) # Disconnected |
| 136 | sum by (relay) (ngit_sync_relay_connected == 1) # Connecting | ||
| 137 | sum by (relay) (ngit_sync_relay_connected == 2) # Syncing | ||
| 138 | sum by (relay) (ngit_sync_relay_connected == 3) # Connected | ||
| 139 | |||
| 140 | # Relays still syncing (not yet fully caught up) | ||
| 141 | count(ngit_sync_relay_connected == 2) | ||
| 135 | 142 | ||
| 136 | # Connection success rate over last hour | 143 | # Connection success rate over last hour |
| 137 | sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) | 144 | sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) |
| 138 | / sum(rate(ngit_sync_connection_attempts_total[1h])) | 145 | / sum(rate(ngit_sync_connection_attempts_total[1h])) |
| 139 | 146 | ||
| 140 | # Sync gap detection - events that should have been live synced | 147 | # Event sync rate (newly saved events) |
| 141 | sum(rate(ngit_sync_gap_events_total[1h])) by (relay) | 148 | rate(ngit_sync_events_synced_total[5m]) |
| 142 | |||
| 143 | # Live sync effectiveness (lower is better - fewer gaps) | ||
| 144 | sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) | ||
| 145 | / sum(rate(ngit_sync_events_total[1h])) | ||
| 146 | 149 | ||
| 147 | # Relays with high failure counts (potential issues) | 150 | # Relays with high failure counts (potential issues) |
| 148 | topk(10, ngit_sync_relay_failures) | 151 | topk(10, ngit_sync_relay_failures) |
| 152 | |||
| 153 | # Relay health overview - count by health state | ||
| 154 | sum(ngit_sync_relay_status == 1) # Healthy | ||
| 155 | sum(ngit_sync_relay_status == 2) # Disconnected | ||
| 156 | sum(ngit_sync_relay_status == 3) # Degraded | ||
| 157 | sum(ngit_sync_relay_status == 4) # Dead | ||
| 158 | sum(ngit_sync_relay_status == 5) # RateLimited | ||
| 149 | ``` | 159 | ``` |
| 150 | 160 | ||
| 151 | ### Example Alerts | 161 | ### Example Alerts |
| @@ -153,23 +163,30 @@ topk(10, ngit_sync_relay_failures) | |||
| 153 | ```yaml | 163 | ```yaml |
| 154 | # Alert if relay stuck in dead state for > 1 day | 164 | # Alert if relay stuck in dead state for > 1 day |
| 155 | - alert: SyncRelayDead | 165 | - alert: SyncRelayDead |
| 156 | expr: ngit_sync_relay_status{status="dead"} == 1 | 166 | expr: ngit_sync_relay_status == 4 # Dead state |
| 157 | for: 1d | 167 | for: 1d |
| 158 | labels: | 168 | labels: |
| 159 | severity: warning | 169 | severity: warning |
| 160 | annotations: | 170 | annotations: |
| 161 | summary: "Sync relay {{ $labels.relay }} is dead" | 171 | summary: "Sync relay {{ $labels.relay }} is dead (24h+ failures)" |
| 162 | 172 | ||
| 163 | # Alert if sync gap rate is high (>10% of events from catchup) | 173 | # Alert if relay stuck in syncing state for > 1 hour |
| 164 | - alert: SyncGapHigh | 174 | - alert: SyncRelaySlow |
| 165 | expr: > | 175 | expr: ngit_sync_relay_connected == 2 # Syncing state |
| 166 | sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) | 176 | for: 1h |
| 167 | / sum(rate(ngit_sync_events_total[1h])) > 0.1 | 177 | labels: |
| 168 | for: 30m | 178 | severity: info |
| 179 | annotations: | ||
| 180 | summary: "Sync relay {{ $labels.relay }} taking >1h to complete historic sync" | ||
| 181 | |||
| 182 | # Alert if too many relays are degraded | ||
| 183 | - alert: SyncManyDegraded | ||
| 184 | expr: sum(ngit_sync_relay_status == 3) > 5 # Degraded state | ||
| 185 | for: 15m | ||
| 169 | labels: | 186 | labels: |
| 170 | severity: warning | 187 | severity: warning |
| 171 | annotations: | 188 | annotations: |
| 172 | summary: "High sync gap rate - {{ $value | humanizePercentage }} of events from catchup" | 189 | summary: "{{ $value }} relays in degraded state" |
| 173 | ``` | 190 | ``` |
| 174 | 191 | ||
| 175 | ### Design Rationale | 192 | ### Design Rationale |
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 453a79c..db7dd20 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -53,7 +53,7 @@ impl SyncMetrics { | |||
| 53 | let relay_connected = IntGaugeVec::new( | 53 | let relay_connected = IntGaugeVec::new( |
| 54 | Opts::new( | 54 | Opts::new( |
| 55 | "ngit_sync_relay_connected", | 55 | "ngit_sync_relay_connected", |
| 56 | "Relay connection status (1=connected, 0=disconnected)", | 56 | "Relay connection status (0=disconnected, 1=connecting, 2=syncing, 3=connected)", |
| 57 | ), | 57 | ), |
| 58 | &["relay"], | 58 | &["relay"], |
| 59 | )?; | 59 | )?; |
| @@ -201,6 +201,33 @@ impl SyncMetrics { | |||
| 201 | .set(state_value); | 201 | .set(state_value); |
| 202 | } | 202 | } |
| 203 | 203 | ||
| 204 | /// Record relay connection status change. | ||
| 205 | /// | ||
| 206 | /// Maps connection status to numeric values for Prometheus: | ||
| 207 | /// - Disconnected = 0 (not connected) | ||
| 208 | /// - Connecting = 1 (connection attempt in progress) | ||
| 209 | /// - Syncing = 2 (connected, historic sync in progress) | ||
| 210 | /// - Connected = 3 (connected, historic sync complete) | ||
| 211 | /// | ||
| 212 | /// This is separate from health state and provides more granular connection lifecycle tracking. | ||
| 213 | /// | ||
| 214 | /// # Arguments | ||
| 215 | /// | ||
| 216 | /// * `relay` - The relay URL | ||
| 217 | /// * `status` - The current connection status | ||
| 218 | pub fn record_connection_status(&self, relay: &str, status: super::ConnectionStatus) { | ||
| 219 | use super::ConnectionStatus; | ||
| 220 | let status_value = match status { | ||
| 221 | ConnectionStatus::Disconnected => 0, | ||
| 222 | ConnectionStatus::Connecting => 1, | ||
| 223 | ConnectionStatus::Syncing => 2, | ||
| 224 | ConnectionStatus::Connected => 3, | ||
| 225 | }; | ||
| 226 | self.relay_connected | ||
| 227 | .with_label_values(&[relay]) | ||
| 228 | .set(status_value); | ||
| 229 | } | ||
| 230 | |||
| 204 | /// Record relay failure count. | 231 | /// Record relay failure count. |
| 205 | /// | 232 | /// |
| 206 | /// # Arguments | 233 | /// # Arguments |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 00668ac..e5b724d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -90,7 +90,9 @@ pub enum ConnectionStatus { | |||
| 90 | Disconnected, | 90 | Disconnected, |
| 91 | /// Connection attempt in progress | 91 | /// Connection attempt in progress |
| 92 | Connecting, | 92 | Connecting, |
| 93 | /// Successfully connected and subscribed | 93 | /// Successfully connected, historic sync in progress |
| 94 | Syncing, | ||
| 95 | /// Successfully connected, historic sync completed | ||
| 94 | Connected, | 96 | Connected, |
| 95 | } | 97 | } |
| 96 | 98 | ||
| @@ -112,6 +114,11 @@ pub struct RelayState { | |||
| 112 | /// Whether announcement filter historic sync has completed for this relay | 114 | /// Whether announcement filter historic sync has completed for this relay |
| 113 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 | 115 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 |
| 114 | pub announcements_synced: bool, | 116 | pub announcements_synced: bool, |
| 117 | /// Whether initial historic sync has fully completed (all layers) | ||
| 118 | /// Used to transition from Syncing -> Connected status | ||
| 119 | pub historic_sync_completed: bool, | ||
| 120 | /// When historic sync completed (None if never completed or cleared on fresh_start) | ||
| 121 | pub historic_sync_completed_at: Option<Timestamp>, | ||
| 115 | } | 122 | } |
| 116 | 123 | ||
| 117 | impl Default for RelayState { | 124 | impl Default for RelayState { |
| @@ -124,6 +131,8 @@ impl Default for RelayState { | |||
| 124 | last_connected: None, | 131 | last_connected: None, |
| 125 | disconnected_at: None, | 132 | disconnected_at: None, |
| 126 | announcements_synced: false, | 133 | announcements_synced: false, |
| 134 | historic_sync_completed: false, | ||
| 135 | historic_sync_completed_at: None, | ||
| 127 | } | 136 | } |
| 128 | } | 137 | } |
| 129 | } | 138 | } |
| @@ -145,6 +154,8 @@ impl RelayState { | |||
| 145 | self.repos.clear(); | 154 | self.repos.clear(); |
| 146 | self.root_events.clear(); | 155 | self.root_events.clear(); |
| 147 | self.announcements_synced = false; | 156 | self.announcements_synced = false; |
| 157 | self.historic_sync_completed = false; | ||
| 158 | self.historic_sync_completed_at = None; | ||
| 148 | } | 159 | } |
| 149 | } | 160 | } |
| 150 | 161 | ||
| @@ -860,6 +871,55 @@ impl SyncManager { | |||
| 860 | "Batch completed but no RelayState found for relay" | 871 | "Batch completed but no RelayState found for relay" |
| 861 | ); | 872 | ); |
| 862 | } | 873 | } |
| 874 | |||
| 875 | // Release lock before checking if historic sync is complete | ||
| 876 | drop(relay_index); | ||
| 877 | |||
| 878 | // Check if all historic sync is complete (no more pending batches) | ||
| 879 | self.check_and_complete_historic_sync(relay_url).await; | ||
| 880 | } | ||
| 881 | |||
| 882 | /// Check if historic sync is complete and transition to Connected status | ||
| 883 | /// | ||
| 884 | /// This method checks if there are any pending batches for the relay. | ||
| 885 | /// If no pending batches exist and the relay is in Syncing status, | ||
| 886 | /// it transitions to Connected and updates metrics. | ||
| 887 | /// | ||
| 888 | /// Called after each batch is confirmed to detect completion. | ||
| 889 | async fn check_and_complete_historic_sync(&self, relay_url: &str) { | ||
| 890 | // Check if there are any pending batches | ||
| 891 | let has_pending = { | ||
| 892 | let pending = self.pending_sync_index.read().await; | ||
| 893 | pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) | ||
| 894 | }; | ||
| 895 | |||
| 896 | if has_pending { | ||
| 897 | // Still syncing, don't transition yet | ||
| 898 | return; | ||
| 899 | } | ||
| 900 | |||
| 901 | // No pending batches - check if we should transition to Connected | ||
| 902 | let mut relay_index = self.relay_sync_index.write().await; | ||
| 903 | if let Some(state) = relay_index.get_mut(relay_url) { | ||
| 904 | if state.connection_status == ConnectionStatus::Syncing { | ||
| 905 | // Transition to Connected | ||
| 906 | state.connection_status = ConnectionStatus::Connected; | ||
| 907 | state.historic_sync_completed = true; | ||
| 908 | state.historic_sync_completed_at = Some(Timestamp::now()); | ||
| 909 | |||
| 910 | tracing::info!( | ||
| 911 | relay = %relay_url, | ||
| 912 | repos_synced = state.repos.len(), | ||
| 913 | root_events_synced = state.root_events.len(), | ||
| 914 | "Historic sync complete - transitioned to Connected status" | ||
| 915 | ); | ||
| 916 | |||
| 917 | // Update metrics | ||
| 918 | if let Some(ref metrics) = self.metrics { | ||
| 919 | metrics.record_connection_status(relay_url, ConnectionStatus::Connected); | ||
| 920 | } | ||
| 921 | } | ||
| 922 | } | ||
| 863 | } | 923 | } |
| 864 | 924 | ||
| 865 | /// Perform a daily sync for a specific relay | 925 | /// Perform a daily sync for a specific relay |
| @@ -1087,8 +1147,8 @@ impl SyncManager { | |||
| 1087 | ); | 1147 | ); |
| 1088 | return; | 1148 | return; |
| 1089 | } | 1149 | } |
| 1090 | Some(ConnectionStatus::Connected) => { | 1150 | Some(ConnectionStatus::Syncing) | Some(ConnectionStatus::Connected) => { |
| 1091 | // Continue to subscribe | 1151 | // Continue to subscribe - both Syncing and Connected can accept new filters |
| 1092 | } | 1152 | } |
| 1093 | } | 1153 | } |
| 1094 | 1154 | ||
| @@ -1137,18 +1197,18 @@ impl SyncManager { | |||
| 1137 | index.get(relay_url).and_then(|s| s.last_connected) | 1197 | index.get(relay_url).and_then(|s| s.last_connected) |
| 1138 | }; | 1198 | }; |
| 1139 | 1199 | ||
| 1140 | // 2. Update state to Connected | 1200 | // 2. Update state to Syncing (will transition to Connected after historic sync completes) |
| 1141 | { | 1201 | { |
| 1142 | let mut index = self.relay_sync_index.write().await; | 1202 | let mut index = self.relay_sync_index.write().await; |
| 1143 | let state = index.entry(relay_url.to_string()).or_default(); | 1203 | let state = index.entry(relay_url.to_string()).or_default(); |
| 1144 | state.connection_status = ConnectionStatus::Connected; | 1204 | state.connection_status = ConnectionStatus::Syncing; |
| 1145 | state.last_connected = Some(Timestamp::now()); | 1205 | state.last_connected = Some(Timestamp::now()); |
| 1146 | state.disconnected_at = None; | 1206 | state.disconnected_at = None; |
| 1147 | } | 1207 | } |
| 1148 | 1208 | ||
| 1149 | // Update metrics | 1209 | // Update metrics - record as syncing initially |
| 1150 | if let Some(ref metrics) = self.metrics { | 1210 | if let Some(ref metrics) = self.metrics { |
| 1151 | metrics.set_relay_connected(relay_url, true); | 1211 | metrics.record_connection_status(relay_url, ConnectionStatus::Syncing); |
| 1152 | metrics.inc_connected_count(); | 1212 | metrics.inc_connected_count(); |
| 1153 | } | 1213 | } |
| 1154 | 1214 | ||
| @@ -1454,7 +1514,8 @@ impl SyncManager { | |||
| 1454 | "Cleared sync state in fresh_start" | 1514 | "Cleared sync state in fresh_start" |
| 1455 | ); | 1515 | ); |
| 1456 | } | 1516 | } |
| 1457 | if state.connection_status == ConnectionStatus::Connected { | 1517 | // Only sync if we're connected (either Syncing or fully Connected) |
| 1518 | if matches!(state.connection_status, ConnectionStatus::Syncing | ConnectionStatus::Connected) { | ||
| 1458 | drop(index); | 1519 | drop(index); |
| 1459 | self.sync_generic_filters(relay_url, None).await; | 1520 | self.sync_generic_filters(relay_url, None).await; |
| 1460 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) | 1521 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) |
| @@ -1632,13 +1693,18 @@ impl SyncManager { | |||
| 1632 | /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. | 1693 | /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. |
| 1633 | /// On failure, updates state and health tracker. | 1694 | /// On failure, updates state and health tracker. |
| 1634 | async fn try_connect_relay(&mut self, relay_url: &str) { | 1695 | async fn try_connect_relay(&mut self, relay_url: &str) { |
| 1635 | // 1. Mark attempting (optional, helpful for debugging) | 1696 | // 1. Mark attempting and update metrics |
| 1636 | { | 1697 | { |
| 1637 | let mut index = self.relay_sync_index.write().await; | 1698 | let mut index = self.relay_sync_index.write().await; |
| 1638 | if let Some(state) = index.get_mut(relay_url) { | 1699 | if let Some(state) = index.get_mut(relay_url) { |
| 1639 | state.connection_status = ConnectionStatus::Connecting; | 1700 | state.connection_status = ConnectionStatus::Connecting; |
| 1640 | } | 1701 | } |
| 1641 | } | 1702 | } |
| 1703 | |||
| 1704 | // Update metrics to show connecting status | ||
| 1705 | if let Some(ref metrics) = self.metrics { | ||
| 1706 | metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); | ||
| 1707 | } | ||
| 1642 | 1708 | ||
| 1643 | // 2. Record attempt in health tracker | 1709 | // 2. Record attempt in health tracker |
| 1644 | self.health_tracker.record_attempt(relay_url); | 1710 | self.health_tracker.record_attempt(relay_url); |
| @@ -1688,8 +1754,8 @@ impl SyncManager { | |||
| 1688 | // 6. Update metrics | 1754 | // 6. Update metrics |
| 1689 | if let Some(ref metrics) = self.metrics { | 1755 | if let Some(ref metrics) = self.metrics { |
| 1690 | metrics.record_connection_attempt(relay_url, false); | 1756 | metrics.record_connection_attempt(relay_url, false); |
| 1691 | metrics | 1757 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 1692 | .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1758 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); |
| 1693 | } | 1759 | } |
| 1694 | } | 1760 | } |
| 1695 | } | 1761 | } |
| @@ -1806,7 +1872,7 @@ impl SyncManager { | |||
| 1806 | 1872 | ||
| 1807 | // Update metrics | 1873 | // Update metrics |
| 1808 | if let Some(ref metrics) = self.metrics { | 1874 | if let Some(ref metrics) = self.metrics { |
| 1809 | metrics.set_relay_connected(relay_url, false); | 1875 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 1810 | metrics.dec_connected_count(); | 1876 | metrics.dec_connected_count(); |
| 1811 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1877 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); |
| 1812 | } | 1878 | } |