diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 14:18:05 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 14:18:05 +0000 |
| commit | 4e5a937a4ef5288e702ba2bae3daf2a78398b690 (patch) | |
| tree | f05bbd0cc51325e7a18db27271bfc63e6f93a0d4 /docs/explanation/grasp-02-proactive-sync-v4.md | |
| parent | fb8928f626e81f78e13e642009de9a86ea100487 (diff) | |
fix docs
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v4.md')
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync-v4.md | 314 |
1 files changed, 160 insertions, 154 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md index 5ac92cd..dd508b3 100644 --- a/docs/explanation/grasp-02-proactive-sync-v4.md +++ b/docs/explanation/grasp-02-proactive-sync-v4.md | |||
| @@ -64,8 +64,6 @@ pub struct RelayState { | |||
| 64 | pub last_connected: Option<Timestamp>, | 64 | pub last_connected: Option<Timestamp>, |
| 65 | /// When we disconnected - for 15-minute state retention rule | 65 | /// When we disconnected - for 15-minute state retention rule |
| 66 | pub disconnected_at: Option<Timestamp>, | 66 | pub disconnected_at: Option<Timestamp>, |
| 67 | /// The active connection - None if disconnected | ||
| 68 | pub connection: Option<RelayConnection>, | ||
| 69 | } | 67 | } |
| 70 | 68 | ||
| 71 | impl RelayState { | 69 | impl RelayState { |
| @@ -79,7 +77,7 @@ impl RelayState { | |||
| 79 | None => false, // Still connected or never connected | 77 | None => false, // Still connected or never connected |
| 80 | } | 78 | } |
| 81 | } | 79 | } |
| 82 | 80 | ||
| 83 | /// Clear repos and root_events - called when reconnect takes > 15 minutes | 81 | /// Clear repos and root_events - called when reconnect takes > 15 minutes |
| 84 | pub fn clear_sync_state(&mut self) { | 82 | pub fn clear_sync_state(&mut self) { |
| 85 | self.repos.clear(); | 83 | self.repos.clear(); |
| @@ -125,7 +123,7 @@ stateDiagram-v2 | |||
| 125 | Connecting --> Disconnected: failure + record in health tracker | 123 | Connecting --> Disconnected: failure + record in health tracker |
| 126 | Connected --> Disconnected: connection lost | 124 | Connected --> Disconnected: connection lost |
| 127 | Connected --> [*]: intentional disconnect via check_disconnects | 125 | Connected --> [*]: intentional disconnect via check_disconnects |
| 128 | 126 | ||
| 129 | note right of Disconnected: disconnected_at set for 15min rule | 127 | note right of Disconnected: disconnected_at set for 15min rule |
| 130 | note right of Connected: last_connected tracked for since filter | 128 | note right of Connected: last_connected tracked for since filter |
| 131 | ``` | 129 | ``` |
| @@ -148,13 +146,13 @@ flowchart TB | |||
| 148 | SPAWN --> |no| CONN[spawn_connection] | 146 | SPAWN --> |no| CONN[spawn_connection] |
| 149 | CONN --> HC[handle_connect_or_reconnect] | 147 | CONN --> HC[handle_connect_or_reconnect] |
| 150 | SPAWN --> |yes| SUB | 148 | SPAWN --> |yes| SUB |
| 151 | 149 | ||
| 152 | subgraph handle_connect_or_reconnect - Fresh Sync | 150 | subgraph handle_connect_or_reconnect - Fresh Sync |
| 153 | HC --> CHECK_FRESH{is_fresh_sync?} | 151 | HC --> CHECK_FRESH{is_fresh_sync?} |
| 154 | CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] | 152 | CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] |
| 155 | L1 --> RCA[recompute_actions_for_relay] | 153 | L1 --> RCA[recompute_actions_for_relay] |
| 156 | end | 154 | end |
| 157 | 155 | ||
| 158 | RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] | 156 | RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] |
| 159 | SUB --> PB[Create PendingBatch] | 157 | SUB --> PB[Create PendingBatch] |
| 160 | PB --> EOSE[Wait for EOSE] | 158 | PB --> EOSE[Wait for EOSE] |
| @@ -162,6 +160,7 @@ flowchart TB | |||
| 162 | ``` | 160 | ``` |
| 163 | 161 | ||
| 164 | **Key points:** | 162 | **Key points:** |
| 163 | |||
| 165 | - No `since` filter on initial connect - get full history | 164 | - No `since` filter on initial connect - get full history |
| 166 | - `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` | 165 | - `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` |
| 167 | - Layer 1: `build_announcement_filter(None)` - subscribed immediately without since | 166 | - Layer 1: `build_announcement_filter(None)` - subscribed immediately without since |
| @@ -176,7 +175,7 @@ flowchart TB | |||
| 176 | CLEAR_PEND --> WAIT[Wait for reconnection] | 175 | CLEAR_PEND --> WAIT[Wait for reconnection] |
| 177 | WAIT --> RECONN[Connection restored] | 176 | WAIT --> RECONN[Connection restored] |
| 178 | RECONN --> HC[handle_connect_or_reconnect] | 177 | RECONN --> HC[handle_connect_or_reconnect] |
| 179 | 178 | ||
| 180 | subgraph handle_connect_or_reconnect - Quick Reconnect | 179 | subgraph handle_connect_or_reconnect - Quick Reconnect |
| 181 | HC --> CHECK{is_fresh_sync?} | 180 | HC --> CHECK{is_fresh_sync?} |
| 182 | CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] | 181 | CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] |
| @@ -184,7 +183,7 @@ flowchart TB | |||
| 184 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | 183 | L1 --> L23[rebuild_layer2_and_layer3 - with since] |
| 185 | L23 --> RCA[recompute_actions_for_relay] | 184 | L23 --> RCA[recompute_actions_for_relay] |
| 186 | end | 185 | end |
| 187 | 186 | ||
| 188 | RCA --> AF[AddFilters for new items only] | 187 | RCA --> AF[AddFilters for new items only] |
| 189 | AF --> SUB[Subscribe] | 188 | AF --> SUB[Subscribe] |
| 190 | SUB --> PB[Create PendingBatch] | 189 | SUB --> PB[Create PendingBatch] |
| @@ -193,6 +192,7 @@ flowchart TB | |||
| 193 | ``` | 192 | ``` |
| 194 | 193 | ||
| 195 | **Key points:** | 194 | **Key points:** |
| 195 | |||
| 196 | - PendingSyncIndex cleared on disconnect (not reconnect) | 196 | - PendingSyncIndex cleared on disconnect (not reconnect) |
| 197 | - `handle_connect_or_reconnect`: | 197 | - `handle_connect_or_reconnect`: |
| 198 | 1. `build_announcement_filter(Some(since))` - Layer 1 with since | 198 | 1. `build_announcement_filter(Some(since))` - Layer 1 with since |
| @@ -205,14 +205,14 @@ flowchart TB | |||
| 205 | ```mermaid | 205 | ```mermaid |
| 206 | flowchart TB | 206 | flowchart TB |
| 207 | RECONN[Connection restored] --> HC[handle_connect_or_reconnect] | 207 | RECONN[Connection restored] --> HC[handle_connect_or_reconnect] |
| 208 | 208 | ||
| 209 | subgraph handle_connect_or_reconnect - Stale Reconnect | 209 | subgraph handle_connect_or_reconnect - Stale Reconnect |
| 210 | HC --> CHECK{is_fresh_sync?} | 210 | HC --> CHECK{is_fresh_sync?} |
| 211 | CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] | 211 | CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] |
| 212 | CLEAR --> L1[build_announcement_filter - no since] | 212 | CLEAR --> L1[build_announcement_filter - no since] |
| 213 | L1 --> RCA[recompute_actions_for_relay] | 213 | L1 --> RCA[recompute_actions_for_relay] |
| 214 | end | 214 | end |
| 215 | 215 | ||
| 216 | RCA --> CA[compute_actions with empty confirmed] | 216 | RCA --> CA[compute_actions with empty confirmed] |
| 217 | CA --> AF[AddFilters for everything] | 217 | CA --> AF[AddFilters for everything] |
| 218 | AF --> SUB[Subscribe - no since filter] | 218 | AF --> SUB[Subscribe - no since filter] |
| @@ -222,6 +222,7 @@ flowchart TB | |||
| 222 | ``` | 222 | ``` |
| 223 | 223 | ||
| 224 | **Key points:** | 224 | **Key points:** |
| 225 | |||
| 225 | - `should_clear_state()` returns true → triggers fresh sync | 226 | - `should_clear_state()` returns true → triggers fresh sync |
| 226 | - Same path as initial connect after clearing state | 227 | - Same path as initial connect after clearing state |
| 227 | - Layer 1: `build_announcement_filter(None)` - full history | 228 | - Layer 1: `build_announcement_filter(None)` - full history |
| @@ -243,6 +244,7 @@ flowchart TB | |||
| 243 | ``` | 244 | ``` |
| 244 | 245 | ||
| 245 | **Key points:** | 246 | **Key points:** |
| 247 | |||
| 246 | - Consolidation checked in `handle_add_filters` BEFORE adding new filters | 248 | - Consolidation checked in `handle_add_filters` BEFORE adding new filters |
| 247 | - After closing all subscriptions, re-subscribe: | 249 | - After closing all subscriptions, re-subscribe: |
| 248 | 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since | 250 | 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since |
| @@ -268,6 +270,7 @@ flowchart TB | |||
| 268 | ``` | 270 | ``` |
| 269 | 271 | ||
| 270 | **Key points:** | 272 | **Key points:** |
| 273 | |||
| 271 | - Daily timer is a full fresh sync, NOT consolidation | 274 | - Daily timer is a full fresh sync, NOT consolidation |
| 272 | - Clears both PendingSyncIndex and confirmed state | 275 | - Clears both PendingSyncIndex and confirmed state |
| 273 | - Layer 1: `build_announcement_filter(None)` - full history | 276 | - Layer 1: `build_announcement_filter(None)` - full history |
| @@ -288,7 +291,7 @@ fn derive_relay_targets( | |||
| 288 | repo_index: &HashMap<String, RepoSyncNeeds> | 291 | repo_index: &HashMap<String, RepoSyncNeeds> |
| 289 | ) -> HashMap<String, RelaySyncNeeds> { | 292 | ) -> HashMap<String, RelaySyncNeeds> { |
| 290 | let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); | 293 | let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); |
| 291 | 294 | ||
| 292 | for (repo_ref, needs) in repo_index { | 295 | for (repo_ref, needs) in repo_index { |
| 293 | for relay_url in &needs.relays { | 296 | for relay_url in &needs.relays { |
| 294 | let target = targets.entry(relay_url.clone()).or_default(); | 297 | let target = targets.entry(relay_url.clone()).or_default(); |
| @@ -296,7 +299,7 @@ fn derive_relay_targets( | |||
| 296 | target.root_events.extend(needs.root_events.iter().cloned()); | 299 | target.root_events.extend(needs.root_events.iter().cloned()); |
| 297 | } | 300 | } |
| 298 | } | 301 | } |
| 299 | 302 | ||
| 300 | targets | 303 | targets |
| 301 | } | 304 | } |
| 302 | ``` | 305 | ``` |
| @@ -316,7 +319,7 @@ fn compute_actions( | |||
| 316 | confirmed: &HashMap<String, RelayState>, | 319 | confirmed: &HashMap<String, RelayState>, |
| 317 | ) -> Vec<AddFilters> { | 320 | ) -> Vec<AddFilters> { |
| 318 | let mut actions = Vec::new(); | 321 | let mut actions = Vec::new(); |
| 319 | 322 | ||
| 320 | for (relay_url, target) in targets { | 323 | for (relay_url, target) in targets { |
| 321 | // Skip disconnected relays - they will get AddFilters on reconnect | 324 | // Skip disconnected relays - they will get AddFilters on reconnect |
| 322 | if let Some(state) = confirmed.get(relay_url) { | 325 | if let Some(state) = confirmed.get(relay_url) { |
| @@ -324,7 +327,7 @@ fn compute_actions( | |||
| 324 | continue; | 327 | continue; |
| 325 | } | 328 | } |
| 326 | } | 329 | } |
| 327 | 330 | ||
| 328 | // Collect all pending items for this relay | 331 | // Collect all pending items for this relay |
| 329 | let pending_repos: HashSet<_> = pending.get(relay_url) | 332 | let pending_repos: HashSet<_> = pending.get(relay_url) |
| 330 | .map(|batches| batches.iter() | 333 | .map(|batches| batches.iter() |
| @@ -336,7 +339,7 @@ fn compute_actions( | |||
| 336 | .flat_map(|b| b.items.root_events.iter().cloned()) | 339 | .flat_map(|b| b.items.root_events.iter().cloned()) |
| 337 | .collect()) | 340 | .collect()) |
| 338 | .unwrap_or_default(); | 341 | .unwrap_or_default(); |
| 339 | 342 | ||
| 340 | // Collect confirmed items for this relay | 343 | // Collect confirmed items for this relay |
| 341 | let confirmed_repos = confirmed.get(relay_url) | 344 | let confirmed_repos = confirmed.get(relay_url) |
| 342 | .map(|c| &c.repos) | 345 | .map(|c| &c.repos) |
| @@ -344,7 +347,7 @@ fn compute_actions( | |||
| 344 | let confirmed_events = confirmed.get(relay_url) | 347 | let confirmed_events = confirmed.get(relay_url) |
| 345 | .map(|c| &c.root_events) | 348 | .map(|c| &c.root_events) |
| 346 | .unwrap_or(&HashSet::new()); | 349 | .unwrap_or(&HashSet::new()); |
| 347 | 350 | ||
| 348 | // New = target - pending - confirmed | 351 | // New = target - pending - confirmed |
| 349 | let new_repos: HashSet<_> = target.repos.iter() | 352 | let new_repos: HashSet<_> = target.repos.iter() |
| 350 | .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) | 353 | .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) |
| @@ -354,7 +357,7 @@ fn compute_actions( | |||
| 354 | .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) | 357 | .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) |
| 355 | .cloned() | 358 | .cloned() |
| 356 | .collect(); | 359 | .collect(); |
| 357 | 360 | ||
| 358 | if !new_repos.is_empty() || !new_events.is_empty() { | 361 | if !new_repos.is_empty() || !new_events.is_empty() { |
| 359 | let filters = build_filters(&new_repos, &new_events); | 362 | let filters = build_filters(&new_repos, &new_events); |
| 360 | actions.push(AddFilters { | 363 | actions.push(AddFilters { |
| @@ -365,7 +368,7 @@ fn compute_actions( | |||
| 365 | }); | 368 | }); |
| 366 | } | 369 | } |
| 367 | } | 370 | } |
| 368 | 371 | ||
| 369 | actions | 372 | actions |
| 370 | } | 373 | } |
| 371 | ``` | 374 | ``` |
| @@ -373,6 +376,7 @@ fn compute_actions( | |||
| 373 | ### 3. Filter Building Functions (Three-Layer Strategy) | 376 | ### 3. Filter Building Functions (Three-Layer Strategy) |
| 374 | 377 | ||
| 375 | The filter strategy uses three layers: | 378 | The filter strategy uses three layers: |
| 379 | |||
| 376 | - **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation | 380 | - **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation |
| 377 | - **Layer 2**: Events tagging our repos | 381 | - **Layer 2**: Events tagging our repos |
| 378 | - **Layer 3**: Events tagging our root events | 382 | - **Layer 3**: Events tagging our root events |
| @@ -388,7 +392,7 @@ fn build_announcement_filter(since: Option<Timestamp>) -> Filter { | |||
| 388 | Kind::Custom(30617), // Repository announcements | 392 | Kind::Custom(30617), // Repository announcements |
| 389 | Kind::Custom(30618), // Maintainer lists | 393 | Kind::Custom(30618), // Maintainer lists |
| 390 | ]); | 394 | ]); |
| 391 | 395 | ||
| 392 | match since { | 396 | match since { |
| 393 | Some(ts) => filter.since(ts), | 397 | Some(ts) => filter.since(ts), |
| 394 | None => filter, | 398 | None => filter, |
| @@ -404,10 +408,10 @@ fn tagged_one_of_our_repo_event_filters( | |||
| 404 | ) -> Vec<Filter> { | 408 | ) -> Vec<Filter> { |
| 405 | let mut filters = Vec::new(); | 409 | let mut filters = Vec::new(); |
| 406 | let repo_refs: Vec<_> = repos.iter().collect(); | 410 | let repo_refs: Vec<_> = repos.iter().collect(); |
| 407 | 411 | ||
| 408 | for chunk in repo_refs.chunks(100) { | 412 | for chunk in repo_refs.chunks(100) { |
| 409 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); | 413 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); |
| 410 | 414 | ||
| 411 | // Lowercase 'a' tag - standard addressable reference | 415 | // Lowercase 'a' tag - standard addressable reference |
| 412 | let mut f1 = Filter::new() | 416 | let mut f1 = Filter::new() |
| 413 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); | 417 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); |
| @@ -417,18 +421,18 @@ fn tagged_one_of_our_repo_event_filters( | |||
| 417 | // Quote 'q' tag - NIP-10 quote references to addressable events | 421 | // Quote 'q' tag - NIP-10 quote references to addressable events |
| 418 | let mut f3 = Filter::new() | 422 | let mut f3 = Filter::new() |
| 419 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); | 423 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); |
| 420 | 424 | ||
| 421 | if let Some(ts) = since { | 425 | if let Some(ts) = since { |
| 422 | f1 = f1.since(ts); | 426 | f1 = f1.since(ts); |
| 423 | f2 = f2.since(ts); | 427 | f2 = f2.since(ts); |
| 424 | f3 = f3.since(ts); | 428 | f3 = f3.since(ts); |
| 425 | } | 429 | } |
| 426 | 430 | ||
| 427 | filters.push(f1); | 431 | filters.push(f1); |
| 428 | filters.push(f2); | 432 | filters.push(f2); |
| 429 | filters.push(f3); | 433 | filters.push(f3); |
| 430 | } | 434 | } |
| 431 | 435 | ||
| 432 | filters | 436 | filters |
| 433 | } | 437 | } |
| 434 | 438 | ||
| @@ -441,10 +445,10 @@ fn tagged_one_of_our_root_event_filters( | |||
| 441 | ) -> Vec<Filter> { | 445 | ) -> Vec<Filter> { |
| 442 | let mut filters = Vec::new(); | 446 | let mut filters = Vec::new(); |
| 443 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); | 447 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); |
| 444 | 448 | ||
| 445 | for chunk in event_ids.chunks(100) { | 449 | for chunk in event_ids.chunks(100) { |
| 446 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); | 450 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); |
| 447 | 451 | ||
| 448 | // Lowercase 'e' tag - standard event reference | 452 | // Lowercase 'e' tag - standard event reference |
| 449 | let mut f1 = Filter::new() | 453 | let mut f1 = Filter::new() |
| 450 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); | 454 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); |
| @@ -454,18 +458,18 @@ fn tagged_one_of_our_root_event_filters( | |||
| 454 | // Quote 'q' tag - NIP-10 quote references to events | 458 | // Quote 'q' tag - NIP-10 quote references to events |
| 455 | let mut f3 = Filter::new() | 459 | let mut f3 = Filter::new() |
| 456 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); | 460 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); |
| 457 | 461 | ||
| 458 | if let Some(ts) = since { | 462 | if let Some(ts) = since { |
| 459 | f1 = f1.since(ts); | 463 | f1 = f1.since(ts); |
| 460 | f2 = f2.since(ts); | 464 | f2 = f2.since(ts); |
| 461 | f3 = f3.since(ts); | 465 | f3 = f3.since(ts); |
| 462 | } | 466 | } |
| 463 | 467 | ||
| 464 | filters.push(f1); | 468 | filters.push(f1); |
| 465 | filters.push(f2); | 469 | filters.push(f2); |
| 466 | filters.push(f3); | 470 | filters.push(f3); |
| 467 | } | 471 | } |
| 468 | 472 | ||
| 469 | filters | 473 | filters |
| 470 | } | 474 | } |
| 471 | 475 | ||
| @@ -493,7 +497,7 @@ fn build_layer2_and_layer3_filters( | |||
| 493 | impl SyncManager { | 497 | impl SyncManager { |
| 494 | async fn handle_add_filters(&mut self, action: AddFilters) { | 498 | async fn handle_add_filters(&mut self, action: AddFilters) { |
| 495 | let AddFilters { relay_url, repos, root_events, filters } = action; | 499 | let AddFilters { relay_url, repos, root_events, filters } = action; |
| 496 | 500 | ||
| 497 | // Auto-spawn connection if needed | 501 | // Auto-spawn connection if needed |
| 498 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); | 502 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); |
| 499 | match state { | 503 | match state { |
| @@ -522,11 +526,11 @@ impl SyncManager { | |||
| 522 | // Already connected - proceed with subscription | 526 | // Already connected - proceed with subscription |
| 523 | } | 527 | } |
| 524 | } | 528 | } |
| 525 | 529 | ||
| 526 | // Subscribe and collect subscription IDs | 530 | // Subscribe and collect subscription IDs |
| 527 | let conn = self.connections.get(&relay_url).unwrap(); | 531 | let conn = self.connections.get(&relay_url).unwrap(); |
| 528 | let mut sub_ids = HashSet::new(); | 532 | let mut sub_ids = HashSet::new(); |
| 529 | 533 | ||
| 530 | for filter in filters { | 534 | for filter in filters { |
| 531 | match conn.client.subscribe(filter, None).await { | 535 | match conn.client.subscribe(filter, None).await { |
| 532 | Ok(output) => { | 536 | Ok(output) => { |
| @@ -539,14 +543,14 @@ impl SyncManager { | |||
| 539 | } | 543 | } |
| 540 | } | 544 | } |
| 541 | } | 545 | } |
| 542 | 546 | ||
| 543 | // Create pending batch | 547 | // Create pending batch |
| 544 | let batch = PendingBatch { | 548 | let batch = PendingBatch { |
| 545 | batch_id: self.next_batch_id(), | 549 | batch_id: self.next_batch_id(), |
| 546 | items: PendingItems { repos, root_events }, | 550 | items: PendingItems { repos, root_events }, |
| 547 | outstanding_subs: sub_ids, | 551 | outstanding_subs: sub_ids, |
| 548 | }; | 552 | }; |
| 549 | 553 | ||
| 550 | // Add to pending index | 554 | // Add to pending index |
| 551 | self.pending_sync_index.write().await | 555 | self.pending_sync_index.write().await |
| 552 | .entry(relay_url) | 556 | .entry(relay_url) |
| @@ -563,19 +567,19 @@ impl SyncManager { | |||
| 563 | /// Called when connection to a relay is lost | 567 | /// Called when connection to a relay is lost |
| 564 | async fn handle_disconnect(&mut self, relay_url: &str) { | 568 | async fn handle_disconnect(&mut self, relay_url: &str) { |
| 565 | let mut index = self.relay_sync_index.write().await; | 569 | let mut index = self.relay_sync_index.write().await; |
| 566 | 570 | ||
| 567 | if let Some(state) = index.get_mut(relay_url) { | 571 | if let Some(state) = index.get_mut(relay_url) { |
| 568 | state.connection_status = ConnectionStatus::Disconnected; | 572 | state.connection_status = ConnectionStatus::Disconnected; |
| 569 | state.disconnected_at = Some(Timestamp::now()); | 573 | state.disconnected_at = Some(Timestamp::now()); |
| 570 | state.connection = None; | 574 | state.connection = None; |
| 571 | } | 575 | } |
| 572 | 576 | ||
| 573 | // Clear pending batches - these items were not confirmed | 577 | // Clear pending batches - these items were not confirmed |
| 574 | self.pending_sync_index.write().await.remove(relay_url); | 578 | self.pending_sync_index.write().await.remove(relay_url); |
| 575 | 579 | ||
| 576 | // Remove from active connections map | 580 | // Remove from active connections map |
| 577 | self.connections.remove(relay_url); | 581 | self.connections.remove(relay_url); |
| 578 | 582 | ||
| 579 | // Health tracker records failure for backoff | 583 | // Health tracker records failure for backoff |
| 580 | self.health_tracker.record_failure(relay_url); | 584 | self.health_tracker.record_failure(relay_url); |
| 581 | } | 585 | } |
| @@ -599,39 +603,39 @@ impl SyncManager { | |||
| 599 | Some(s) => s, | 603 | Some(s) => s, |
| 600 | None => return, // Relay was removed while disconnected | 604 | None => return, // Relay was removed while disconnected |
| 601 | }; | 605 | }; |
| 602 | 606 | ||
| 603 | // Determine if this is a fresh sync or quick reconnect | 607 | // Determine if this is a fresh sync or quick reconnect |
| 604 | let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); | 608 | let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); |
| 605 | let last_connected = state.last_connected; | 609 | let last_connected = state.last_connected; |
| 606 | 610 | ||
| 607 | if is_fresh_sync && state.last_connected.is_some() { | 611 | if is_fresh_sync && state.last_connected.is_some() { |
| 608 | // Stale reconnect (>15min) - clear state | 612 | // Stale reconnect (>15min) - clear state |
| 609 | tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); | 613 | tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); |
| 610 | state.clear_sync_state(); | 614 | state.clear_sync_state(); |
| 611 | } | 615 | } |
| 612 | 616 | ||
| 613 | // Update connection state | 617 | // Update connection state |
| 614 | state.connection_status = ConnectionStatus::Connected; | 618 | state.connection_status = ConnectionStatus::Connected; |
| 615 | state.last_connected = Some(Timestamp::now()); | 619 | state.last_connected = Some(Timestamp::now()); |
| 616 | state.disconnected_at = None; | 620 | state.disconnected_at = None; |
| 617 | 621 | ||
| 618 | // Record success in health tracker | 622 | // Record success in health tracker |
| 619 | self.health_tracker.record_success(relay_url); | 623 | self.health_tracker.record_success(relay_url); |
| 620 | 624 | ||
| 621 | drop(index); // Release lock | 625 | drop(index); // Release lock |
| 622 | 626 | ||
| 623 | let conn = match self.connections.get(relay_url) { | 627 | let conn = match self.connections.get(relay_url) { |
| 624 | Some(c) => c, | 628 | Some(c) => c, |
| 625 | None => return, | 629 | None => return, |
| 626 | }; | 630 | }; |
| 627 | 631 | ||
| 628 | if is_fresh_sync { | 632 | if is_fresh_sync { |
| 629 | // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions | 633 | // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions |
| 630 | 634 | ||
| 631 | // Step 1: Subscribe Layer 1 (announcements) without since | 635 | // Step 1: Subscribe Layer 1 (announcements) without since |
| 632 | let layer1 = build_announcement_filter(None); | 636 | let layer1 = build_announcement_filter(None); |
| 633 | let _ = conn.client.subscribe(layer1, None).await; | 637 | let _ = conn.client.subscribe(layer1, None).await; |
| 634 | 638 | ||
| 635 | // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) | 639 | // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) |
| 636 | self.recompute_actions_for_relay(relay_url).await; | 640 | self.recompute_actions_for_relay(relay_url).await; |
| 637 | } else { | 641 | } else { |
| @@ -639,19 +643,19 @@ impl SyncManager { | |||
| 639 | let since = last_connected | 643 | let since = last_connected |
| 640 | .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) | 644 | .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) |
| 641 | .unwrap_or(Timestamp::from(0)); | 645 | .unwrap_or(Timestamp::from(0)); |
| 642 | 646 | ||
| 643 | // Step 1: Subscribe Layer 1 (announcements) with since | 647 | // Step 1: Subscribe Layer 1 (announcements) with since |
| 644 | let layer1 = build_announcement_filter(Some(since)); | 648 | let layer1 = build_announcement_filter(Some(since)); |
| 645 | let _ = conn.client.subscribe(layer1, None).await; | 649 | let _ = conn.client.subscribe(layer1, None).await; |
| 646 | 650 | ||
| 647 | // Step 2: Rebuild Layer 2+3 for confirmed items with since | 651 | // Step 2: Rebuild Layer 2+3 for confirmed items with since |
| 648 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | 652 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; |
| 649 | 653 | ||
| 650 | // Step 3: Check for NEW items via compute_actions | 654 | // Step 3: Check for NEW items via compute_actions |
| 651 | self.recompute_actions_for_relay(relay_url).await; | 655 | self.recompute_actions_for_relay(relay_url).await; |
| 652 | } | 656 | } |
| 653 | } | 657 | } |
| 654 | 658 | ||
| 655 | /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). | 659 | /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). |
| 656 | /// Used by: | 660 | /// Used by: |
| 657 | /// - Quick reconnect: rebuild confirmed items with since filter | 661 | /// - Quick reconnect: rebuild confirmed items with since filter |
| @@ -662,46 +666,46 @@ impl SyncManager { | |||
| 662 | Some(s) => s, | 666 | Some(s) => s, |
| 663 | None => return, | 667 | None => return, |
| 664 | }; | 668 | }; |
| 665 | 669 | ||
| 666 | // Build Layer 2+3 filters WITH since | 670 | // Build Layer 2+3 filters WITH since |
| 667 | let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); | 671 | let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); |
| 668 | drop(confirmed); | 672 | drop(confirmed); |
| 669 | 673 | ||
| 670 | // Subscribe directly - no PendingBatch for catch-up (items already confirmed) | 674 | // Subscribe directly - no PendingBatch for catch-up (items already confirmed) |
| 671 | let conn = match self.connections.get(relay_url) { | 675 | let conn = match self.connections.get(relay_url) { |
| 672 | Some(c) => c, | 676 | Some(c) => c, |
| 673 | None => return, | 677 | None => return, |
| 674 | }; | 678 | }; |
| 675 | 679 | ||
| 676 | for filter in filters { | 680 | for filter in filters { |
| 677 | let _ = conn.client.subscribe(filter, None).await; | 681 | let _ = conn.client.subscribe(filter, None).await; |
| 678 | } | 682 | } |
| 679 | } | 683 | } |
| 680 | 684 | ||
| 681 | /// Rerun compute_actions for a specific relay and process resulting AddFilters. | 685 | /// Rerun compute_actions for a specific relay and process resulting AddFilters. |
| 682 | /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. | 686 | /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. |
| 683 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { | 687 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { |
| 684 | let repo_index = self.repo_sync_index.read().await; | 688 | let repo_index = self.repo_sync_index.read().await; |
| 685 | let targets = derive_relay_targets(&repo_index); | 689 | let targets = derive_relay_targets(&repo_index); |
| 686 | drop(repo_index); | 690 | drop(repo_index); |
| 687 | 691 | ||
| 688 | // Filter to just this relay | 692 | // Filter to just this relay |
| 689 | let target = match targets.get(relay_url) { | 693 | let target = match targets.get(relay_url) { |
| 690 | Some(t) => t.clone(), | 694 | Some(t) => t.clone(), |
| 691 | None => return, // No repos reference this relay anymore | 695 | None => return, // No repos reference this relay anymore |
| 692 | }; | 696 | }; |
| 693 | 697 | ||
| 694 | let pending = self.pending_sync_index.read().await; | 698 | let pending = self.pending_sync_index.read().await; |
| 695 | let confirmed = self.relay_sync_index.read().await; | 699 | let confirmed = self.relay_sync_index.read().await; |
| 696 | 700 | ||
| 697 | let mut single_relay_targets = HashMap::new(); | 701 | let mut single_relay_targets = HashMap::new(); |
| 698 | single_relay_targets.insert(relay_url.to_string(), target); | 702 | single_relay_targets.insert(relay_url.to_string(), target); |
| 699 | 703 | ||
| 700 | let actions = compute_actions(&single_relay_targets, &pending, &confirmed); | 704 | let actions = compute_actions(&single_relay_targets, &pending, &confirmed); |
| 701 | 705 | ||
| 702 | drop(pending); | 706 | drop(pending); |
| 703 | drop(confirmed); | 707 | drop(confirmed); |
| 704 | 708 | ||
| 705 | // Process AddFilters | 709 | // Process AddFilters |
| 706 | for action in actions { | 710 | for action in actions { |
| 707 | self.handle_add_filters(action).await; | 711 | self.handle_add_filters(action).await; |
| @@ -719,30 +723,30 @@ impl SyncManager { | |||
| 719 | // Random 23-25 hours | 723 | // Random 23-25 hours |
| 720 | let hours = 23.0 + rand::random::<f64>() * 2.0; | 724 | let hours = 23.0 + rand::random::<f64>() * 2.0; |
| 721 | tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; | 725 | tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; |
| 722 | 726 | ||
| 723 | let relay_urls: Vec<_> = self.relay_sync_index.read().await | 727 | let relay_urls: Vec<_> = self.relay_sync_index.read().await |
| 724 | .keys() | 728 | .keys() |
| 725 | .cloned() | 729 | .cloned() |
| 726 | .collect(); | 730 | .collect(); |
| 727 | 731 | ||
| 728 | for relay_url in relay_urls { | 732 | for relay_url in relay_urls { |
| 729 | self.daily_sync(&relay_url).await; | 733 | self.daily_sync(&relay_url).await; |
| 730 | } | 734 | } |
| 731 | } | 735 | } |
| 732 | } | 736 | } |
| 733 | 737 | ||
| 734 | /// Perform daily fresh sync for a relay | 738 | /// Perform daily fresh sync for a relay |
| 735 | async fn daily_sync(&mut self, relay_url: &str) { | 739 | async fn daily_sync(&mut self, relay_url: &str) { |
| 736 | tracing::info!("Daily sync triggered for {}", relay_url); | 740 | tracing::info!("Daily sync triggered for {}", relay_url); |
| 737 | 741 | ||
| 738 | // Close all subscriptions | 742 | // Close all subscriptions |
| 739 | if let Some(conn) = self.connections.get(relay_url) { | 743 | if let Some(conn) = self.connections.get(relay_url) { |
| 740 | conn.client.unsubscribe_all().await; | 744 | conn.client.unsubscribe_all().await; |
| 741 | } | 745 | } |
| 742 | 746 | ||
| 743 | // Clear PendingSyncIndex | 747 | // Clear PendingSyncIndex |
| 744 | self.pending_sync_index.write().await.remove(relay_url); | 748 | self.pending_sync_index.write().await.remove(relay_url); |
| 745 | 749 | ||
| 746 | // Clear confirmed state - triggers fresh sync | 750 | // Clear confirmed state - triggers fresh sync |
| 747 | { | 751 | { |
| 748 | let mut index = self.relay_sync_index.write().await; | 752 | let mut index = self.relay_sync_index.write().await; |
| @@ -750,7 +754,7 @@ impl SyncManager { | |||
| 750 | state.clear_sync_state(); | 754 | state.clear_sync_state(); |
| 751 | } | 755 | } |
| 752 | } | 756 | } |
| 753 | 757 | ||
| 754 | // Recompute actions - will generate AddFilters for everything | 758 | // Recompute actions - will generate AddFilters for everything |
| 755 | self.recompute_actions_for_relay(relay_url).await; | 759 | self.recompute_actions_for_relay(relay_url).await; |
| 756 | } | 760 | } |
| @@ -767,33 +771,33 @@ impl SyncManager { | |||
| 767 | /// Called from handle_add_filters BEFORE adding new filters. | 771 | /// Called from handle_add_filters BEFORE adding new filters. |
| 768 | async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { | 772 | async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { |
| 769 | let current_count = self.get_filter_count(relay_url).await; | 773 | let current_count = self.get_filter_count(relay_url).await; |
| 770 | 774 | ||
| 771 | if current_count + new_filter_count > 70 { | 775 | if current_count + new_filter_count > 70 { |
| 772 | self.consolidate(relay_url).await; | 776 | self.consolidate(relay_url).await; |
| 773 | } | 777 | } |
| 774 | } | 778 | } |
| 775 | 779 | ||
| 776 | /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. | 780 | /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. |
| 777 | /// Does NOT clear state - just reduces filter count. | 781 | /// Does NOT clear state - just reduces filter count. |
| 778 | async fn consolidate(&mut self, relay_url: &str) { | 782 | async fn consolidate(&mut self, relay_url: &str) { |
| 779 | tracing::info!("Consolidating filters for {} (count > 70)", relay_url); | 783 | tracing::info!("Consolidating filters for {} (count > 70)", relay_url); |
| 780 | 784 | ||
| 781 | // Wait for all pending batches to complete first | 785 | // Wait for all pending batches to complete first |
| 782 | self.wait_pending_complete(relay_url).await; | 786 | self.wait_pending_complete(relay_url).await; |
| 783 | 787 | ||
| 784 | // Close Layer 2+3 subscriptions only - Layer 1 remains active | 788 | // Close Layer 2+3 subscriptions only - Layer 1 remains active |
| 785 | // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately | 789 | // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately |
| 786 | // For simplicity, we close all and re-add Layer 1 | 790 | // For simplicity, we close all and re-add Layer 1 |
| 787 | if let Some(conn) = self.connections.get(relay_url) { | 791 | if let Some(conn) = self.connections.get(relay_url) { |
| 788 | conn.client.unsubscribe_all().await; | 792 | conn.client.unsubscribe_all().await; |
| 789 | } | 793 | } |
| 790 | 794 | ||
| 791 | // Re-subscribe Layer 1 with since (maintains announcements stream) | 795 | // Re-subscribe Layer 1 with since (maintains announcements stream) |
| 792 | let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); | 796 | let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); |
| 793 | let conn = self.connections.get(relay_url).unwrap(); | 797 | let conn = self.connections.get(relay_url).unwrap(); |
| 794 | let layer1 = build_announcement_filter(Some(since)); | 798 | let layer1 = build_announcement_filter(Some(since)); |
| 795 | let _ = conn.client.subscribe(layer1, None).await; | 799 | let _ = conn.client.subscribe(layer1, None).await; |
| 796 | 800 | ||
| 797 | // Rebuild Layer 2+3 only | 801 | // Rebuild Layer 2+3 only |
| 798 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | 802 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; |
| 799 | } | 803 | } |
| @@ -806,7 +810,7 @@ impl SyncManager { | |||
| 806 | impl SyncManager { | 810 | impl SyncManager { |
| 807 | async fn handle_add_filters(&mut self, action: AddFilters) { | 811 | async fn handle_add_filters(&mut self, action: AddFilters) { |
| 808 | let AddFilters { relay_url, repos, root_events, filters } = action; | 812 | let AddFilters { relay_url, repos, root_events, filters } = action; |
| 809 | 813 | ||
| 810 | // Auto-spawn connection if needed (unchanged) | 814 | // Auto-spawn connection if needed (unchanged) |
| 811 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); | 815 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); |
| 812 | match state { | 816 | match state { |
| @@ -834,14 +838,14 @@ impl SyncManager { | |||
| 834 | // Already connected - proceed | 838 | // Already connected - proceed |
| 835 | } | 839 | } |
| 836 | } | 840 | } |
| 837 | 841 | ||
| 838 | // CHECK CONSOLIDATION BEFORE ADDING | 842 | // CHECK CONSOLIDATION BEFORE ADDING |
| 839 | self.maybe_consolidate(&relay_url, filters.len()).await; | 843 | self.maybe_consolidate(&relay_url, filters.len()).await; |
| 840 | 844 | ||
| 841 | // Subscribe and collect subscription IDs | 845 | // Subscribe and collect subscription IDs |
| 842 | let conn = self.connections.get(&relay_url).unwrap(); | 846 | let conn = self.connections.get(&relay_url).unwrap(); |
| 843 | let mut sub_ids = HashSet::new(); | 847 | let mut sub_ids = HashSet::new(); |
| 844 | 848 | ||
| 845 | for filter in filters { | 849 | for filter in filters { |
| 846 | match conn.client.subscribe(filter, None).await { | 850 | match conn.client.subscribe(filter, None).await { |
| 847 | Ok(output) => { | 851 | Ok(output) => { |
| @@ -854,14 +858,14 @@ impl SyncManager { | |||
| 854 | } | 858 | } |
| 855 | } | 859 | } |
| 856 | } | 860 | } |
| 857 | 861 | ||
| 858 | // Create pending batch (unchanged) | 862 | // Create pending batch (unchanged) |
| 859 | let batch = PendingBatch { | 863 | let batch = PendingBatch { |
| 860 | batch_id: self.next_batch_id(), | 864 | batch_id: self.next_batch_id(), |
| 861 | items: PendingItems { repos, root_events }, | 865 | items: PendingItems { repos, root_events }, |
| 862 | outstanding_subs: sub_ids, | 866 | outstanding_subs: sub_ids, |
| 863 | }; | 867 | }; |
| 864 | 868 | ||
| 865 | self.pending_sync_index.write().await | 869 | self.pending_sync_index.write().await |
| 866 | .entry(relay_url) | 870 | .entry(relay_url) |
| 867 | .or_default() | 871 | .or_default() |
| @@ -881,25 +885,25 @@ impl SyncManager { | |||
| 881 | let confirmed = self.relay_sync_index.read().await; | 885 | let confirmed = self.relay_sync_index.read().await; |
| 882 | let relays_to_disconnect: Vec<_> = confirmed.iter() | 886 | let relays_to_disconnect: Vec<_> = confirmed.iter() |
| 883 | .filter(|(_, state)| { | 887 | .filter(|(_, state)| { |
| 884 | !state.is_bootstrap && | 888 | !state.is_bootstrap && |
| 885 | state.repos.is_empty() && | 889 | state.repos.is_empty() && |
| 886 | state.root_events.is_empty() | 890 | state.root_events.is_empty() |
| 887 | }) | 891 | }) |
| 888 | .map(|(url, _)| url.clone()) | 892 | .map(|(url, _)| url.clone()) |
| 889 | .collect(); | 893 | .collect(); |
| 890 | drop(confirmed); | 894 | drop(confirmed); |
| 891 | 895 | ||
| 892 | for relay_url in relays_to_disconnect { | 896 | for relay_url in relays_to_disconnect { |
| 893 | self.disconnect_relay(&relay_url).await; | 897 | self.disconnect_relay(&relay_url).await; |
| 894 | } | 898 | } |
| 895 | } | 899 | } |
| 896 | 900 | ||
| 897 | async fn disconnect_relay(&mut self, relay_url: &str) { | 901 | async fn disconnect_relay(&mut self, relay_url: &str) { |
| 898 | tracing::info!("Disconnecting relay {} (no repos)", relay_url); | 902 | tracing::info!("Disconnecting relay {} (no repos)", relay_url); |
| 899 | 903 | ||
| 900 | self.relay_sync_index.write().await.remove(relay_url); | 904 | self.relay_sync_index.write().await.remove(relay_url); |
| 901 | self.pending_sync_index.write().await.remove(relay_url); | 905 | self.pending_sync_index.write().await.remove(relay_url); |
| 902 | 906 | ||
| 903 | if let Some(conn) = self.connections.remove(relay_url) { | 907 | if let Some(conn) = self.connections.remove(relay_url) { |
| 904 | let _ = conn.client.disconnect().await; | 908 | let _ = conn.client.disconnect().await; |
| 905 | } | 909 | } |
| @@ -917,31 +921,31 @@ flowchart TB | |||
| 917 | SS[SelfSubscriber] | 921 | SS[SelfSubscriber] |
| 918 | OWN[Own Relay] | 922 | OWN[Own Relay] |
| 919 | end | 923 | end |
| 920 | 924 | ||
| 921 | subgraph RepoSyncIndex - What We Want | 925 | subgraph RepoSyncIndex - What We Want |
| 922 | RSI[HashMap: Repo to Relays+Events] | 926 | RSI[HashMap: Repo to Relays+Events] |
| 923 | end | 927 | end |
| 924 | 928 | ||
| 925 | subgraph Derived Target | 929 | subgraph Derived Target |
| 926 | DT[derive_relay_targets fn] | 930 | DT[derive_relay_targets fn] |
| 927 | TGT[Per-relay: repos + events we should sync] | 931 | TGT[Per-relay: repos + events we should sync] |
| 928 | end | 932 | end |
| 929 | 933 | ||
| 930 | subgraph compute_actions - Decision Point | 934 | subgraph compute_actions - Decision Point |
| 931 | CA[Three-way diff: target - pending - confirmed] | 935 | CA[Three-way diff: target - pending - confirmed] |
| 932 | end | 936 | end |
| 933 | 937 | ||
| 934 | subgraph PendingSyncIndex - In Flight | 938 | subgraph PendingSyncIndex - In Flight |
| 935 | PSI[Vec PendingBatch per relay] | 939 | PSI[Vec PendingBatch per relay] |
| 936 | end | 940 | end |
| 937 | 941 | ||
| 938 | subgraph RelaySyncIndex - Confirmed State | 942 | subgraph RelaySyncIndex - Confirmed State |
| 939 | RLI[RelayState per relay] | 943 | RLI[RelayState per relay] |
| 940 | CONN[connection_status] | 944 | CONN[connection_status] |
| 941 | REPOS[repos + root_events] | 945 | REPOS[repos + root_events] |
| 942 | TIMES[last_connected + disconnected_at] | 946 | TIMES[last_connected + disconnected_at] |
| 943 | end | 947 | end |
| 944 | 948 | ||
| 945 | SS -->|subscribe| OWN | 949 | SS -->|subscribe| OWN |
| 946 | OWN -->|events| SS | 950 | OWN -->|events| SS |
| 947 | SS -->|batch fires| RSI | 951 | SS -->|batch fires| RSI |
| @@ -959,10 +963,10 @@ flowchart TB | |||
| 959 | AF -->|spawn if needed| CONN | 963 | AF -->|spawn if needed| CONN |
| 960 | SUB --> PSI | 964 | SUB --> PSI |
| 961 | PSI -->|EOSE| REPOS | 965 | PSI -->|EOSE| REPOS |
| 962 | 966 | ||
| 963 | CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] | 967 | CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] |
| 964 | DISC -->|any reconnect| HC[handle_connect_or_reconnect] | 968 | DISC -->|any reconnect| HC[handle_connect_or_reconnect] |
| 965 | 969 | ||
| 966 | subgraph handle_connect_or_reconnect | 970 | subgraph handle_connect_or_reconnect |
| 967 | HC --> FRESH_CHECK{is_fresh_sync?} | 971 | HC --> FRESH_CHECK{is_fresh_sync?} |
| 968 | FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] | 972 | FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] |
| @@ -977,25 +981,25 @@ flowchart TB | |||
| 977 | 981 | ||
| 978 | ## Key Design Decisions | 982 | ## Key Design Decisions |
| 979 | 983 | ||
| 980 | | Decision | Choice | Rationale | | 984 | | Decision | Choice | Rationale | |
| 981 | |----------|--------|-----------| | 985 | | -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | |
| 982 | | Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | | 986 | | Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | |
| 983 | | Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | | 987 | | Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | |
| 984 | | Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | | 988 | | Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | |
| 985 | | Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | | 989 | | Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | |
| 986 | | Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | | 990 | | Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | |
| 987 | | Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | | 991 | | Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | |
| 988 | | Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | | 992 | | Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | |
| 989 | | compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | | 993 | | compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | |
| 990 | | Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | | 994 | | Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | |
| 991 | | Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | | 995 | | Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | |
| 992 | | Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | | 996 | | Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | |
| 993 | | Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | | 997 | | Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | |
| 994 | | Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | | 998 | | Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | |
| 995 | | 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | | 999 | | 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | |
| 996 | | Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | | 1000 | | Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | |
| 997 | | Connection spawning | Via AddFilters handler | Single path for new relay discovery | | 1001 | | Connection spawning | Via AddFilters handler | Single path for new relay discovery | |
| 998 | | Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | | 1002 | | Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | |
| 999 | 1003 | ||
| 1000 | --- | 1004 | --- |
| 1001 | 1005 | ||
| @@ -1017,16 +1021,16 @@ src/sync/ | |||
| 1017 | 1021 | ||
| 1018 | ## Comparison: v3 vs v4 | 1022 | ## Comparison: v3 vs v4 |
| 1019 | 1023 | ||
| 1020 | | Aspect | v3 | v4 | | 1024 | | Aspect | v3 | v4 | |
| 1021 | |--------|----|----| | 1025 | | ------------------------ | ----------------------------------------- | --------------------------------------------- | |
| 1022 | | Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | | 1026 | | Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | |
| 1023 | | Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | | 1027 | | Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | |
| 1024 | | Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | | 1028 | | Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | |
| 1025 | | Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | | 1029 | | Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | |
| 1026 | | Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | | 1030 | | Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | |
| 1027 | | Since filter application | Applied in handle_reconnect | build_all_filters with optional since | | 1031 | | Since filter application | Applied in handle_reconnect | build_all_filters with optional since | |
| 1028 | | PSI clearing | On disconnect | On disconnect (confirmed) | | 1032 | | PSI clearing | On disconnect | On disconnect (confirmed) | |
| 1029 | | Daily timer | Consolidation-style | Fresh sync (different from consolidation) | | 1033 | | Daily timer | Consolidation-style | Fresh sync (different from consolidation) | |
| 1030 | 1034 | ||
| 1031 | --- | 1035 | --- |
| 1032 | 1036 | ||
| @@ -1065,28 +1069,28 @@ impl SelfSubscriber { | |||
| 1065 | tokio::time::sleep(Duration::from_secs(5)).await; | 1069 | tokio::time::sleep(Duration::from_secs(5)).await; |
| 1066 | continue; | 1070 | continue; |
| 1067 | } | 1071 | } |
| 1068 | 1072 | ||
| 1069 | // Run event loop until disconnection | 1073 | // Run event loop until disconnection |
| 1070 | self.event_loop().await; | 1074 | self.event_loop().await; |
| 1071 | 1075 | ||
| 1072 | // Loop will retry connection | 1076 | // Loop will retry connection |
| 1073 | } | 1077 | } |
| 1074 | } | 1078 | } |
| 1075 | 1079 | ||
| 1076 | async fn connect_and_subscribe(&mut self) -> Result<(), Error> { | 1080 | async fn connect_and_subscribe(&mut self) -> Result<(), Error> { |
| 1077 | let client = Client::new(Keys::generate()); | 1081 | let client = Client::new(Keys::generate()); |
| 1078 | client.add_relay(&self.own_relay_url).await?; | 1082 | client.add_relay(&self.own_relay_url).await?; |
| 1079 | client.connect().await; | 1083 | client.connect().await; |
| 1080 | 1084 | ||
| 1081 | // Build filter - add since only on reconnect | 1085 | // Build filter - add since only on reconnect |
| 1082 | let filter = Filter::new().kinds([ | 1086 | let filter = Filter::new().kinds([ |
| 1083 | Kind::Custom(30617), // Repository announcements | 1087 | Kind::Custom(30617), // Repository announcements |
| 1084 | Kind::GitPatch, // 1617 | 1088 | Kind::GitPatch, // 1617 |
| 1085 | Kind::Custom(1618), // PRs | 1089 | Kind::Custom(1618), // PRs |
| 1086 | Kind::Custom(1619), // PR updates | 1090 | Kind::Custom(1619), // PR updates |
| 1087 | Kind::GitIssue, // 1621 | 1091 | Kind::GitIssue, // 1621 |
| 1088 | ]); | 1092 | ]); |
| 1089 | 1093 | ||
| 1090 | let filter = if let Some(ts) = self.last_connected { | 1094 | let filter = if let Some(ts) = self.last_connected { |
| 1091 | // Reconnection: use since filter | 1095 | // Reconnection: use since filter |
| 1092 | let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer | 1096 | let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer |
| @@ -1095,10 +1099,10 @@ impl SelfSubscriber { | |||
| 1095 | // Initial connect: no since filter - get full history | 1099 | // Initial connect: no since filter - get full history |
| 1096 | filter | 1100 | filter |
| 1097 | }; | 1101 | }; |
| 1098 | 1102 | ||
| 1099 | // Update last_connected AFTER computing since | 1103 | // Update last_connected AFTER computing since |
| 1100 | self.last_connected = Some(Timestamp::now()); | 1104 | self.last_connected = Some(Timestamp::now()); |
| 1101 | 1105 | ||
| 1102 | client.subscribe(filter, None).await?; | 1106 | client.subscribe(filter, None).await?; |
| 1103 | self.client = Some(client); | 1107 | self.client = Some(client); |
| 1104 | Ok(()) | 1108 | Ok(()) |
| @@ -1115,18 +1119,18 @@ impl SelfSubscriber { | |||
| 1115 | let mut pending_events: Vec<Event> = Vec::new(); | 1119 | let mut pending_events: Vec<Event> = Vec::new(); |
| 1116 | let mut batch_timer: Option<Instant> = None; | 1120 | let mut batch_timer: Option<Instant> = None; |
| 1117 | let batch_window = Duration::from_secs(5); | 1121 | let batch_window = Duration::from_secs(5); |
| 1118 | 1122 | ||
| 1119 | loop { | 1123 | loop { |
| 1120 | let timeout = batch_timer | 1124 | let timeout = batch_timer |
| 1121 | .map(|t| batch_window.saturating_sub(t.elapsed())) | 1125 | .map(|t| batch_window.saturating_sub(t.elapsed())) |
| 1122 | .unwrap_or(Duration::from_secs(60)); | 1126 | .unwrap_or(Duration::from_secs(60)); |
| 1123 | 1127 | ||
| 1124 | tokio::select! { | 1128 | tokio::select! { |
| 1125 | notification = client.notifications().recv() => { | 1129 | notification = client.notifications().recv() => { |
| 1126 | match notification { | 1130 | match notification { |
| 1127 | Ok(RelayPoolNotification::Event { event, .. }) => { | 1131 | Ok(RelayPoolNotification::Event { event, .. }) => { |
| 1128 | pending_events.push(*event); | 1132 | pending_events.push(*event); |
| 1129 | 1133 | ||
| 1130 | // Start timer on first event - does NOT reset | 1134 | // Start timer on first event - does NOT reset |
| 1131 | if batch_timer.is_none() { | 1135 | if batch_timer.is_none() { |
| 1132 | batch_timer = Some(Instant::now()); | 1136 | batch_timer = Some(Instant::now()); |
| @@ -1147,7 +1151,7 @@ impl SelfSubscriber { | |||
| 1147 | } | 1151 | } |
| 1148 | } | 1152 | } |
| 1149 | } | 1153 | } |
| 1150 | 1154 | ||
| 1151 | async fn process_batch(&self, events: Vec<Event>) { | 1155 | async fn process_batch(&self, events: Vec<Event>) { |
| 1152 | // 1. Update RepoSyncIndex | 1156 | // 1. Update RepoSyncIndex |
| 1153 | for event in events { | 1157 | for event in events { |
| @@ -1157,26 +1161,26 @@ impl SelfSubscriber { | |||
| 1157 | _ => {} | 1161 | _ => {} |
| 1158 | } | 1162 | } |
| 1159 | } | 1163 | } |
| 1160 | 1164 | ||
| 1161 | // 2. Derive targets and compute actions | 1165 | // 2. Derive targets and compute actions |
| 1162 | let repo_index = self.repo_sync_index.read().await; | 1166 | let repo_index = self.repo_sync_index.read().await; |
| 1163 | let targets = derive_relay_targets(&repo_index); | 1167 | let targets = derive_relay_targets(&repo_index); |
| 1164 | 1168 | ||
| 1165 | let pending = self.pending_sync_index.read().await; | 1169 | let pending = self.pending_sync_index.read().await; |
| 1166 | let confirmed = self.relay_sync_index.read().await; | 1170 | let confirmed = self.relay_sync_index.read().await; |
| 1167 | 1171 | ||
| 1168 | let actions = compute_actions(&targets, &pending, &confirmed); | 1172 | let actions = compute_actions(&targets, &pending, &confirmed); |
| 1169 | 1173 | ||
| 1170 | drop(repo_index); | 1174 | drop(repo_index); |
| 1171 | drop(pending); | 1175 | drop(pending); |
| 1172 | drop(confirmed); | 1176 | drop(confirmed); |
| 1173 | 1177 | ||
| 1174 | // 3. Send actions to SyncManager | 1178 | // 3. Send actions to SyncManager |
| 1175 | for action in actions { | 1179 | for action in actions { |
| 1176 | let _ = self.action_tx.send(action).await; | 1180 | let _ = self.action_tx.send(action).await; |
| 1177 | } | 1181 | } |
| 1178 | } | 1182 | } |
| 1179 | 1183 | ||
| 1180 | async fn handle_announcement(&self, event: &Event) { | 1184 | async fn handle_announcement(&self, event: &Event) { |
| 1181 | // Extract repo_ref from event - 30617:pubkey:identifier | 1185 | // Extract repo_ref from event - 30617:pubkey:identifier |
| 1182 | let d_tag = event.tags.iter() | 1186 | let d_tag = event.tags.iter() |
| @@ -1188,9 +1192,9 @@ impl SelfSubscriber { | |||
| 1188 | } | 1192 | } |
| 1189 | }) | 1193 | }) |
| 1190 | .unwrap_or_default(); | 1194 | .unwrap_or_default(); |
| 1191 | 1195 | ||
| 1192 | let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); | 1196 | let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); |
| 1193 | 1197 | ||
| 1194 | // Extract relay URLs from 'r' tags | 1198 | // Extract relay URLs from 'r' tags |
| 1195 | let relays: HashSet<String> = event.tags.iter() | 1199 | let relays: HashSet<String> = event.tags.iter() |
| 1196 | .filter_map(|tag| { | 1200 | .filter_map(|tag| { |
| @@ -1201,13 +1205,13 @@ impl SelfSubscriber { | |||
| 1201 | } | 1205 | } |
| 1202 | }) | 1206 | }) |
| 1203 | .collect(); | 1207 | .collect(); |
| 1204 | 1208 | ||
| 1205 | // Update RepoSyncIndex | 1209 | // Update RepoSyncIndex |
| 1206 | let mut index = self.repo_sync_index.write().await; | 1210 | let mut index = self.repo_sync_index.write().await; |
| 1207 | let needs = index.entry(repo_ref).or_default(); | 1211 | let needs = index.entry(repo_ref).or_default(); |
| 1208 | needs.relays = relays; | 1212 | needs.relays = relays; |
| 1209 | } | 1213 | } |
| 1210 | 1214 | ||
| 1211 | async fn handle_root_event(&self, event: &Event) { | 1215 | async fn handle_root_event(&self, event: &Event) { |
| 1212 | // Extract repo_ref from 'a' tag | 1216 | // Extract repo_ref from 'a' tag |
| 1213 | let repo_ref = event.tags.iter() | 1217 | let repo_ref = event.tags.iter() |
| @@ -1218,7 +1222,7 @@ impl SelfSubscriber { | |||
| 1218 | None | 1222 | None |
| 1219 | } | 1223 | } |
| 1220 | }); | 1224 | }); |
| 1221 | 1225 | ||
| 1222 | if let Some(repo_ref) = repo_ref { | 1226 | if let Some(repo_ref) = repo_ref { |
| 1223 | let mut index = self.repo_sync_index.write().await; | 1227 | let mut index = self.repo_sync_index.write().await; |
| 1224 | let needs = index.entry(repo_ref).or_default(); | 1228 | let needs = index.entry(repo_ref).or_default(); |
| @@ -1246,6 +1250,7 @@ let sync_manager = Arc::new(Mutex::new(self)); | |||
| 1246 | This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. | 1250 | This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. |
| 1247 | 1251 | ||
| 1248 | **Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: | 1252 | **Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: |
| 1253 | |||
| 1249 | - `DashMap` for thread-safe concurrent access without external locking | 1254 | - `DashMap` for thread-safe concurrent access without external locking |
| 1250 | - Three states: `Healthy`, `Degraded`, `Dead` | 1255 | - Three states: `Healthy`, `Degraded`, `Dead` |
| 1251 | - Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff | 1256 | - Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff |
| @@ -1254,14 +1259,14 @@ This allows background tasks (daily timer, disconnect checker) to acquire the lo | |||
| 1254 | 1259 | ||
| 1255 | ### Implementation Constants | 1260 | ### Implementation Constants |
| 1256 | 1261 | ||
| 1257 | | Constant | Value | Purpose | | 1262 | | Constant | Value | Purpose | |
| 1258 | |----------|-------|---------| | 1263 | | --------------------------------- | ---------- | ------------------------------------------------ | |
| 1259 | | `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | | 1264 | | `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | |
| 1260 | | `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | | 1265 | | `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | |
| 1261 | | `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | | 1266 | | `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | |
| 1262 | | `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | | 1267 | | `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | |
| 1263 | | `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | | 1268 | | `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | |
| 1264 | | `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | | 1269 | | `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | |
| 1265 | 1270 | ||
| 1266 | ### Daily Timer Randomization | 1271 | ### Daily Timer Randomization |
| 1267 | 1272 | ||
| @@ -1308,6 +1313,7 @@ src/sync/ | |||
| 1308 | ``` | 1313 | ``` |
| 1309 | 1314 | ||
| 1310 | Key differences from spec: | 1315 | Key differences from spec: |
| 1316 | |||
| 1311 | - No separate `state.rs` - types are defined in `mod.rs` | 1317 | - No separate `state.rs` - types are defined in `mod.rs` |
| 1312 | - No separate `actions.rs` - moved to `algorithms.rs` | 1318 | - No separate `actions.rs` - moved to `algorithms.rs` |
| 1313 | - No separate `consolidation.rs` - consolidation logic in `mod.rs` | 1319 | - No separate `consolidation.rs` - consolidation logic in `mod.rs` |
| @@ -1321,4 +1327,4 @@ Key differences from spec: | |||
| 1321 | 1327 | ||
| 1322 | 3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. | 1328 | 3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. |
| 1323 | 1329 | ||
| 1324 | 4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. \ No newline at end of file | 1330 | 4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. |