diff options
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync.md | 729 | ||||
| -rw-r--r-- | src/sync/algorithms.rs | 33 | ||||
| -rw-r--r-- | src/sync/mod.rs | 700 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 8 |
4 files changed, 1120 insertions, 350 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index 2a86126..34b7bb6 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md | |||
| @@ -4,12 +4,11 @@ | |||
| 4 | 4 | ||
| 5 | This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles: | 5 | This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles: |
| 6 | 6 | ||
| 7 | 1. **Self-subscription as the only mechanism** - No database initialization at startup | 7 | 1. **Triggers call compute_actions → sync_computed_filters** - Self-subscriber batches and connect/reconnect events trigger this flow |
| 8 | 2. **compute_actions as single decision point** - Determines what NEW subscriptions to create | 8 | 2. **Clear separation of live vs historic sync** - Two distinct primitives with different purposes |
| 9 | 3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) | 9 | 3. **Layer 1 on connect, Layer 2+3 via AddFilters** - L1 handled at connection time, L2+L3 flow through compute_actions |
| 10 | 4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch | 10 | 4. **Always clear PendingSyncIndex first** - Before any reconnect/consolidate operation |
| 11 | 5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary | 11 | 5. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported |
| 12 | 6. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported | ||
| 13 | 12 | ||
| 14 | --- | 13 | --- |
| 15 | 14 | ||
| @@ -90,7 +89,6 @@ impl RelayState { | |||
| 90 | ### PendingSyncIndex (In-Flight Batches) | 89 | ### PendingSyncIndex (In-Flight Batches) |
| 91 | 90 | ||
| 92 | ```rust | 91 | ```rust |
| 93 | |||
| 94 | /// Method used for synchronization | 92 | /// Method used for synchronization |
| 95 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 93 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
| 96 | pub enum SyncMethod { | 94 | pub enum SyncMethod { |
| @@ -100,7 +98,6 @@ pub enum SyncMethod { | |||
| 100 | Negentropy, | 98 | Negentropy, |
| 101 | } | 99 | } |
| 102 | 100 | ||
| 103 | |||
| 104 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. | 101 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. |
| 105 | /// Each batch has its own ID and can confirm independently. | 102 | /// Each batch has its own ID and can confirm independently. |
| 106 | /// Key: relay URL | 103 | /// Key: relay URL |
| @@ -118,7 +115,6 @@ pub struct PendingBatch { | |||
| 118 | pub sync_method: SyncMethod, | 115 | pub sync_method: SyncMethod, |
| 119 | } | 116 | } |
| 120 | 117 | ||
| 121 | |||
| 122 | #[derive(Debug, Clone, Default)] | 118 | #[derive(Debug, Clone, Default)] |
| 123 | pub struct PendingItems { | 119 | pub struct PendingItems { |
| 124 | pub repos: HashSet<String>, | 120 | pub repos: HashSet<String>, |
| @@ -145,152 +141,202 @@ stateDiagram-v2 | |||
| 145 | 141 | ||
| 146 | --- | 142 | --- |
| 147 | 143 | ||
| 148 | ## Flow Scenarios | 144 | ## Core Architecture: Live vs Historic Sync |
| 145 | |||
| 146 | The sync system is built on two fundamental primitives that are clearly separated: | ||
| 147 | |||
| 148 | ### Sync Primitives | ||
| 149 | |||
| 150 | | Primitive | Purpose | Filter Modifier | Tracking | | ||
| 151 | | ----------------- | ----------------------- | ---------------- | ---------------- | | ||
| 152 | | `sync_live()` | Ongoing event stream | `limit: 0` | Not tracked | | ||
| 153 | | `historic_sync()` | Catch up on past events | Optional `since` | PendingSyncIndex | | ||
| 154 | |||
| 155 | ### Why `limit: 0` for Live Sync? | ||
| 156 | |||
| 157 | | Approach | Pros | Cons | | ||
| 158 | | ------------ | --------------------------------------- | --------------------------------- | | ||
| 159 | | `since: now` | Intuitive | Time-sensitive, clock skew issues | | ||
| 160 | | `limit: 0` | Deterministic, mirrors filter structure | Less intuitive name | | ||
| 161 | |||
| 162 | `limit: 0` is better because: | ||
| 163 | |||
| 164 | 1. **No time dependency**: Doesn't depend on synchronized clocks | ||
| 165 | 2. **Mirrors historic filters**: Same tag structure, just different limit | ||
| 166 | 3. **State reconstruction**: Can rebuild from repo/event lists without timestamps | ||
| 167 | |||
| 168 | ### Layer Strategy | ||
| 169 | |||
| 170 | | Layer | Content | When Subscribed | Managed By | | ||
| 171 | | ------- | --------------------------------------- | --------------------- | -------------------- | | ||
| 172 | | Layer 1 | 30617 Announcements, 30618 Maintainers | On connect (any type) | Connection lifecycle | | ||
| 173 | | Layer 2 | Events tagging our repos (a/A/q tags) | Via AddFilters | compute_actions | | ||
| 174 | | Layer 3 | Events tagging root events (e/E/q tags) | Via AddFilters | compute_actions | | ||
| 149 | 175 | ||
| 150 | ### Scenario 1: Initial Connect | 176 | **Key insight**: Layer 1 is connection-level (handled at connect time), Layer 2+3 are item-level (flow through compute_actions → sync_computed_filters). |
| 177 | |||
| 178 | --- | ||
| 179 | |||
| 180 | ## Triggers and Flow | ||
| 181 | |||
| 182 | ### What Triggers compute_actions → sync_computed_filters? | ||
| 183 | |||
| 184 | | Trigger | When | What Happens | | ||
| 185 | | --------------------------- | -------------------------------------- | ---------------------------------------- | | ||
| 186 | | Self-subscriber batch fires | New events discovered on own relay | Update RepoSyncIndex → compute_actions | | ||
| 187 | | fresh_start() | Initial connect, long_reconnect, daily | After L1 setup → compute_actions | | ||
| 188 | | quick_reconnect() | Reconnect < 15 minutes | After L1+L2+L3 catchup → compute_actions | | ||
| 189 | | consolidate() | Filter count > threshold | After live rebuild → compute_actions | | ||
| 190 | |||
| 191 | ### The Core Flow | ||
| 151 | 192 | ||
| 152 | ```mermaid | 193 | ```mermaid |
| 153 | flowchart TB | 194 | flowchart TB |
| 154 | START[Startup] --> SS[Self-subscribe to own relay] | 195 | TRIGGER[Trigger fires] --> CA[compute_actions] |
| 155 | SS --> |no since filter| EVENTS[Receive historical events] | 196 | CA --> |derives from| RSI[RepoSyncIndex] |
| 156 | EVENTS --> RSI[Update RepoSyncIndex] | 197 | CA --> |subtracts| RLI[RelaySyncIndex] |
| 157 | RSI --> DT[derive_relay_targets] | 198 | CA --> |subtracts| PSI[PendingSyncIndex] |
| 158 | DT --> CA[compute_actions with targets and empty confirmed] | 199 | CA --> |produces| AF[AddFilters actions] |
| 159 | CA --> AF[AddFilters for each relay] | 200 | AF --> SFRE[sync_computed_filters] |
| 160 | AF --> SPAWN{Relay connected?} | 201 | SFRE --> LIVE[sync_live - L2+L3] |
| 161 | SPAWN --> |no| CONN[spawn_connection] | 202 | SFRE --> HIST[historic_sync - L2+L3] |
| 162 | CONN --> HC[handle_connect_or_reconnect] | 203 | HIST --> PSI_UPDATE[Update PendingSyncIndex] |
| 163 | SPAWN --> |yes| SUB | 204 | PSI_UPDATE --> |EOSE received| CONFIRM[Move to RelaySyncIndex] |
| 164 | 205 | ``` | |
| 165 | subgraph handle_connect_or_reconnect - Fresh Sync | 206 | |
| 166 | HC --> CHECK_FRESH{is_fresh_sync?} | 207 | --- |
| 167 | CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] | 208 | |
| 168 | L1 --> RCA[recompute_actions_for_relay] | 209 | ## Flow Scenarios |
| 169 | end | ||
| 170 | 210 | ||
| 171 | RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] | 211 | ### Scenario 1: Fresh Start (Initial Connect / Long Reconnect / Daily Sync) |
| 172 | SUB --> PB[Create PendingBatch] | 212 | |
| 213 | ```mermaid | ||
| 214 | flowchart TB | ||
| 215 | START[fresh_start called] --> CLEAR_PSI[Clear PendingSyncIndex] | ||
| 216 | CLEAR_PSI --> CLEAR_RSI[Clear RelaySyncIndex] | ||
| 217 | CLEAR_RSI --> L1_LIVE[L1: sync_live - announcements] | ||
| 218 | L1_LIVE --> L1_HIST[L1: historic_sync - no since] | ||
| 219 | L1_HIST --> NEG{NIP-77 supported?} | ||
| 220 | NEG --> |yes| NEGENTROPY[negentropy sync] | ||
| 221 | NEG --> |no| REQ[REQ+EOSE] | ||
| 222 | NEGENTROPY --> CA[compute_actions] | ||
| 223 | REQ --> CA | ||
| 224 | CA --> |empty RelaySyncIndex| AF[AddFilters for ALL repos] | ||
| 225 | AF --> SFRE[sync_computed_filters] | ||
| 226 | SFRE --> L23_LIVE[L2+L3: sync_live] | ||
| 227 | SFRE --> L23_HIST[L2+L3: historic_sync] | ||
| 228 | L23_HIST --> PB[Create PendingBatch] | ||
| 173 | PB --> EOSE[Wait for EOSE] | 229 | PB --> EOSE[Wait for EOSE] |
| 174 | EOSE --> CONFIRM[Move items to confirmed repos/root_events] | 230 | EOSE --> CONFIRM[Move items to RelaySyncIndex] |
| 175 | ``` | 231 | ``` |
| 176 | 232 | ||
| 177 | **Key points:** | 233 | **Key points:** |
| 178 | 234 | ||
| 179 | - No `since` filter on initial connect - get full history | 235 | - Always clear PendingSyncIndex first, then RelaySyncIndex |
| 180 | - `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` | 236 | - L1 live + L1 historic (uses negentropy if available) |
| 181 | - Layer 1: `build_announcement_filter(None)` - subscribed immediately without since | 237 | - Empty RelaySyncIndex means diff produces AddFilters for everything |
| 182 | - Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking | 238 | - L2+L3 flow through sync_computed_filters with proper pending tracking |
| 183 | 239 | ||
| 184 | ### Scenario 2: Quick Reconnect (less than 15 minutes) | 240 | ### Scenario 2: Quick Reconnect (< 15 minutes) |
| 185 | 241 | ||
| 186 | ```mermaid | 242 | ```mermaid |
| 187 | flowchart TB | 243 | flowchart TB |
| 188 | DISC[Connection lost] --> MARK[Set disconnected_at = now] | 244 | DISC[Connection lost] --> MARK[Set disconnected_at = now] |
| 189 | MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] | 245 | MARK --> WAIT[Wait for reconnection < 15min] |
| 190 | CLEAR_PEND --> WAIT[Wait for reconnection] | ||
| 191 | WAIT --> RECONN[Connection restored] | 246 | WAIT --> RECONN[Connection restored] |
| 192 | RECONN --> HC[handle_connect_or_reconnect] | 247 | RECONN --> CLEAR_PSI[Clear PendingSyncIndex] |
| 193 | 248 | CLEAR_PSI --> L1_LIVE[L1: sync_live - announcements] | |
| 194 | subgraph handle_connect_or_reconnect - Quick Reconnect | 249 | L1_LIVE --> L1_HIST[L1: historic_sync WITH since] |
| 195 | HC --> CHECK{is_fresh_sync?} | 250 | L1_HIST --> RECON[reconstruct_filters from RelaySyncIndex] |
| 196 | CHECK --> |no - last_connected exists AND less than 15min| SINCE[since = last_connected - 15min] | 251 | RECON --> L23_LIVE[L2+L3: sync_live] |
| 197 | SINCE --> L1[build_announcement_filter - with since] | 252 | RECON --> L23_HIST[L2+L3: historic_sync WITH since] |
| 198 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | 253 | L23_HIST --> CA[compute_actions] |
| 199 | L23 --> RCA[recompute_actions_for_relay] | 254 | CA --> |check for new items| AF{New items?} |
| 200 | end | 255 | AF --> |yes| SFRE[sync_computed_filters] |
| 201 | 256 | AF --> |no| DONE[Done] | |
| 202 | RCA --> AF[AddFilters for new items only] | 257 | SFRE --> PB[Create PendingBatch] |
| 203 | AF --> SUB[Subscribe] | ||
| 204 | SUB --> PB[Create PendingBatch] | ||
| 205 | PB --> EOSE[Wait for EOSE] | ||
| 206 | EOSE --> EXTEND[Extend confirmed state] | ||
| 207 | ``` | 258 | ``` |
| 208 | 259 | ||
| 209 | **Key points:** | 260 | **Key points:** |
| 210 | 261 | ||
| 211 | - PendingSyncIndex cleared on disconnect (not reconnect) | 262 | - Clear PendingSyncIndex first (old subscriptions are dead) |
| 212 | - `handle_connect_or_reconnect`: | 263 | - L1 live (always on any connection) |
| 213 | 1. `build_announcement_filter(Some(since))` - Layer 1 with since | 264 | - L1 historic WITH since (catches up missed announcements) |
| 214 | 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since | 265 | - L2+L3 rebuilt from RelaySyncIndex (confirmed state preserved) |
| 215 | 3. `recompute_actions_for_relay` - check for new items | 266 | - compute_actions checks for any NEW items discovered during catchup |
| 216 | - since = last_connected - 15min ensures we catch events during disconnection | ||
| 217 | 267 | ||
| 218 | ### Scenario 3: Stale Reconnect (greater than 15 minutes) | 268 | ### Scenario 3: Long Reconnect (> 15 minutes) |
| 219 | 269 | ||
| 220 | ```mermaid | 270 | ```mermaid |
| 221 | flowchart TB | 271 | flowchart TB |
| 222 | RECONN[Connection restored] --> HC[handle_connect_or_reconnect] | 272 | RECONN[Connection restored > 15min] --> METRIC[Record disconnect/reconnect metric] |
| 273 | METRIC --> FRESH[fresh_start] | ||
| 274 | FRESH --> |same as initial connect| DONE[Full sync initiated] | ||
| 275 | ``` | ||
| 223 | 276 | ||
| 224 | subgraph handle_connect_or_reconnect - Stale Reconnect | 277 | **Key points:** |
| 225 | HC --> CHECK{is_fresh_sync?} | ||
| 226 | CHECK --> |yes - disconnected greater than 15min| CLEAR[clear_sync_state] | ||
| 227 | CLEAR --> L1[build_announcement_filter - no since] | ||
| 228 | L1 --> RCA[recompute_actions_for_relay] | ||
| 229 | end | ||
| 230 | 278 | ||
| 231 | RCA --> CA[compute_actions with empty confirmed] | 279 | - Records disconnect/reconnect as a metric |
| 232 | CA --> AF[AddFilters for everything] | 280 | - Delegates to fresh_start() - same as initial connect |
| 233 | AF --> SUB[Subscribe - no since filter] | 281 | - State too stale to trust, start fresh |
| 234 | SUB --> PB[Create PendingBatch] | 282 | |
| 235 | PB --> EOSE[Wait for EOSE] | 283 | ### Scenario 4: Consolidation (Filter Count > Threshold) |
| 236 | EOSE --> CONFIRM[Populate confirmed state fresh] | 284 | |
| 285 | ```mermaid | ||
| 286 | flowchart TB | ||
| 287 | CHECK[Filter count check] --> THRESHOLD{count > 70?} | ||
| 288 | THRESHOLD --> |yes| CLEAR_PSI[Clear PendingSyncIndex] | ||
| 289 | CLEAR_PSI --> UNSUB[unsubscribe_all] | ||
| 290 | UNSUB --> RECON[reconstruct_filters from RelaySyncIndex] | ||
| 291 | RECON --> L1_LIVE[L1: sync_live] | ||
| 292 | RECON --> L23_LIVE[L2+L3: sync_live] | ||
| 293 | L23_LIVE --> CA[compute_actions] | ||
| 294 | CA --> |check for new items| AF{New items?} | ||
| 295 | AF --> |yes| SFRE[sync_computed_filters] | ||
| 296 | AF --> |no| DONE[Done] | ||
| 297 | THRESHOLD --> |no| SKIP[Continue normally] | ||
| 237 | ``` | 298 | ``` |
| 238 | 299 | ||
| 239 | **Key points:** | 300 | **Key points:** |
| 240 | 301 | ||
| 241 | - `should_clear_state()` returns true → triggers fresh sync | 302 | - Clear PendingSyncIndex first |
| 242 | - Same path as initial connect after clearing state | 303 | - NO historic sync needed - items already synced/syncing |
| 243 | - Layer 1: `build_announcement_filter(None)` - full history | 304 | - Only rebuilds live subscriptions from confirmed state |
| 244 | - Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything | 305 | - compute_actions catches any new items that need syncing |
| 245 | 306 | ||
| 246 | ### Scenario 4: Consolidation (Triggered on Filter Add) | 307 | ### Scenario 5: Daily Sync (23-25h Random Timer) |
| 247 | 308 | ||
| 248 | ```mermaid | 309 | ```mermaid |
| 249 | flowchart TB | 310 | flowchart TB |
| 250 | AF[handle_add_filters called] --> COUNT{current + new > 70?} | 311 | TIMER[Daily timer fires] --> FRESH[fresh_start] |
| 251 | COUNT --> |yes| CONSOLIDATE[consolidate] | 312 | FRESH --> |NO disconnect metric| DONE[Full sync initiated] |
| 252 | CONSOLIDATE --> WAIT_PEND[wait_pending_complete] | ||
| 253 | WAIT_PEND --> CLOSE[unsubscribe_all] | ||
| 254 | CLOSE --> SINCE[since = now - 15min] | ||
| 255 | SINCE --> L1[build_announcement_filter - with since] | ||
| 256 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | ||
| 257 | COUNT --> |no| SUB[Subscribe new filters] | ||
| 258 | SUB --> PB[Create PendingBatch] | ||
| 259 | ``` | 313 | ``` |
| 260 | 314 | ||
| 261 | **Key points:** | 315 | **Key points:** |
| 262 | 316 | ||
| 263 | - Consolidation checked in `handle_add_filters` BEFORE adding new filters | 317 | - Same as fresh_start() but WITHOUT recording disconnect/reconnect metric |
| 264 | - After closing all subscriptions, re-subscribe: | 318 | - Ensures consistency, detects any drift accumulated over 24 hours |
| 265 | 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since | ||
| 266 | 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since | ||
| 267 | - `since = now - 15min` prevents re-fetching old events | ||
| 268 | - Keeps confirmed state, just reduces filter count | ||
| 269 | 319 | ||
| 270 | ### Scenario 5: Daily Timer (23-25h Random) | 320 | ### Scenario 6: Self-Subscriber Batch |
| 271 | 321 | ||
| 272 | ```mermaid | 322 | ```mermaid |
| 273 | flowchart TB | 323 | flowchart TB |
| 274 | DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] | 324 | EVENTS[Events from own relay] --> QUEUE[Queue to pending batch] |
| 275 | CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] | 325 | QUEUE --> TIMER[Batch timer fires - 5 seconds] |
| 276 | CLEAR_PEND --> CLEAR_STATE[clear_sync_state] | 326 | TIMER --> UPDATE[Update RepoSyncIndex] |
| 277 | CLEAR_STATE --> L1[build_announcement_filter - no since] | 327 | UPDATE --> CA[compute_actions] |
| 278 | L1 --> RCA[recompute_actions_for_relay] | 328 | CA --> |new repos/events discovered| AF[AddFilters] |
| 279 | RCA --> CA[compute_actions with empty confirmed] | 329 | AF --> SFRE[sync_computed_filters] |
| 280 | CA --> AF[AddFilters for everything] | 330 | SFRE --> LIVE[sync_live - L2+L3] |
| 281 | AF --> SUB[Subscribe - no since filter] | 331 | SFRE --> HIST[historic_sync - L2+L3] |
| 282 | SUB --> PB[Create PendingBatch] | ||
| 283 | PB --> EOSE[Wait for EOSE] | ||
| 284 | EOSE --> CONFIRM[Repopulate confirmed state] | ||
| 285 | ``` | 332 | ``` |
| 286 | 333 | ||
| 287 | **Key points:** | 334 | **Key points:** |
| 288 | 335 | ||
| 289 | - Daily timer is a full fresh sync, NOT consolidation | 336 | - Self-subscriber monitors own relay for 30617, 1617, 1618, 1619, 1621 |
| 290 | - Clears both PendingSyncIndex and confirmed state | 337 | - Batches events (5 second window) |
| 291 | - Layer 1: `build_announcement_filter(None)` - full history | 338 | - Updates RepoSyncIndex, then compute_actions finds new work |
| 292 | - Layer 2+3: via compute_actions with empty confirmed - full history | 339 | - New items flow through sync_computed_filters |
| 293 | - Detects any state drift accumulated over 24 hours | ||
| 294 | 340 | ||
| 295 | --- | 341 | --- |
| 296 | 342 | ||
| @@ -300,8 +346,6 @@ flowchart TB | |||
| 300 | 346 | ||
| 301 | Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay. | 347 | Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay. |
| 302 | 348 | ||
| 303 | **Implementation:** [`derive_relay_targets()`](../../src/sync/algorithms.rs:61) | ||
| 304 | |||
| 305 | ```rust | 349 | ```rust |
| 306 | // Conceptual: inverts repo → relays to relay → repos | 350 | // Conceptual: inverts repo → relays to relay → repos |
| 307 | fn derive_relay_targets(repo_index: &HashMap<String, RepoSyncNeeds>) | 351 | fn derive_relay_targets(repo_index: &HashMap<String, RepoSyncNeeds>) |
| @@ -320,104 +364,262 @@ Performs a three-way diff: `target - pending - confirmed = new` | |||
| 320 | 364 | ||
| 321 | Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect). | 365 | Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect). |
| 322 | 366 | ||
| 323 | **Implementation:** [`compute_actions()`](../../src/sync/algorithms.rs:96) | 367 | ```rust |
| 368 | fn compute_actions( | ||
| 369 | targets: &HashMap<String, RelaySyncNeeds>, | ||
| 370 | pending: &PendingSyncIndex, | ||
| 371 | confirmed: &RelaySyncIndex, | ||
| 372 | ) -> Vec<AddFilters> | ||
| 373 | ``` | ||
| 324 | 374 | ||
| 325 | --- | 375 | --- |
| 326 | 376 | ||
| 327 | ## Filter Building (Three-Layer Strategy) | 377 | ## Method Specifications |
| 378 | |||
| 379 | ### Primitives | ||
| 380 | |||
| 381 | #### `sync_live()` - Live Subscriptions | ||
| 382 | |||
| 383 | ```rust | ||
| 384 | /// Set up live subscription (filters with limit: 0) | ||
| 385 | /// | ||
| 386 | /// - Uses `limit: 0` to receive only new events | ||
| 387 | /// - NOT tracked in PendingSyncIndex (state reconstructable) | ||
| 388 | async fn sync_live(&self, relay_url: &str, filters: &[Filter]) | ||
| 389 | ``` | ||
| 390 | |||
| 391 | #### `historic_sync()` - Historical Sync Dispatcher | ||
| 328 | 392 | ||
| 329 | The filter strategy uses three layers to ensure comprehensive event coverage: | 393 | ```rust |
| 394 | /// Dispatch to appropriate historic sync method based on relay capabilities | ||
| 395 | /// | ||
| 396 | /// Both paths update PendingSyncIndex to ensure consistent lifecycle tracking. | ||
| 397 | async fn historic_sync( | ||
| 398 | &mut self, | ||
| 399 | relay_url: &str, | ||
| 400 | filters: Vec<Filter>, | ||
| 401 | items: PendingItems, | ||
| 402 | since: Option<Timestamp>, | ||
| 403 | ) -> Option<u64> // Returns batch_id | ||
| 404 | ``` | ||
| 405 | |||
| 406 | Dispatches to: | ||
| 407 | |||
| 408 | - `historic_sync_negentropy()` - NIP-77 parallel sync (if supported) | ||
| 409 | - `historic_sync_legacy()` - REQ+EOSE fallback | ||
| 410 | |||
| 411 | ### Building Blocks | ||
| 412 | |||
| 413 | #### `reconstruct_filters()` - Rebuild from Confirmed State | ||
| 414 | |||
| 415 | ```rust | ||
| 416 | /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) | ||
| 417 | /// | ||
| 418 | /// Returns raw Vec<Filter> for L1+L2+L3. | ||
| 419 | /// Used by: quick_reconnect, consolidate | ||
| 420 | /// Does NOT include pending items - those flow through AddFilters path. | ||
| 421 | async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> | ||
| 422 | ``` | ||
| 423 | |||
| 424 | #### `sync_computed_filters()` - Handle New AddFilters | ||
| 425 | |||
| 426 | ```rust | ||
| 427 | /// Process AddFilters action (from compute_actions) | ||
| 428 | /// | ||
| 429 | /// Orchestrates both live and historic sync for NEW items: | ||
| 430 | /// 1. sync_live() - set up permanent L2+L3 subscriptions | ||
| 431 | /// 2. historic_sync() - catch up on past events | ||
| 432 | /// | ||
| 433 | /// This is specifically for NEW filter discovery. | ||
| 434 | async fn sync_computed_filters( | ||
| 435 | &mut self, | ||
| 436 | action: AddFilters, | ||
| 437 | since: Option<Timestamp>, | ||
| 438 | ) -> Option<u64> | ||
| 439 | ``` | ||
| 440 | |||
| 441 | ### Top-Level Entry Points | ||
| 442 | |||
| 443 | #### `fresh_start()` - Clean Slate Sync | ||
| 444 | |||
| 445 | ```rust | ||
| 446 | /// Fresh start - clears state and does full sync | ||
| 447 | /// | ||
| 448 | /// Called by: initial connect, long_reconnect, daily_sync | ||
| 449 | /// | ||
| 450 | /// Flow: | ||
| 451 | /// 1. Clear PendingSyncIndex | ||
| 452 | /// 2. Clear RelaySyncIndex | ||
| 453 | /// 3. L1 live + L1 historic (negentropy if available) | ||
| 454 | /// 4. compute_actions → AddFilters → sync_computed_filters for L2+L3 | ||
| 455 | async fn fresh_start(&mut self, relay_url: &str) | ||
| 456 | ``` | ||
| 457 | |||
| 458 | #### `quick_reconnect()` - Short Disconnection Recovery | ||
| 459 | |||
| 460 | ```rust | ||
| 461 | /// Quick reconnect - for disconnections < 15 minutes | ||
| 462 | /// | ||
| 463 | /// Flow: | ||
| 464 | /// 1. Clear PendingSyncIndex | ||
| 465 | /// 2. L1 live + L1 historic(since) | ||
| 466 | /// 3. reconstruct_filters → L2+L3 live + L2+L3 historic(since) | ||
| 467 | /// 4. compute_actions for any new items | ||
| 468 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) | ||
| 469 | ``` | ||
| 470 | |||
| 471 | #### `long_reconnect()` - Extended Disconnection Recovery | ||
| 472 | |||
| 473 | ```rust | ||
| 474 | /// Long reconnect - for disconnections > 15 minutes | ||
| 475 | /// | ||
| 476 | /// Flow: | ||
| 477 | /// 1. Record disconnect/reconnect metric | ||
| 478 | /// 2. fresh_start() | ||
| 479 | async fn long_reconnect(&mut self, relay_url: &str) | ||
| 480 | ``` | ||
| 481 | |||
| 482 | #### `daily_sync()` - Scheduled Full Refresh | ||
| 483 | |||
| 484 | ```rust | ||
| 485 | /// Daily sync - full refresh without disconnect metrics | ||
| 486 | /// | ||
| 487 | /// Flow: fresh_start() (no disconnect metric recorded) | ||
| 488 | async fn daily_sync(&mut self, relay_url: &str) | ||
| 489 | ``` | ||
| 490 | |||
| 491 | #### `consolidate()` - Filter Count Reduction | ||
| 492 | |||
| 493 | ```rust | ||
| 494 | /// Consolidate subscriptions when filter count exceeds threshold | ||
| 495 | /// | ||
| 496 | /// Flow: | ||
| 497 | /// 1. Clear PendingSyncIndex | ||
| 498 | /// 2. unsubscribe_all | ||
| 499 | /// 3. reconstruct_filters → sync_live only (L1+L2+L3) | ||
| 500 | /// 4. compute_actions for any new items | ||
| 501 | /// | ||
| 502 | /// NO historic sync - items already synced, just reducing subscriptions | ||
| 503 | async fn consolidate(&mut self, relay_url: &str) | ||
| 504 | ``` | ||
| 505 | |||
| 506 | #### `handle_new_sync_filters()` - New Filter Discovery | ||
| 507 | |||
| 508 | ```rust | ||
| 509 | /// Handle AddFilters action from compute_actions | ||
| 510 | /// | ||
| 511 | /// Flow: | ||
| 512 | /// 1. Check/spawn connection if needed | ||
| 513 | /// 2. maybe_consolidate (check filter threshold) | ||
| 514 | /// 3. sync_computed_filters | ||
| 515 | async fn handle_new_sync_filters(&mut self, action: AddFilters) | ||
| 516 | ``` | ||
| 517 | |||
| 518 | --- | ||
| 519 | |||
| 520 | ## Method Relationships Summary | ||
| 521 | |||
| 522 | ``` | ||
| 523 | fresh_start(relay_url) // Initial/long_reconnect/daily | ||
| 524 | ├──> Clear PendingSyncIndex | ||
| 525 | ├──> Clear RelaySyncIndex | ||
| 526 | ├──> L1: sync_live(announcement_filter) | ||
| 527 | ├──> L1: historic_sync(announcement_filter, None) | ||
| 528 | └──> compute_actions → AddFilters → sync_computed_filters (L2+L3) | ||
| 529 | |||
| 530 | quick_reconnect(relay_url, since) // Disconnected < 15 min | ||
| 531 | ├──> Clear PendingSyncIndex | ||
| 532 | ├──> L1: sync_live(announcement_filter) | ||
| 533 | ├──> L1: historic_sync(announcement_filter, since) | ||
| 534 | ├──> reconstruct_filters() → L2+L3 filters | ||
| 535 | ├──> L2+L3: sync_live(filters) | ||
| 536 | ├──> L2+L3: historic_sync(filters, since) | ||
| 537 | └──> compute_actions → AddFilters → sync_computed_filters (new items only) | ||
| 538 | |||
| 539 | long_reconnect(relay_url) // Disconnected > 15 min | ||
| 540 | ├──> Record disconnect/reconnect metric | ||
| 541 | └──> fresh_start() | ||
| 542 | |||
| 543 | daily_sync(relay_url) // Timer fires | ||
| 544 | └──> fresh_start() // No disconnect metric | ||
| 545 | |||
| 546 | consolidate(relay_url) // Filter count > threshold | ||
| 547 | ├──> Clear PendingSyncIndex | ||
| 548 | ├──> unsubscribe_all() | ||
| 549 | ├──> reconstruct_filters() → L1+L2+L3 filters | ||
| 550 | ├──> sync_live(filters) // Live only, NO historic | ||
| 551 | └──> compute_actions → AddFilters → sync_computed_filters (new items only) | ||
| 552 | |||
| 553 | handle_new_sync_filters(action) // New filter discovery | ||
| 554 | ├──> Check/spawn connection | ||
| 555 | ├──> maybe_consolidate() | ||
| 556 | └──> sync_computed_filters(action, None) | ||
| 557 | |||
| 558 | sync_computed_filters(action, since) // Process AddFilters | ||
| 559 | ├──> sync_live(action.filters) // L2+L3 live | ||
| 560 | └──> historic_sync(action.filters, since) // L2+L3 historic | ||
| 561 | ├── historic_sync_negentropy() // Parallel, updates Pending | ||
| 562 | └── historic_sync_legacy() // REQ+EOSE, updates Pending | ||
| 563 | ``` | ||
| 564 | |||
| 565 | --- | ||
| 566 | |||
| 567 | ## Filter Building (Three-Layer Strategy) | ||
| 330 | 568 | ||
| 331 | ### Layer 1: Announcements | 569 | ### Layer 1: Announcements |
| 332 | 570 | ||
| 333 | - **Kinds**: 30617 (Repository Announcements), 30618 (Maintainer Lists) | 571 | - **Kinds**: 30617 (Repository Announcements), 30618 (Maintainer Lists) |
| 334 | - **When subscribed**: ONCE on connect, NOT rebuilt during consolidation | 572 | - **When subscribed**: On connect (any type) - handled by connection lifecycle |
| 335 | - **Function**: [`build_announcement_filter()`](../../src/sync/filters.rs:20) | 573 | - **Function**: `build_announcement_filter(since: Option<Timestamp>)` |
| 336 | - 30618 is ONLY synced from remote relays, not self-subscribed | 574 | - 30618 is ONLY synced from remote relays, not self-subscribed |
| 337 | 575 | ||
| 338 | ### Layer 2: Events Tagging Our Repos | 576 | ### Layer 2: Events Tagging Our Repos |
| 339 | 577 | ||
| 340 | - **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage | 578 | - **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage |
| 341 | - **Batching**: Per 100 repo refs | 579 | - **Batching**: Per 100 repo refs |
| 342 | - **Function**: [`tagged_one_of_our_repo_event_filters()`](../../src/sync/filters.rs:43) | 580 | - **Function**: `build_repo_tag_filters(repos, since)` |
| 343 | 581 | ||
| 344 | ### Layer 3: Events Tagging Our Root Events | 582 | ### Layer 3: Events Tagging Our Root Events |
| 345 | 583 | ||
| 346 | - **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage | 584 | - **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage |
| 347 | - **Batching**: Per 100 event IDs | 585 | - **Batching**: Per 100 event IDs |
| 348 | - **Function**: [`tagged_one_of_our_root_event_filters()`](../../src/sync/filters.rs:98) | 586 | - **Function**: `build_root_event_tag_filters(root_events, since)` |
| 349 | 587 | ||
| 350 | ### Combined Layer 2+3 | 588 | ### Combined Layer 2+3 |
| 351 | 589 | ||
| 352 | The [`build_layer2_and_layer3_filters()`](../../src/sync/filters.rs:152) function combines both layers. Used by: | 590 | The `build_layer2_and_layer3_filters()` function combines both layers. Used by: |
| 353 | |||
| 354 | - `compute_actions` for incremental subscriptions | ||
| 355 | - `rebuild_layer2_and_layer3` during reconnection | ||
| 356 | - Consolidation rebuilds (Layer 1 remains active separately) | ||
| 357 | 591 | ||
| 358 | **Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). | 592 | - `sync_computed_filters` for new item subscriptions |
| 593 | - `reconstruct_filters` for rebuilding from confirmed state | ||
| 359 | 594 | ||
| 360 | --- | 595 | --- |
| 361 | 596 | ||
| 362 | ## SyncManager Key Methods | 597 | ## NIP-77 Negentropy Sync |
| 363 | |||
| 364 | The [`SyncManager`](../../src/sync/mod.rs:308) orchestrates all sync operations. Key methods: | ||
| 365 | |||
| 366 | ### Connection Lifecycle | ||
| 367 | |||
| 368 | | Method | Purpose | | ||
| 369 | | ------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- | | ||
| 370 | | `handle_connect_or_reconnect()` | Unified handler for initial connect and reconnect. Determines fresh vs quick reconnect based on `last_connected` and 15-minute rule | | ||
| 371 | | `handle_disconnect()` | Updates RelayState to Disconnected, sets disconnected_at, clears pending batches, records failure in health tracker | | ||
| 372 | | `spawn_relay_connection()` | Creates RelayConnection, subscribes to Layer 1, spawns event loop task | | ||
| 373 | |||
| 374 | ### Sync Operations | ||
| 375 | |||
| 376 | | Method | Purpose | | ||
| 377 | | ------------------------------- | ------------------------------------------------------------------------------------------------------------------- | | ||
| 378 | | `handle_add_filters()` | Auto-spawns connection if needed, checks consolidation threshold (>70 filters), subscribes and creates PendingBatch | | ||
| 379 | | `handle_eose()` | Processes EOSE for subscription, moves items from pending to confirmed when batch completes | | ||
| 380 | | `recompute_actions_for_relay()` | Runs derive_relay_targets → compute_actions for a specific relay to find new items | | ||
| 381 | | `rebuild_layer2_and_layer3()` | Rebuilds subscriptions from confirmed state with optional since filter | | ||
| 382 | |||
| 383 | ### Maintenance | ||
| 384 | |||
| 385 | | Method | Purpose | | ||
| 386 | | --------------------- | -------------------------------------------------------------------------- | | ||
| 387 | | `daily_sync()` | Full fresh sync - unsubscribes all, clears state, recomputes actions | | ||
| 388 | | `consolidate()` | Reduces filter count by unsubscribing and rebuilding with combined filters | | ||
| 389 | | `check_disconnects()` | Periodic check for empty relays (no repos) to disconnect | | ||
| 390 | | `check_reconnects()` | Attempts reconnection for disconnected relays with pending work | | ||
| 391 | 598 | ||
| 392 | --- | 599 | ### What is Negentropy? |
| 393 | 600 | ||
| 394 | ## Self-Subscriber | 601 | NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences. |
| 395 | 602 | ||
| 396 | The [`SelfSubscriber`](../../src/sync/self_subscriber.rs:86) monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`. | 603 | ### When Negentropy is Used |
| 397 | 604 | ||
| 398 | ### Event Kinds Monitored | 605 | Negentropy sync is attempted for: |
| 399 | 606 | ||
| 400 | - **30617** - Repository Announcements (triggers discovery of repos listing our relay) | 607 | - **fresh_start()** - Full sync without `since` |
| 401 | - **1617** - Patches (root events referencing repos) | 608 | - **daily_sync()** - Periodic full refresh (via fresh_start) |
| 402 | - **1618** - Issues | 609 | - **long_reconnect()** - Via fresh_start |
| 403 | - **1619** - Replies/Status | ||
| 404 | - **1621** - Pull Requests | ||
| 405 | 610 | ||
| 406 | Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays. | 611 | Negentropy is NOT used for: |
| 407 | 612 | ||
| 408 | ### Batching Flow | 613 | - **quick_reconnect()** - Uses REQ with `since` (more efficient for small gaps) |
| 614 | - **Live subscriptions** - Always use REQ with `limit: 0` | ||
| 409 | 615 | ||
| 410 | 1. **Receive events** from own relay subscription | 616 | ### Fallback Behavior |
| 411 | 2. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID | ||
| 412 | 3. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events | ||
| 413 | 4. **Process batch**: | ||
| 414 | - Update `RepoSyncIndex` with discovered repos and root events | ||
| 415 | - Call `derive_relay_targets()` → `compute_actions()` | ||
| 416 | - Send `AddFilters` actions to SyncManager | ||
| 417 | 617 | ||
| 418 | ### Reconnection | 618 | If negentropy fails (relay doesn't support NIP-77, network error, etc.): |
| 419 | 619 | ||
| 420 | Uses `last_connected` timestamp to apply since filter on reconnect (15-minute buffer), similar to external relay reconnection logic. | 620 | 1. A warning is logged (once per relay to avoid spam) |
| 621 | 2. The sync falls back to traditional REQ+EOSE | ||
| 622 | 3. No error is raised - fallback is automatic | ||
| 421 | 623 | ||
| 422 | --- | 624 | --- |
| 423 | 625 | ||
| @@ -431,12 +633,14 @@ flowchart TB | |||
| 431 | end | 633 | end |
| 432 | 634 | ||
| 433 | subgraph RepoSyncIndex - What We Want | 635 | subgraph RepoSyncIndex - What We Want |
| 434 | RSI[HashMap: Repo to Relays+Events] | 636 | RSI[HashMap: Repo → Relays+Events] |
| 435 | end | 637 | end |
| 436 | 638 | ||
| 437 | subgraph Derived Target | 639 | subgraph Triggers |
| 438 | DT[derive_relay_targets fn] | 640 | T1[Self-subscriber batch] |
| 439 | TGT[Per-relay: repos + events we should sync] | 641 | T2[fresh_start after L1] |
| 642 | T3[quick_reconnect after catchup] | ||
| 643 | T4[consolidate after live rebuild] | ||
| 440 | end | 644 | end |
| 441 | 645 | ||
| 442 | subgraph compute_actions - Decision Point | 646 | subgraph compute_actions - Decision Point |
| @@ -449,136 +653,41 @@ flowchart TB | |||
| 449 | 653 | ||
| 450 | subgraph RelaySyncIndex - Confirmed State | 654 | subgraph RelaySyncIndex - Confirmed State |
| 451 | RLI[RelayState per relay] | 655 | RLI[RelayState per relay] |
| 452 | CONN[connection_status] | ||
| 453 | REPOS[repos + root_events] | ||
| 454 | TIMES[last_connected + disconnected_at] | ||
| 455 | end | 656 | end |
| 456 | 657 | ||
| 457 | SS -->|subscribe| OWN | 658 | SS -->|subscribe| OWN |
| 458 | OWN -->|events| SS | 659 | OWN -->|events| SS |
| 459 | SS -->|batch fires| RSI | 660 | SS -->|batch fires| RSI |
| 460 | RSI --> DT | 661 | RSI --> T1 |
| 461 | DT --> TGT | 662 | T1 --> CA |
| 462 | TGT --> CA | 663 | T2 --> CA |
| 664 | T3 --> CA | ||
| 665 | T4 --> CA | ||
| 463 | PSI --> CA | 666 | PSI --> CA |
| 464 | RLI --> CA | 667 | RLI --> CA |
| 465 | CA -->|Layer 2+3 new items| AF[AddFilters] | 668 | CA -->|new items| AF[AddFilters] |
| 466 | AF -->|check filter count| CONSOL{count + new > 70?} | 669 | AF --> SFRE[sync_computed_filters] |
| 467 | CONSOL -->|yes| CONSOLIDATE[consolidate] | 670 | SFRE --> LIVE[sync_live L2+L3] |
| 468 | CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] | 671 | SFRE --> HIST[historic_sync L2+L3] |
| 469 | L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] | 672 | HIST --> PSI |
| 470 | CONSOL -->|no| SUB[subscribe] | 673 | PSI -->|EOSE| RLI |
| 471 | AF -->|spawn if needed| CONN | ||
| 472 | SUB --> PSI | ||
| 473 | PSI -->|EOSE| REPOS | ||
| 474 | |||
| 475 | CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] | ||
| 476 | DISC -->|any reconnect| HC[handle_connect_or_reconnect] | ||
| 477 | |||
| 478 | subgraph handle_connect_or_reconnect | ||
| 479 | HC --> FRESH_CHECK{is_fresh_sync?} | ||
| 480 | FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] | ||
| 481 | FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] | ||
| 482 | L1_FRESH --> RCA1[recompute_actions_for_relay] | ||
| 483 | L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] | ||
| 484 | L23_QUICK --> RCA2[recompute_actions_for_relay] | ||
| 485 | end | ||
| 486 | ``` | 674 | ``` |
| 487 | 675 | ||
| 488 | --- | 676 | --- |
| 489 | 677 | ||
| 490 | ## Key Design Decisions | 678 | ## Key Design Decisions |
| 491 | 679 | ||
| 492 | | Decision | Choice | Rationale | | 680 | | Decision | Choice | Rationale | |
| 493 | | -------------------------- | -------------------------------------- | --------------------------------------------------------------------------- | | 681 | | ----------------------------- | ------------------------------------------- | ------------------------------------------------------------------ | |
| 494 | | Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | | 682 | | Live vs Historic separation | Two distinct primitives | Clear responsibilities, easier reasoning about state | |
| 495 | | Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | | 683 | | Live sync method | `limit: 0` not `since: now` | No clock dependency, deterministic, mirrors filter structure | |
| 496 | | Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | | 684 | | Layer 1 handling | On connect, separate from AddFilters | Connection-level concern, not item-level | |
| 497 | | Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | | 685 | | Layer 2+3 handling | Via compute_actions → sync_computed_filters | Item-level, proper pending tracking | |
| 498 | | Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | | 686 | | Clear PendingSyncIndex | Always first | Old subscriptions are dead, must clear before any operation | |
| 499 | | Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | | 687 | | fresh_start vs long_reconnect | Same flow, different metrics | Reuse logic, distinguish intentional refresh from failure recovery | |
| 500 | | compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | | 688 | | Consolidation | Live only, no historic | Items already synced, just reducing subscription count | |
| 501 | | Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | | 689 | | compute_actions role | ONLY decision point for new work | Single place to reason about what needs syncing | |
| 502 | | Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | | 690 | | NIP-77 negentropy | Try first on full sync, fallback | Efficient for large sets, graceful degradation | |
| 503 | | Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | | ||
| 504 | | 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | | ||
| 505 | | Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | | ||
| 506 | | NIP-77 negentropy | Try first, fallback to REQ | Efficient set reconciliation when supported | | ||
| 507 | |||
| 508 | --- | ||
| 509 | |||
| 510 | ## NIP-77 Negentropy Sync | ||
| 511 | |||
| 512 | The sync system supports NIP-77 negentropy for efficient set reconciliation when syncing with external relays. | ||
| 513 | |||
| 514 | ### What is Negentropy? | ||
| 515 | |||
| 516 | NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences. | ||
| 517 | |||
| 518 | ### When Negentropy is Used | ||
| 519 | |||
| 520 | Negentropy sync is attempted for: | ||
| 521 | |||
| 522 | - **Initial connect** - Fresh sync without `last_connected` | ||
| 523 | - **Daily sync** - Periodic full refresh (23-25 hour timer) | ||
| 524 | - **Stale reconnect** - Disconnected for more than 15 minutes | ||
| 525 | |||
| 526 | Negentropy is NOT used for: | ||
| 527 | |||
| 528 | - **Quick reconnect** - Less than 15 minutes disconnected (uses REQ with `since`) | ||
| 529 | - **Live subscriptions** - Ongoing event streams always use REQ | ||
| 530 | |||
| 531 | ### Implementation | ||
| 532 | |||
| 533 | The [`RelayConnection`](../../src/sync/relay_connection.rs:71) now includes NIP-77 methods: | ||
| 534 | |||
| 535 | ```rust | ||
| 536 | /// Check if negentropy sync should be attempted | ||
| 537 | pub async fn supports_negentropy(&self) -> bool { | ||
| 538 | // Always returns true - we try negentropy and handle failure gracefully | ||
| 539 | true | ||
| 540 | } | ||
| 541 | |||
| 542 | /// Perform negentropy synchronization for a filter | ||
| 543 | pub async fn negentropy_sync_filter(&self, filter: Filter) | ||
| 544 | -> Result<NegentropySyncResult, String> { | ||
| 545 | // Uses nostr-sdk's client.sync() method | ||
| 546 | } | ||
| 547 | ``` | ||
| 548 | |||
| 549 | ### Sync Flow with Negentropy | ||
| 550 | |||
| 551 | ```mermaid | ||
| 552 | flowchart TB | ||
| 553 | CONNECT[Connect to relay] --> NEG{Try negentropy} | ||
| 554 | NEG --> |success| L1[Layer 1 synced via negentropy] | ||
| 555 | NEG --> |failure| FALLBACK[Fall back to REQ+EOSE] | ||
| 556 | |||
| 557 | L1 --> SINCE[Record timestamp = now] | ||
| 558 | FALLBACK --> EOSE[Wait for EOSE] | ||
| 559 | EOSE --> SINCE | ||
| 560 | |||
| 561 | SINCE --> LIVE[Open live REQ with since=now] | ||
| 562 | ``` | ||
| 563 | |||
| 564 | ### Fallback Behavior | ||
| 565 | |||
| 566 | If negentropy fails (relay doesn't support NIP-77, network error, etc.): | ||
| 567 | |||
| 568 | 1. A warning is logged (once per relay to avoid spam) | ||
| 569 | 2. The sync falls back to traditional REQ+EOSE | ||
| 570 | 3. No error is raised - fallback is automatic | ||
| 571 | |||
| 572 | **Implementation:** [`negentropy_sync_and_process()`](../../src/sync/mod.rs:1549) | ||
| 573 | |||
| 574 | ### Key Design Decisions for Negentropy | ||
| 575 | |||
| 576 | | Decision | Choice | Rationale | | ||
| 577 | | ------------------ | --------------------------- | ------------------------------------------------- | | ||
| 578 | | Detection approach | Try and fallback | More reliable than NIP-11 document detection | | ||
| 579 | | When to use | Fresh/daily/stale sync only | Quick reconnect with `since` is already efficient | | ||
| 580 | | Error handling | Log once, fallback silently | Avoid log spam while maintaining visibility | | ||
| 581 | | Layer application | Layer 1 first | Announcements are highest priority | | ||
| 582 | 691 | ||
| 583 | --- | 692 | --- |
| 584 | 693 | ||
| @@ -586,8 +695,8 @@ If negentropy fails (relay doesn't support NIP-77, network error, etc.): | |||
| 586 | 695 | ||
| 587 | ``` | 696 | ``` |
| 588 | src/sync/ | 697 | src/sync/ |
| 589 | ├── mod.rs # SyncManager, main loop, data structures (RepoSyncNeeds, RelayState, etc.) | 698 | ├── mod.rs # SyncManager, main loop, data structures |
| 590 | ├── algorithms.rs # derive_relay_targets(), compute_actions(), AddFilters | 699 | ├── algorithms.rs # derive_relay_targets(), compute_actions() |
| 591 | ├── filters.rs # build_announcement_filter(), build_layer2_and_layer3_filters() | 700 | ├── filters.rs # build_announcement_filter(), build_layer2_and_layer3_filters() |
| 592 | ├── health.rs # RelayHealthTracker with exponential backoff | 701 | ├── health.rs # RelayHealthTracker with exponential backoff |
| 593 | ├── relay_connection.rs # RelayConnection, RelayEvent handling | 702 | ├── relay_connection.rs # RelayConnection, RelayEvent handling |
| @@ -599,7 +708,7 @@ src/sync/ | |||
| 599 | 708 | ||
| 600 | ## Health Tracking | 709 | ## Health Tracking |
| 601 | 710 | ||
| 602 | The [`RelayHealthTracker`](../../src/sync/health.rs:93) manages connection health with exponential backoff: | 711 | The `RelayHealthTracker` manages connection health with exponential backoff: |
| 603 | 712 | ||
| 604 | - **States**: Healthy, Degraded, Dead | 713 | - **States**: Healthy, Degraded, Dead |
| 605 | - **Backoff**: `base * 2^(failures-1)`, capped at max_backoff | 714 | - **Backoff**: `base * 2^(failures-1)`, capped at max_backoff |
| @@ -610,6 +719,32 @@ Bootstrap relays are never disconnected by the cleanup system, even if empty. | |||
| 610 | 719 | ||
| 611 | --- | 720 | --- |
| 612 | 721 | ||
| 722 | ## Self-Subscriber | ||
| 723 | |||
| 724 | The `SelfSubscriber` monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`. | ||
| 725 | |||
| 726 | ### Event Kinds Monitored | ||
| 727 | |||
| 728 | - **30617** - Repository Announcements (triggers discovery of repos listing our relay) | ||
| 729 | - **1617** - Patches (root events referencing repos) | ||
| 730 | - **1618** - Issues | ||
| 731 | - **1619** - Replies/Status | ||
| 732 | - **1621** - Pull Requests | ||
| 733 | |||
| 734 | Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays. | ||
| 735 | |||
| 736 | ### Batching Flow | ||
| 737 | |||
| 738 | 1. **Receive events** from own relay subscription | ||
| 739 | 2. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID | ||
| 740 | 3. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events | ||
| 741 | 4. **Process batch**: | ||
| 742 | - Update `RepoSyncIndex` with discovered repos and root events | ||
| 743 | - Call `compute_actions()` | ||
| 744 | - Send `AddFilters` actions to SyncManager → `sync_computed_filters()` | ||
| 745 | |||
| 746 | --- | ||
| 747 | |||
| 613 | ## Disconnect Handling | 748 | ## Disconnect Handling |
| 614 | 749 | ||
| 615 | The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays: | 750 | The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays: |
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 5b5b520..84248b1 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -11,7 +11,9 @@ use std::collections::{HashMap, HashSet}; | |||
| 11 | 11 | ||
| 12 | use nostr_sdk::prelude::*; | 12 | use nostr_sdk::prelude::*; |
| 13 | 13 | ||
| 14 | use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; | 14 | use crate::sync::PendingItems; |
| 15 | |||
| 16 | use super::{ConnectionStatus, PendingBatch, RelayState}; | ||
| 15 | 17 | ||
| 16 | // ============================================================================= | 18 | // ============================================================================= |
| 17 | // Data Structures | 19 | // Data Structures |
| @@ -36,10 +38,8 @@ pub struct RelaySyncNeeds { | |||
| 36 | pub struct AddFilters { | 38 | pub struct AddFilters { |
| 37 | /// The relay URL to add filters to | 39 | /// The relay URL to add filters to |
| 38 | pub relay_url: String, | 40 | pub relay_url: String, |
| 39 | /// Repos being synced in this action | 41 | /// pending items - repos and root events |
| 40 | pub repos: HashSet<String>, | 42 | pub items: PendingItems, |
| 41 | /// Root events being tracked in this action | ||
| 42 | pub root_events: HashSet<EventId>, | ||
| 43 | /// The actual filters to subscribe with | 43 | /// The actual filters to subscribe with |
| 44 | pub filters: Vec<Filter>, | 44 | pub filters: Vec<Filter>, |
| 45 | } | 45 | } |
| @@ -161,8 +161,10 @@ pub fn compute_actions( | |||
| 161 | 161 | ||
| 162 | actions.push(AddFilters { | 162 | actions.push(AddFilters { |
| 163 | relay_url: relay_url.clone(), | 163 | relay_url: relay_url.clone(), |
| 164 | repos: new_repos, | 164 | items: PendingItems { |
| 165 | root_events: new_events, | 165 | repos: new_repos, |
| 166 | root_events: new_events, | ||
| 167 | }, | ||
| 166 | filters, | 168 | filters, |
| 167 | }); | 169 | }); |
| 168 | } | 170 | } |
| @@ -175,6 +177,7 @@ pub fn compute_actions( | |||
| 175 | mod tests { | 177 | mod tests { |
| 176 | use super::*; | 178 | use super::*; |
| 177 | use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; | 179 | use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; |
| 180 | use crate::sync::SyncMethod; | ||
| 178 | 181 | ||
| 179 | // ========================================================================= | 182 | // ========================================================================= |
| 180 | // derive_relay_targets tests | 183 | // derive_relay_targets tests |
| @@ -371,7 +374,7 @@ mod tests { | |||
| 371 | assert_eq!(actions.len(), 1); | 374 | assert_eq!(actions.len(), 1); |
| 372 | let action = &actions[0]; | 375 | let action = &actions[0]; |
| 373 | assert_eq!(action.relay_url, "wss://relay1.com"); | 376 | assert_eq!(action.relay_url, "wss://relay1.com"); |
| 374 | assert!(action.repos.contains("repo1")); | 377 | assert!(action.items.repos.contains("repo1")); |
| 375 | assert!(!action.filters.is_empty()); | 378 | assert!(!action.filters.is_empty()); |
| 376 | } | 379 | } |
| 377 | 380 | ||
| @@ -528,10 +531,10 @@ mod tests { | |||
| 528 | assert_eq!(actions.len(), 1); | 531 | assert_eq!(actions.len(), 1); |
| 529 | let action = &actions[0]; | 532 | let action = &actions[0]; |
| 530 | // Only repo3 should be in the action (repo1 pending, repo2 confirmed) | 533 | // Only repo3 should be in the action (repo1 pending, repo2 confirmed) |
| 531 | assert_eq!(action.repos.len(), 1); | 534 | assert_eq!(action.items.repos.len(), 1); |
| 532 | assert!(action.repos.contains("repo3")); | 535 | assert!(action.items.repos.contains("repo3")); |
| 533 | assert!(!action.repos.contains("repo1")); | 536 | assert!(!action.items.repos.contains("repo1")); |
| 534 | assert!(!action.repos.contains("repo2")); | 537 | assert!(!action.items.repos.contains("repo2")); |
| 535 | } | 538 | } |
| 536 | 539 | ||
| 537 | #[test] | 540 | #[test] |
| @@ -554,9 +557,9 @@ mod tests { | |||
| 554 | 557 | ||
| 555 | assert_eq!(actions.len(), 1); | 558 | assert_eq!(actions.len(), 1); |
| 556 | let action = &actions[0]; | 559 | let action = &actions[0]; |
| 557 | assert!(action.repos.is_empty()); | 560 | assert!(action.items.repos.is_empty()); |
| 558 | assert_eq!(action.root_events.len(), 1); | 561 | assert_eq!(action.items.root_events.len(), 1); |
| 559 | assert!(action.root_events.contains(&event_id)); | 562 | assert!(action.items.root_events.contains(&event_id)); |
| 560 | // Should have 3 filters for the root event (e, E, q tags) | 563 | // Should have 3 filters for the root event (e, E, q tags) |
| 561 | assert_eq!(action.filters.len(), 3); | 564 | assert_eq!(action.filters.len(), 3); |
| 562 | } | 565 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 41586a4..401cf21 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -591,7 +591,7 @@ impl SyncManager { | |||
| 591 | } | 591 | } |
| 592 | 592 | ||
| 593 | // Recompute actions for Layer 2+3 based on synced events | 593 | // Recompute actions for Layer 2+3 based on synced events |
| 594 | self.recompute_actions_for_relay(relay_url).await; | 594 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 595 | } else { | 595 | } else { |
| 596 | // NIP-77 not supported - fall back to REQ+EOSE | 596 | // NIP-77 not supported - fall back to REQ+EOSE |
| 597 | tracing::info!( | 597 | tracing::info!( |
| @@ -612,7 +612,7 @@ impl SyncManager { | |||
| 612 | } | 612 | } |
| 613 | 613 | ||
| 614 | // Recompute actions for Layer 2+3 - will discover all repos/events again | 614 | // Recompute actions for Layer 2+3 - will discover all repos/events again |
| 615 | self.recompute_actions_for_relay(relay_url).await; | 615 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 616 | } | 616 | } |
| 617 | 617 | ||
| 618 | if let Some(ref metrics) = self.metrics { | 618 | if let Some(ref metrics) = self.metrics { |
| @@ -709,7 +709,7 @@ impl SyncManager { | |||
| 709 | Some(add_filters) => { | 709 | Some(add_filters) => { |
| 710 | // Process AddFilters action directly | 710 | // Process AddFilters action directly |
| 711 | let mut manager = sync_manager.lock().await; | 711 | let mut manager = sync_manager.lock().await; |
| 712 | manager.handle_add_filters(add_filters).await; | 712 | manager.handle_new_sync_filters(add_filters).await; |
| 713 | } | 713 | } |
| 714 | None => break, | 714 | None => break, |
| 715 | } | 715 | } |
| @@ -763,13 +763,13 @@ impl SyncManager { | |||
| 763 | /// - For new relays: creates entry with Connecting status, spawns connection | 763 | /// - For new relays: creates entry with Connecting status, spawns connection |
| 764 | /// - For existing connected relays: subscribes to filters, creates PendingBatch | 764 | /// - For existing connected relays: subscribes to filters, creates PendingBatch |
| 765 | /// - For disconnected/connecting relays: returns (will be handled on connection) | 765 | /// - For disconnected/connecting relays: returns (will be handled on connection) |
| 766 | async fn handle_add_filters(&mut self, action: AddFilters) { | 766 | async fn handle_new_sync_filters(&mut self, action: AddFilters) { |
| 767 | tracing::info!( | 767 | tracing::info!( |
| 768 | relay = %action.relay_url, | 768 | relay = %action.relay_url, |
| 769 | repo_count = action.repos.len(), | 769 | repo_count = action.items.repos.len(), |
| 770 | root_event_count = action.root_events.len(), | 770 | root_event_count = action.items.root_events.len(), |
| 771 | filter_count = action.filters.len(), | 771 | filter_count = action.filters.len(), |
| 772 | "[DIAG] handle_add_filters called" | 772 | "[DIAG] handle_new_sync_filters called" |
| 773 | ); | 773 | ); |
| 774 | 774 | ||
| 775 | // Step 1: Check if relay exists in relay_sync_index | 775 | // Step 1: Check if relay exists in relay_sync_index |
| @@ -801,7 +801,7 @@ impl SyncManager { | |||
| 801 | 801 | ||
| 802 | tracing::info!( | 802 | tracing::info!( |
| 803 | relay = %action.relay_url, | 803 | relay = %action.relay_url, |
| 804 | repos = action.repos.len(), | 804 | repos = action.items.repos.len(), |
| 805 | "Spawning connection for new relay" | 805 | "Spawning connection for new relay" |
| 806 | ); | 806 | ); |
| 807 | 807 | ||
| @@ -827,7 +827,7 @@ impl SyncManager { | |||
| 827 | // Step 2: Check if consolidation is needed BEFORE adding new filters | 827 | // Step 2: Check if consolidation is needed BEFORE adding new filters |
| 828 | self.maybe_consolidate(&action.relay_url, action.filters.len()) | 828 | self.maybe_consolidate(&action.relay_url, action.filters.len()) |
| 829 | .await; | 829 | .await; |
| 830 | 830 | /// DELETE this bit | |
| 831 | // Step 3: Get connection and subscribe to all filters | 831 | // Step 3: Get connection and subscribe to all filters |
| 832 | let connection = match self.connections.get(&action.relay_url) { | 832 | let connection = match self.connections.get(&action.relay_url) { |
| 833 | Some(conn) => conn, | 833 | Some(conn) => conn, |
| @@ -870,8 +870,8 @@ impl SyncManager { | |||
| 870 | let batch = PendingBatch { | 870 | let batch = PendingBatch { |
| 871 | batch_id, | 871 | batch_id, |
| 872 | items: PendingItems { | 872 | items: PendingItems { |
| 873 | repos: action.repos.clone(), | 873 | repos: action.items.repos.clone(), |
| 874 | root_events: action.root_events.clone(), | 874 | root_events: action.items.root_events.clone(), |
| 875 | }, | 875 | }, |
| 876 | outstanding_subs: subscription_ids.into_iter().collect(), | 876 | outstanding_subs: subscription_ids.into_iter().collect(), |
| 877 | sync_method: SyncMethod::ReqEose, | 877 | sync_method: SyncMethod::ReqEose, |
| @@ -889,33 +889,84 @@ impl SyncManager { | |||
| 889 | tracing::debug!( | 889 | tracing::debug!( |
| 890 | relay = %action.relay_url, | 890 | relay = %action.relay_url, |
| 891 | batch_id = batch_id, | 891 | batch_id = batch_id, |
| 892 | repos = action.repos.len(), | 892 | repos = action.items.repos.len(), |
| 893 | root_events = action.root_events.len(), | 893 | root_events = action.items.root_events.len(), |
| 894 | filters = action.filters.len(), | 894 | filters = action.filters.len(), |
| 895 | "Created pending batch for filter subscriptions" | 895 | "Created pending batch for filter subscriptions" |
| 896 | ); | 896 | ); |
| 897 | // REPLACE WITH THIS: | ||
| 898 | // // Subscribe to each filter and collect subscription IDs | ||
| 899 | // self.sync_live(&action.relay_url, &action.filters).await; | ||
| 900 | // // TODO need to do actions.repos | ||
| 901 | // self.historic_sync(&action.relay_url, action.filters, action.items, None) | ||
| 902 | // .await; | ||
| 897 | } | 903 | } |
| 898 | 904 | ||
| 899 | /// Handle a connection success (called when a relay connects or reconnects) | 905 | /// Handle a connection success (called when a relay connects or reconnects) |
| 900 | /// | 906 | /// |
| 901 | /// This method implements smart reconnection logic: | 907 | /// This method dispatches to the appropriate reconnection strategy: |
| 902 | /// - Fresh sync if never connected or >15 min since last connection | 908 | /// - `fresh_start()` if never connected before |
| 903 | /// - Quick reconnect with since filter if <15 min since last connection | 909 | /// - `quick_reconnect()` if disconnected < 15 minutes |
| 904 | /// | 910 | /// - `long_reconnect()` if disconnected > 15 minutes |
| 905 | /// For fresh sync (with NIP-77 negentropy if supported): | ||
| 906 | /// - Clears any stale state | ||
| 907 | /// - Uses negentropy sync for Layer 1 (if NIP-77 supported) | ||
| 908 | /// - Falls back to REQ+EOSE if NIP-77 not supported | ||
| 909 | /// - Recomputes actions for new items | ||
| 910 | /// | ||
| 911 | /// For quick reconnect: | ||
| 912 | /// - Preserves existing state | ||
| 913 | /// - Subscribes to Layer 1 with since filter | ||
| 914 | /// - Rebuilds Layer 2 and Layer 3 with since filter | ||
| 915 | /// - Recomputes actions for new items | ||
| 916 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | 911 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { |
| 917 | let now = Timestamp::now(); | 912 | let now = Timestamp::now(); |
| 918 | 913 | ||
| 914 | // // Get the relay state to determine reconnect type | ||
| 915 | // let (last_connected, disconnected_at) = { | ||
| 916 | // let index = self.relay_sync_index.read().await; | ||
| 917 | // if let Some(state) = index.get(relay_url) { | ||
| 918 | // (state.last_connected, state.disconnected_at) | ||
| 919 | // } else { | ||
| 920 | // (None, None) // No state found | ||
| 921 | // } | ||
| 922 | // }; | ||
| 923 | |||
| 924 | // // Determine which reconnection strategy to use | ||
| 925 | // match (last_connected, disconnected_at) { | ||
| 926 | // (None, _) => { | ||
| 927 | // // Never connected before - fresh start | ||
| 928 | // tracing::info!( | ||
| 929 | // relay = %relay_url, | ||
| 930 | // "First connection - initiating fresh_start" | ||
| 931 | // ); | ||
| 932 | // self.fresh_start(relay_url).await; | ||
| 933 | // } | ||
| 934 | // (Some(last), Some(disconnected)) => { | ||
| 935 | // // Was connected before, check how long disconnected | ||
| 936 | // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs()); | ||
| 937 | |||
| 938 | // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS { | ||
| 939 | // // Disconnected < 15 minutes - quick reconnect | ||
| 940 | // // Use last_connected minus buffer as since timestamp | ||
| 941 | // let since = | ||
| 942 | // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); | ||
| 943 | // tracing::info!( | ||
| 944 | // relay = %relay_url, | ||
| 945 | // disconnect_secs = disconnect_duration, | ||
| 946 | // since = %since, | ||
| 947 | // "Short disconnection - initiating quick_reconnect" | ||
| 948 | // ); | ||
| 949 | // self.quick_reconnect(relay_url, since).await; | ||
| 950 | // } else { | ||
| 951 | // // Disconnected > 15 minutes - long reconnect | ||
| 952 | // tracing::info!( | ||
| 953 | // relay = %relay_url, | ||
| 954 | // disconnect_secs = disconnect_duration, | ||
| 955 | // "Long disconnection - initiating long_reconnect" | ||
| 956 | // ); | ||
| 957 | // self.long_reconnect(relay_url).await; | ||
| 958 | // } | ||
| 959 | // } | ||
| 960 | // (Some(_last), None) => { | ||
| 961 | // // Was connected but no disconnected_at - shouldn't happen normally | ||
| 962 | // // Treat as long reconnect to be safe | ||
| 963 | // tracing::warn!( | ||
| 964 | // relay = %relay_url, | ||
| 965 | // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect" | ||
| 966 | // ); | ||
| 967 | // self.long_reconnect(relay_url).await; | ||
| 968 | // } | ||
| 969 | // } | ||
| 919 | // Get the relay state to determine reconnect type | 970 | // Get the relay state to determine reconnect type |
| 920 | let (is_fresh_sync, last_connected, is_bootstrap) = { | 971 | let (is_fresh_sync, last_connected, is_bootstrap) = { |
| 921 | let index = self.relay_sync_index.read().await; | 972 | let index = self.relay_sync_index.read().await; |
| @@ -998,7 +1049,7 @@ impl SyncManager { | |||
| 998 | 1049 | ||
| 999 | // After negentropy sync, recompute Layer 2+3 actions | 1050 | // After negentropy sync, recompute Layer 2+3 actions |
| 1000 | // Layer 1 events are now in sync, so we can proceed with Layer 2+3 | 1051 | // Layer 1 events are now in sync, so we can proceed with Layer 2+3 |
| 1001 | self.recompute_actions_for_relay(relay_url).await; | 1052 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1002 | 1053 | ||
| 1003 | // Set up live subscription for new events (since=now) | 1054 | // Set up live subscription for new events (since=now) |
| 1004 | let live_filter = filters::build_announcement_filter(Some(now)); | 1055 | let live_filter = filters::build_announcement_filter(Some(now)); |
| @@ -1021,7 +1072,7 @@ impl SyncManager { | |||
| 1021 | // during connect_and_subscribe() in handle_add_filters(). That call subscribes | 1072 | // during connect_and_subscribe() in handle_add_filters(). That call subscribes |
| 1022 | // to kinds 30617+30618 for the full history. Here we only need to recompute | 1073 | // to kinds 30617+30618 for the full history. Here we only need to recompute |
| 1023 | // Layer 2+3 actions based on the repos we're tracking. | 1074 | // Layer 2+3 actions based on the repos we're tracking. |
| 1024 | self.recompute_actions_for_relay(relay_url).await; | 1075 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1025 | } | 1076 | } |
| 1026 | } else { | 1077 | } else { |
| 1027 | // Quick reconnect: use since filter (no negentropy needed) | 1078 | // Quick reconnect: use since filter (no negentropy needed) |
| @@ -1055,7 +1106,7 @@ impl SyncManager { | |||
| 1055 | .await; | 1106 | .await; |
| 1056 | 1107 | ||
| 1057 | // Recompute actions for any new items discovered while disconnected | 1108 | // Recompute actions for any new items discovered while disconnected |
| 1058 | self.recompute_actions_for_relay(relay_url).await; | 1109 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1059 | 1110 | ||
| 1060 | if let Some(ref metrics) = self.metrics { | 1111 | if let Some(ref metrics) = self.metrics { |
| 1061 | metrics.record_event(event_source::RECONNECT); | 1112 | metrics.record_event(event_source::RECONNECT); |
| @@ -1063,6 +1114,225 @@ impl SyncManager { | |||
| 1063 | } | 1114 | } |
| 1064 | } | 1115 | } |
| 1065 | 1116 | ||
| 1117 | /// Fresh start - clears state and does full sync | ||
| 1118 | /// | ||
| 1119 | /// Called by: initial connect, long_reconnect, daily_sync | ||
| 1120 | /// | ||
| 1121 | /// Flow: | ||
| 1122 | /// 1. Clear PendingSyncIndex for this relay | ||
| 1123 | /// 2. Clear RelaySyncIndex sync state (repos/root_events) | ||
| 1124 | /// 3. Update connection state to Connected | ||
| 1125 | /// 4. L1 live + L1 historic (negentropy if available) | ||
| 1126 | /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 | ||
| 1127 | async fn fresh_start(&mut self, relay_url: &str) { | ||
| 1128 | let now = Timestamp::now(); | ||
| 1129 | |||
| 1130 | tracing::info!(relay = %relay_url, "Starting fresh_start"); | ||
| 1131 | |||
| 1132 | // Step 1: Clear PendingSyncIndex for this relay | ||
| 1133 | { | ||
| 1134 | let mut pending = self.pending_sync_index.write().await; | ||
| 1135 | if pending.remove(relay_url).is_some() { | ||
| 1136 | tracing::debug!( | ||
| 1137 | relay = %relay_url, | ||
| 1138 | "Cleared pending batches in fresh_start" | ||
| 1139 | ); | ||
| 1140 | } | ||
| 1141 | } | ||
| 1142 | |||
| 1143 | // Step 2: Clear RelaySyncIndex sync state (but preserve connection metadata) | ||
| 1144 | { | ||
| 1145 | let mut index = self.relay_sync_index.write().await; | ||
| 1146 | if let Some(state) = index.get_mut(relay_url) { | ||
| 1147 | let repos_cleared = state.repos.len(); | ||
| 1148 | let events_cleared = state.root_events.len(); | ||
| 1149 | state.clear_sync_state(); | ||
| 1150 | if repos_cleared > 0 || events_cleared > 0 { | ||
| 1151 | tracing::debug!( | ||
| 1152 | relay = %relay_url, | ||
| 1153 | repos_cleared = repos_cleared, | ||
| 1154 | events_cleared = events_cleared, | ||
| 1155 | "Cleared sync state in fresh_start" | ||
| 1156 | ); | ||
| 1157 | } | ||
| 1158 | } | ||
| 1159 | } | ||
| 1160 | |||
| 1161 | // Step 3: Update connection state | ||
| 1162 | { | ||
| 1163 | let mut index = self.relay_sync_index.write().await; | ||
| 1164 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 1165 | state.connection_status = ConnectionStatus::Connected; | ||
| 1166 | state.last_connected = Some(now); | ||
| 1167 | state.disconnected_at = None; | ||
| 1168 | } | ||
| 1169 | |||
| 1170 | // Record success in health tracker | ||
| 1171 | self.health_tracker.record_success(relay_url); | ||
| 1172 | |||
| 1173 | // Update metrics | ||
| 1174 | if let Some(ref metrics) = self.metrics { | ||
| 1175 | metrics.set_relay_connected(relay_url, true); | ||
| 1176 | metrics.inc_connected_count(); | ||
| 1177 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1178 | } | ||
| 1179 | |||
| 1180 | // Step 4: L1 sync - check negentropy support | ||
| 1181 | let use_negentropy = if self.config.sync_disable_negentropy { | ||
| 1182 | tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); | ||
| 1183 | false | ||
| 1184 | } else if let Some(connection) = self.connections.get(relay_url) { | ||
| 1185 | connection.supports_negentropy().await | ||
| 1186 | } else { | ||
| 1187 | false | ||
| 1188 | }; | ||
| 1189 | |||
| 1190 | if use_negentropy { | ||
| 1191 | // NIP-77 supported - use negentropy for L1 historical sync | ||
| 1192 | tracing::info!( | ||
| 1193 | relay = %relay_url, | ||
| 1194 | "Using NIP-77 negentropy for L1 historical sync" | ||
| 1195 | ); | ||
| 1196 | |||
| 1197 | // L1 historic sync (no since - full sync) | ||
| 1198 | let layer1_filter = filters::build_announcement_filter(None); | ||
| 1199 | self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)") | ||
| 1200 | .await; | ||
| 1201 | |||
| 1202 | // L1 live subscription (since=now for ongoing events) | ||
| 1203 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 1204 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1205 | if let Err(e) = connection.subscribe_filter(live_filter).await { | ||
| 1206 | tracing::error!( | ||
| 1207 | relay = %relay_url, | ||
| 1208 | error = %e, | ||
| 1209 | "Failed to set up L1 live subscription in fresh_start" | ||
| 1210 | ); | ||
| 1211 | } | ||
| 1212 | } | ||
| 1213 | } else { | ||
| 1214 | // NIP-77 not supported - REQ+EOSE | ||
| 1215 | // Note: Layer 1 subscription (without since) was already established | ||
| 1216 | // during connect_and_subscribe() in spawn_relay_connection | ||
| 1217 | tracing::info!( | ||
| 1218 | relay = %relay_url, | ||
| 1219 | "Using REQ+EOSE for L1 sync (negentropy not available)" | ||
| 1220 | ); | ||
| 1221 | } | ||
| 1222 | |||
| 1223 | // Step 5: compute_actions → AddFilters for L2+L3 | ||
| 1224 | // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters | ||
| 1225 | // for ALL repos that should be synced from this relay | ||
| 1226 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1227 | |||
| 1228 | tracing::info!(relay = %relay_url, "fresh_start complete"); | ||
| 1229 | } | ||
| 1230 | |||
| 1231 | /// Quick reconnect - for disconnections < 15 minutes | ||
| 1232 | /// | ||
| 1233 | /// Flow: | ||
| 1234 | /// 1. Clear PendingSyncIndex for this relay | ||
| 1235 | /// 2. Update connection state to Connected | ||
| 1236 | /// 3. L1 live + L1 historic(since) | ||
| 1237 | /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since) | ||
| 1238 | /// 5. compute_actions for any new items discovered during catchup | ||
| 1239 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { | ||
| 1240 | let now = Timestamp::now(); | ||
| 1241 | |||
| 1242 | tracing::info!( | ||
| 1243 | relay = %relay_url, | ||
| 1244 | since = %since, | ||
| 1245 | "Starting quick_reconnect" | ||
| 1246 | ); | ||
| 1247 | |||
| 1248 | // Step 1: Clear PendingSyncIndex for this relay | ||
| 1249 | // Old subscriptions are dead after disconnect | ||
| 1250 | { | ||
| 1251 | let mut pending = self.pending_sync_index.write().await; | ||
| 1252 | if pending.remove(relay_url).is_some() { | ||
| 1253 | tracing::debug!( | ||
| 1254 | relay = %relay_url, | ||
| 1255 | "Cleared pending batches in quick_reconnect" | ||
| 1256 | ); | ||
| 1257 | } | ||
| 1258 | } | ||
| 1259 | |||
| 1260 | // Step 2: Update connection state (preserve repos/root_events - that's the point!) | ||
| 1261 | { | ||
| 1262 | let mut index = self.relay_sync_index.write().await; | ||
| 1263 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 1264 | state.connection_status = ConnectionStatus::Connected; | ||
| 1265 | state.last_connected = Some(now); | ||
| 1266 | state.disconnected_at = None; | ||
| 1267 | } | ||
| 1268 | |||
| 1269 | // Record success in health tracker | ||
| 1270 | self.health_tracker.record_success(relay_url); | ||
| 1271 | |||
| 1272 | // Update metrics | ||
| 1273 | if let Some(ref metrics) = self.metrics { | ||
| 1274 | metrics.set_relay_connected(relay_url, true); | ||
| 1275 | metrics.inc_connected_count(); | ||
| 1276 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1277 | metrics.record_event(event_source::RECONNECT); | ||
| 1278 | } | ||
| 1279 | |||
| 1280 | // Step 3: L1 live + L1 historic with since filter | ||
| 1281 | // L1 live subscription (since=now for ongoing events) | ||
| 1282 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 1283 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1284 | if let Err(e) = connection.subscribe_filter(live_filter).await { | ||
| 1285 | tracing::error!( | ||
| 1286 | relay = %relay_url, | ||
| 1287 | error = %e, | ||
| 1288 | "Failed to set up L1 live subscription in quick_reconnect" | ||
| 1289 | ); | ||
| 1290 | } | ||
| 1291 | } | ||
| 1292 | |||
| 1293 | // L1 historic with since filter (catch up on missed announcements) | ||
| 1294 | let layer1_filter = filters::build_announcement_filter(Some(since)); | ||
| 1295 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1296 | if let Err(e) = connection.subscribe_filter(layer1_filter).await { | ||
| 1297 | tracing::error!( | ||
| 1298 | relay = %relay_url, | ||
| 1299 | error = %e, | ||
| 1300 | "Failed to subscribe to L1 historic filter in quick_reconnect" | ||
| 1301 | ); | ||
| 1302 | } | ||
| 1303 | } | ||
| 1304 | |||
| 1305 | // Step 4: Rebuild L2+L3 from confirmed state with since filter | ||
| 1306 | // This uses the preserved repos/root_events from RelaySyncIndex | ||
| 1307 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | ||
| 1308 | |||
| 1309 | // Step 5: compute_actions for any NEW items discovered while disconnected | ||
| 1310 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1311 | |||
| 1312 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); | ||
| 1313 | } | ||
| 1314 | |||
| 1315 | /// Long reconnect - for disconnections > 15 minutes | ||
| 1316 | /// | ||
| 1317 | /// Flow: | ||
| 1318 | /// 1. Record disconnect/reconnect metric | ||
| 1319 | /// 2. Delegate to fresh_start() | ||
| 1320 | async fn long_reconnect(&mut self, relay_url: &str) { | ||
| 1321 | tracing::info!(relay = %relay_url, "Starting long_reconnect"); | ||
| 1322 | |||
| 1323 | // Step 1: Record disconnect/reconnect metric | ||
| 1324 | // This distinguishes intentional daily refresh from failure recovery | ||
| 1325 | if let Some(ref metrics) = self.metrics { | ||
| 1326 | metrics.record_event(event_source::RECONNECT); | ||
| 1327 | } | ||
| 1328 | |||
| 1329 | // Step 2: Delegate to fresh_start | ||
| 1330 | // State is too stale to trust, start fresh | ||
| 1331 | self.fresh_start(relay_url).await; | ||
| 1332 | |||
| 1333 | tracing::info!(relay = %relay_url, "long_reconnect complete"); | ||
| 1334 | } | ||
| 1335 | |||
| 1066 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay | 1336 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay |
| 1067 | /// | 1337 | /// |
| 1068 | /// Uses the confirmed repos and root_events from RelayState to build filters. | 1338 | /// Uses the confirmed repos and root_events from RelayState to build filters. |
| @@ -1129,7 +1399,7 @@ impl SyncManager { | |||
| 1129 | /// | 1399 | /// |
| 1130 | /// Uses derive_relay_targets and compute_actions to find new items | 1400 | /// Uses derive_relay_targets and compute_actions to find new items |
| 1131 | /// that need to be synced. Processes AddFilters actions for new items. | 1401 | /// that need to be synced. Processes AddFilters actions for new items. |
| 1132 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { | 1402 | async fn recompute_new_sync_filters_for_relay(&mut self, relay_url: &str) { |
| 1133 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | 1403 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; |
| 1134 | 1404 | ||
| 1135 | // Get current state from indexes (need to collect to avoid holding locks) | 1405 | // Get current state from indexes (need to collect to avoid holding locks) |
| @@ -1173,12 +1443,12 @@ impl SyncManager { | |||
| 1173 | for action in actions { | 1443 | for action in actions { |
| 1174 | tracing::info!( | 1444 | tracing::info!( |
| 1175 | relay = %action.relay_url, | 1445 | relay = %action.relay_url, |
| 1176 | new_repos = action.repos.len(), | 1446 | new_repos = action.items.repos.len(), |
| 1177 | new_root_events = action.root_events.len(), | 1447 | new_root_events = action.items.root_events.len(), |
| 1178 | filters = action.filters.len(), | 1448 | filters = action.filters.len(), |
| 1179 | "Processing AddFilters for new items" | 1449 | "Processing AddFilters for new items" |
| 1180 | ); | 1450 | ); |
| 1181 | self.handle_add_filters(action).await; | 1451 | self.handle_new_sync_filters(action).await; |
| 1182 | } | 1452 | } |
| 1183 | } | 1453 | } |
| 1184 | 1454 | ||
| @@ -2095,6 +2365,366 @@ impl SyncManager { | |||
| 2095 | } | 2365 | } |
| 2096 | } | 2366 | } |
| 2097 | 2367 | ||
| 2368 | // ========================================================================= | ||
| 2369 | // Sync Primitives (Phase 1 of GRASP-02 refactoring) | ||
| 2370 | // These methods are new primitives that will be used in subsequent phases. | ||
| 2371 | // ========================================================================= | ||
| 2372 | |||
| 2373 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex | ||
| 2374 | #[allow(dead_code)] // Will be used in Phase 2+ | ||
| 2375 | /// | ||
| 2376 | /// This method subscribes to filters with `limit: 0` for receiving ongoing events. | ||
| 2377 | /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have | ||
| 2378 | /// a definite "completion" - they stay open indefinitely. | ||
| 2379 | /// | ||
| 2380 | /// Used for: | ||
| 2381 | /// - Layer 1 live subscription (new announcements after initial sync) | ||
| 2382 | /// - Layer 2+3 live subscriptions (new events after initial sync) | ||
| 2383 | /// | ||
| 2384 | /// # Arguments | ||
| 2385 | /// * `relay_url` - The relay URL to subscribe on | ||
| 2386 | /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) | ||
| 2387 | /// | ||
| 2388 | /// # Returns | ||
| 2389 | /// Vec of subscription IDs for the live subscriptions, or empty if connection not found | ||
| 2390 | async fn sync_live(&self, relay_url: &str, filters: &[Filter]) -> Vec<SubscriptionId> { | ||
| 2391 | if filters.is_empty() { | ||
| 2392 | return vec![]; | ||
| 2393 | } | ||
| 2394 | |||
| 2395 | let connection = match self.connections.get(relay_url) { | ||
| 2396 | Some(conn) => conn, | ||
| 2397 | None => { | ||
| 2398 | tracing::warn!( | ||
| 2399 | relay = %relay_url, | ||
| 2400 | "No connection found for sync_live" | ||
| 2401 | ); | ||
| 2402 | return vec![]; | ||
| 2403 | } | ||
| 2404 | }; | ||
| 2405 | |||
| 2406 | let mut sub_ids = Vec::new(); | ||
| 2407 | |||
| 2408 | for filter in filters { | ||
| 2409 | // Apply limit: 0 to make this a live subscription | ||
| 2410 | // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", | ||
| 2411 | // but omitting limit means "no limit" which is what we want for live. | ||
| 2412 | // The filter passed in should already NOT have a limit set. | ||
| 2413 | match connection.subscribe_filter(filter.clone()).await { | ||
| 2414 | Ok(sub_id) => { | ||
| 2415 | tracing::trace!( | ||
| 2416 | relay = %relay_url, | ||
| 2417 | sub_id = %sub_id, | ||
| 2418 | "Live subscription created" | ||
| 2419 | ); | ||
| 2420 | sub_ids.push(sub_id); | ||
| 2421 | } | ||
| 2422 | Err(e) => { | ||
| 2423 | tracing::error!( | ||
| 2424 | relay = %relay_url, | ||
| 2425 | error = %e, | ||
| 2426 | "Failed to create live subscription" | ||
| 2427 | ); | ||
| 2428 | } | ||
| 2429 | } | ||
| 2430 | } | ||
| 2431 | |||
| 2432 | tracing::debug!( | ||
| 2433 | relay = %relay_url, | ||
| 2434 | filter_count = filters.len(), | ||
| 2435 | sub_count = sub_ids.len(), | ||
| 2436 | "sync_live completed" | ||
| 2437 | ); | ||
| 2438 | |||
| 2439 | sub_ids | ||
| 2440 | } | ||
| 2441 | |||
| 2442 | /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) | ||
| 2443 | /// | ||
| 2444 | /// Returns raw Vec<Filter> for L1+L2+L3. | ||
| 2445 | /// Used by: quick_reconnect, consolidate | ||
| 2446 | /// Does NOT include pending items - those flow through AddFilters path. | ||
| 2447 | /// | ||
| 2448 | /// # Arguments | ||
| 2449 | /// * `relay_url` - The relay URL to reconstruct filters for | ||
| 2450 | /// | ||
| 2451 | /// # Returns | ||
| 2452 | /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags) | ||
| 2453 | #[allow(dead_code)] // Will be used in Phase 3+ | ||
| 2454 | async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> { | ||
| 2455 | // Get confirmed state from relay_sync_index | ||
| 2456 | let (repos, root_events) = { | ||
| 2457 | let index = self.relay_sync_index.read().await; | ||
| 2458 | match index.get(relay_url) { | ||
| 2459 | Some(state) => (state.repos.clone(), state.root_events.clone()), | ||
| 2460 | None => { | ||
| 2461 | tracing::warn!( | ||
| 2462 | relay = %relay_url, | ||
| 2463 | "No RelayState found for reconstruct_filters" | ||
| 2464 | ); | ||
| 2465 | return vec![]; | ||
| 2466 | } | ||
| 2467 | } | ||
| 2468 | }; | ||
| 2469 | |||
| 2470 | let mut all_filters = Vec::new(); | ||
| 2471 | |||
| 2472 | // Layer 1: Announcements (always included) | ||
| 2473 | // Note: No `since` filter - this returns raw filters for live subscriptions | ||
| 2474 | all_filters.push(filters::build_announcement_filter(None)); | ||
| 2475 | |||
| 2476 | // Layer 2 + Layer 3: Repo and root event tag filters | ||
| 2477 | if !repos.is_empty() || !root_events.is_empty() { | ||
| 2478 | let l2_l3_filters = | ||
| 2479 | filters::build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 2480 | all_filters.extend(l2_l3_filters); | ||
| 2481 | } | ||
| 2482 | |||
| 2483 | tracing::debug!( | ||
| 2484 | relay = %relay_url, | ||
| 2485 | total_filters = all_filters.len(), | ||
| 2486 | repos_count = repos.len(), | ||
| 2487 | root_events_count = root_events.len(), | ||
| 2488 | "Reconstructed filters from confirmed state" | ||
| 2489 | ); | ||
| 2490 | |||
| 2491 | all_filters | ||
| 2492 | } | ||
| 2493 | |||
| 2494 | /// Sync historical events and track in PendingSyncIndex | ||
| 2495 | #[allow(dead_code)] // Will be used in Phase 3+ | ||
| 2496 | /// | ||
| 2497 | /// This method handles historical synchronization for a set of filters, | ||
| 2498 | /// creating a PendingBatch to track completion. It dispatches to either | ||
| 2499 | /// negentropy sync or traditional REQ+EOSE based on relay capability and config. | ||
| 2500 | /// | ||
| 2501 | /// Used for: | ||
| 2502 | /// - Initial sync (no since filter) | ||
| 2503 | /// - Reconnect sync (with since filter) | ||
| 2504 | /// - Daily sync (no since filter, full re-sync) | ||
| 2505 | /// | ||
| 2506 | /// # Arguments | ||
| 2507 | /// * `relay_url` - The relay URL to sync from | ||
| 2508 | /// * `filters` - Filters to sync (will have `since` applied if provided) | ||
| 2509 | /// * `items` - Items being synced (for tracking in PendingBatch) | ||
| 2510 | /// * `since` - Optional timestamp for incremental sync | ||
| 2511 | /// | ||
| 2512 | /// # Returns | ||
| 2513 | /// * `Some(batch_id)` - Batch was created and sync initiated | ||
| 2514 | /// * `None` - No connection or sync failed to start | ||
| 2515 | async fn historic_sync( | ||
| 2516 | &mut self, | ||
| 2517 | relay_url: &str, | ||
| 2518 | filters: Vec<Filter>, | ||
| 2519 | items: PendingItems, | ||
| 2520 | since: Option<Timestamp>, | ||
| 2521 | ) -> Option<u64> { | ||
| 2522 | if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { | ||
| 2523 | tracing::debug!( | ||
| 2524 | relay = %relay_url, | ||
| 2525 | "historic_sync called with empty filters and items, skipping" | ||
| 2526 | ); | ||
| 2527 | return None; | ||
| 2528 | } | ||
| 2529 | |||
| 2530 | // Check connection exists | ||
| 2531 | let connection = match self.connections.get(relay_url) { | ||
| 2532 | Some(conn) => conn, | ||
| 2533 | None => { | ||
| 2534 | tracing::warn!( | ||
| 2535 | relay = %relay_url, | ||
| 2536 | "No connection found for historic_sync" | ||
| 2537 | ); | ||
| 2538 | return None; | ||
| 2539 | } | ||
| 2540 | }; | ||
| 2541 | |||
| 2542 | // Apply since filter if provided | ||
| 2543 | let filters_with_since: Vec<Filter> = if let Some(ts) = since { | ||
| 2544 | filters.into_iter().map(|f| f.since(ts)).collect() | ||
| 2545 | } else { | ||
| 2546 | filters | ||
| 2547 | }; | ||
| 2548 | |||
| 2549 | // Check if we should use negentropy | ||
| 2550 | let use_negentropy = | ||
| 2551 | !self.config.sync_disable_negentropy && connection.supports_negentropy().await; | ||
| 2552 | |||
| 2553 | // Generate batch ID | ||
| 2554 | let batch_id = self.next_batch_id(); | ||
| 2555 | |||
| 2556 | if use_negentropy && !filters_with_since.is_empty() { | ||
| 2557 | // NIP-77 negentropy path | ||
| 2558 | tracing::debug!( | ||
| 2559 | relay = %relay_url, | ||
| 2560 | batch_id = batch_id, | ||
| 2561 | filter_count = filters_with_since.len(), | ||
| 2562 | repos = items.repos.len(), | ||
| 2563 | root_events = items.root_events.len(), | ||
| 2564 | "Starting historic_sync with negentropy" | ||
| 2565 | ); | ||
| 2566 | |||
| 2567 | // Create PendingBatch for negentropy (empty outstanding_subs) | ||
| 2568 | let batch = PendingBatch { | ||
| 2569 | batch_id, | ||
| 2570 | items: items.clone(), | ||
| 2571 | outstanding_subs: HashSet::new(), | ||
| 2572 | sync_method: SyncMethod::Negentropy, | ||
| 2573 | }; | ||
| 2574 | |||
| 2575 | // Add to pending_sync_index | ||
| 2576 | { | ||
| 2577 | let mut pending = self.pending_sync_index.write().await; | ||
| 2578 | pending | ||
| 2579 | .entry(relay_url.to_string()) | ||
| 2580 | .or_insert_with(Vec::new) | ||
| 2581 | .push(batch); | ||
| 2582 | } | ||
| 2583 | |||
| 2584 | // Perform negentropy sync for each filter | ||
| 2585 | // Note: We sync each filter separately because negentropy works on a single filter | ||
| 2586 | let mut total_received = 0; | ||
| 2587 | let mut any_success = false; | ||
| 2588 | |||
| 2589 | for filter in &filters_with_since { | ||
| 2590 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 2591 | match conn.negentropy_sync_filter(filter.clone()).await { | ||
| 2592 | Ok(result) => { | ||
| 2593 | total_received += result.received.len(); | ||
| 2594 | any_success = true; | ||
| 2595 | |||
| 2596 | // Record metrics for received events | ||
| 2597 | if let Some(ref metrics) = self.metrics { | ||
| 2598 | for _ in 0..result.received.len() { | ||
| 2599 | metrics.record_event(event_source::STARTUP); | ||
| 2600 | } | ||
| 2601 | } | ||
| 2602 | } | ||
| 2603 | Err(e) => { | ||
| 2604 | tracing::warn!( | ||
| 2605 | relay = %relay_url, | ||
| 2606 | error = %e, | ||
| 2607 | "Negentropy sync failed for filter in historic_sync" | ||
| 2608 | ); | ||
| 2609 | } | ||
| 2610 | } | ||
| 2611 | } | ||
| 2612 | } | ||
| 2613 | |||
| 2614 | if any_success { | ||
| 2615 | // Remove batch from pending and confirm it | ||
| 2616 | let completed_batch = { | ||
| 2617 | let mut pending = self.pending_sync_index.write().await; | ||
| 2618 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 2619 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); | ||
| 2620 | if let Some(idx) = batch_idx { | ||
| 2621 | let batch = batches.remove(idx); | ||
| 2622 | if batches.is_empty() { | ||
| 2623 | pending.remove(relay_url); | ||
| 2624 | } | ||
| 2625 | Some(batch) | ||
| 2626 | } else { | ||
| 2627 | None | ||
| 2628 | } | ||
| 2629 | } else { | ||
| 2630 | None | ||
| 2631 | } | ||
| 2632 | }; | ||
| 2633 | |||
| 2634 | if let Some(batch) = completed_batch { | ||
| 2635 | self.confirm_batch(relay_url, batch).await; | ||
| 2636 | } | ||
| 2637 | |||
| 2638 | tracing::info!( | ||
| 2639 | relay = %relay_url, | ||
| 2640 | batch_id = batch_id, | ||
| 2641 | total_received = total_received, | ||
| 2642 | "historic_sync (negentropy) completed" | ||
| 2643 | ); | ||
| 2644 | } else { | ||
| 2645 | // All negentropy syncs failed - remove the pending batch | ||
| 2646 | let mut pending = self.pending_sync_index.write().await; | ||
| 2647 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 2648 | batches.retain(|b| b.batch_id != batch_id); | ||
| 2649 | if batches.is_empty() { | ||
| 2650 | pending.remove(relay_url); | ||
| 2651 | } | ||
| 2652 | } | ||
| 2653 | |||
| 2654 | tracing::warn!( | ||
| 2655 | relay = %relay_url, | ||
| 2656 | batch_id = batch_id, | ||
| 2657 | "historic_sync (negentropy) failed for all filters" | ||
| 2658 | ); | ||
| 2659 | return None; | ||
| 2660 | } | ||
| 2661 | } else { | ||
| 2662 | // Traditional REQ+EOSE path | ||
| 2663 | tracing::debug!( | ||
| 2664 | relay = %relay_url, | ||
| 2665 | batch_id = batch_id, | ||
| 2666 | filter_count = filters_with_since.len(), | ||
| 2667 | repos = items.repos.len(), | ||
| 2668 | root_events = items.root_events.len(), | ||
| 2669 | use_negentropy = use_negentropy, | ||
| 2670 | "Starting historic_sync with REQ+EOSE" | ||
| 2671 | ); | ||
| 2672 | |||
| 2673 | // Subscribe to each filter and collect subscription IDs | ||
| 2674 | let mut subscription_ids = HashSet::new(); | ||
| 2675 | |||
| 2676 | for filter in &filters_with_since { | ||
| 2677 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 2678 | match conn.subscribe_filter(filter.clone()).await { | ||
| 2679 | Ok(sub_id) => { | ||
| 2680 | subscription_ids.insert(sub_id); | ||
| 2681 | } | ||
| 2682 | Err(e) => { | ||
| 2683 | tracing::error!( | ||
| 2684 | relay = %relay_url, | ||
| 2685 | error = %e, | ||
| 2686 | "Failed to subscribe to filter in historic_sync" | ||
| 2687 | ); | ||
| 2688 | } | ||
| 2689 | } | ||
| 2690 | } | ||
| 2691 | } | ||
| 2692 | |||
| 2693 | if subscription_ids.is_empty() && !filters_with_since.is_empty() { | ||
| 2694 | tracing::warn!( | ||
| 2695 | relay = %relay_url, | ||
| 2696 | "All filter subscriptions failed in historic_sync" | ||
| 2697 | ); | ||
| 2698 | return None; | ||
| 2699 | } | ||
| 2700 | |||
| 2701 | // Create PendingBatch for REQ+EOSE | ||
| 2702 | let batch = PendingBatch { | ||
| 2703 | batch_id, | ||
| 2704 | items, | ||
| 2705 | outstanding_subs: subscription_ids, | ||
| 2706 | sync_method: SyncMethod::ReqEose, | ||
| 2707 | }; | ||
| 2708 | |||
| 2709 | // Add to pending_sync_index | ||
| 2710 | { | ||
| 2711 | let mut pending = self.pending_sync_index.write().await; | ||
| 2712 | pending | ||
| 2713 | .entry(relay_url.to_string()) | ||
| 2714 | .or_insert_with(Vec::new) | ||
| 2715 | .push(batch); | ||
| 2716 | } | ||
| 2717 | |||
| 2718 | tracing::debug!( | ||
| 2719 | relay = %relay_url, | ||
| 2720 | batch_id = batch_id, | ||
| 2721 | "historic_sync (REQ+EOSE) batch created, awaiting EOSE" | ||
| 2722 | ); | ||
| 2723 | } | ||
| 2724 | |||
| 2725 | Some(batch_id) | ||
| 2726 | } | ||
| 2727 | |||
| 2098 | /// Gracefully shutdown the SyncManager | 2728 | /// Gracefully shutdown the SyncManager |
| 2099 | /// | 2729 | /// |
| 2100 | /// This method: | 2730 | /// This method: |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 0379fe4..9643fc0 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -499,7 +499,7 @@ impl SelfSubscriber { | |||
| 499 | drop(index); // Release lock before async operations | 499 | drop(index); // Release lock before async operations |
| 500 | 500 | ||
| 501 | // For each relay, send AddFilters action directly | 501 | // For each relay, send AddFilters action directly |
| 502 | // SyncManager's handle_add_filters auto-spawns connection for unknown relays | 502 | // SyncManager's handle_new_sync_filters auto-spawns connection for unknown relays |
| 503 | for (relay_url, needs) in targets { | 503 | for (relay_url, needs) in targets { |
| 504 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) | 504 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) |
| 505 | if relay_url.contains(&self.relay_domain) { | 505 | if relay_url.contains(&self.relay_domain) { |
| @@ -519,8 +519,10 @@ impl SelfSubscriber { | |||
| 519 | 519 | ||
| 520 | let action = AddFilters { | 520 | let action = AddFilters { |
| 521 | relay_url: relay_url.clone(), | 521 | relay_url: relay_url.clone(), |
| 522 | repos: needs.repos, | 522 | items: crate::sync::PendingItems { |
| 523 | root_events: needs.root_events, | 523 | repos: needs.repos, |
| 524 | root_events: needs.root_events, | ||
| 525 | }, | ||
| 524 | filters, | 526 | filters, |
| 525 | }; | 527 | }; |
| 526 | 528 | ||