upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync-v4.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v4.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v4.md314
1 files changed, 160 insertions, 154 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md
index 5ac92cd..dd508b3 100644
--- a/docs/explanation/grasp-02-proactive-sync-v4.md
+++ b/docs/explanation/grasp-02-proactive-sync-v4.md
@@ -64,8 +64,6 @@ pub struct RelayState {
64 pub last_connected: Option<Timestamp>, 64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule 65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>, 66 pub disconnected_at: Option<Timestamp>,
67 /// The active connection - None if disconnected
68 pub connection: Option<RelayConnection>,
69} 67}
70 68
71impl RelayState { 69impl RelayState {
@@ -79,7 +77,7 @@ impl RelayState {
79 None => false, // Still connected or never connected 77 None => false, // Still connected or never connected
80 } 78 }
81 } 79 }
82 80
83 /// Clear repos and root_events - called when reconnect takes > 15 minutes 81 /// Clear repos and root_events - called when reconnect takes > 15 minutes
84 pub fn clear_sync_state(&mut self) { 82 pub fn clear_sync_state(&mut self) {
85 self.repos.clear(); 83 self.repos.clear();
@@ -125,7 +123,7 @@ stateDiagram-v2
125 Connecting --> Disconnected: failure + record in health tracker 123 Connecting --> Disconnected: failure + record in health tracker
126 Connected --> Disconnected: connection lost 124 Connected --> Disconnected: connection lost
127 Connected --> [*]: intentional disconnect via check_disconnects 125 Connected --> [*]: intentional disconnect via check_disconnects
128 126
129 note right of Disconnected: disconnected_at set for 15min rule 127 note right of Disconnected: disconnected_at set for 15min rule
130 note right of Connected: last_connected tracked for since filter 128 note right of Connected: last_connected tracked for since filter
131``` 129```
@@ -148,13 +146,13 @@ flowchart TB
148 SPAWN --> |no| CONN[spawn_connection] 146 SPAWN --> |no| CONN[spawn_connection]
149 CONN --> HC[handle_connect_or_reconnect] 147 CONN --> HC[handle_connect_or_reconnect]
150 SPAWN --> |yes| SUB 148 SPAWN --> |yes| SUB
151 149
152 subgraph handle_connect_or_reconnect - Fresh Sync 150 subgraph handle_connect_or_reconnect - Fresh Sync
153 HC --> CHECK_FRESH{is_fresh_sync?} 151 HC --> CHECK_FRESH{is_fresh_sync?}
154 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] 152 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
155 L1 --> RCA[recompute_actions_for_relay] 153 L1 --> RCA[recompute_actions_for_relay]
156 end 154 end
157 155
158 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] 156 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
159 SUB --> PB[Create PendingBatch] 157 SUB --> PB[Create PendingBatch]
160 PB --> EOSE[Wait for EOSE] 158 PB --> EOSE[Wait for EOSE]
@@ -162,6 +160,7 @@ flowchart TB
162``` 160```
163 161
164**Key points:** 162**Key points:**
163
165- No `since` filter on initial connect - get full history 164- No `since` filter on initial connect - get full history
166- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` 165- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
167- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since 166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
@@ -176,7 +175,7 @@ flowchart TB
176 CLEAR_PEND --> WAIT[Wait for reconnection] 175 CLEAR_PEND --> WAIT[Wait for reconnection]
177 WAIT --> RECONN[Connection restored] 176 WAIT --> RECONN[Connection restored]
178 RECONN --> HC[handle_connect_or_reconnect] 177 RECONN --> HC[handle_connect_or_reconnect]
179 178
180 subgraph handle_connect_or_reconnect - Quick Reconnect 179 subgraph handle_connect_or_reconnect - Quick Reconnect
181 HC --> CHECK{is_fresh_sync?} 180 HC --> CHECK{is_fresh_sync?}
182 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] 181 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
@@ -184,7 +183,7 @@ flowchart TB
184 L1 --> L23[rebuild_layer2_and_layer3 - with since] 183 L1 --> L23[rebuild_layer2_and_layer3 - with since]
185 L23 --> RCA[recompute_actions_for_relay] 184 L23 --> RCA[recompute_actions_for_relay]
186 end 185 end
187 186
188 RCA --> AF[AddFilters for new items only] 187 RCA --> AF[AddFilters for new items only]
189 AF --> SUB[Subscribe] 188 AF --> SUB[Subscribe]
190 SUB --> PB[Create PendingBatch] 189 SUB --> PB[Create PendingBatch]
@@ -193,6 +192,7 @@ flowchart TB
193``` 192```
194 193
195**Key points:** 194**Key points:**
195
196- PendingSyncIndex cleared on disconnect (not reconnect) 196- PendingSyncIndex cleared on disconnect (not reconnect)
197- `handle_connect_or_reconnect`: 197- `handle_connect_or_reconnect`:
198 1. `build_announcement_filter(Some(since))` - Layer 1 with since 198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
@@ -205,14 +205,14 @@ flowchart TB
205```mermaid 205```mermaid
206flowchart TB 206flowchart TB
207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect] 207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
208 208
209 subgraph handle_connect_or_reconnect - Stale Reconnect 209 subgraph handle_connect_or_reconnect - Stale Reconnect
210 HC --> CHECK{is_fresh_sync?} 210 HC --> CHECK{is_fresh_sync?}
211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] 211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
212 CLEAR --> L1[build_announcement_filter - no since] 212 CLEAR --> L1[build_announcement_filter - no since]
213 L1 --> RCA[recompute_actions_for_relay] 213 L1 --> RCA[recompute_actions_for_relay]
214 end 214 end
215 215
216 RCA --> CA[compute_actions with empty confirmed] 216 RCA --> CA[compute_actions with empty confirmed]
217 CA --> AF[AddFilters for everything] 217 CA --> AF[AddFilters for everything]
218 AF --> SUB[Subscribe - no since filter] 218 AF --> SUB[Subscribe - no since filter]
@@ -222,6 +222,7 @@ flowchart TB
222``` 222```
223 223
224**Key points:** 224**Key points:**
225
225- `should_clear_state()` returns true → triggers fresh sync 226- `should_clear_state()` returns true → triggers fresh sync
226- Same path as initial connect after clearing state 227- Same path as initial connect after clearing state
227- Layer 1: `build_announcement_filter(None)` - full history 228- Layer 1: `build_announcement_filter(None)` - full history
@@ -243,6 +244,7 @@ flowchart TB
243``` 244```
244 245
245**Key points:** 246**Key points:**
247
246- Consolidation checked in `handle_add_filters` BEFORE adding new filters 248- Consolidation checked in `handle_add_filters` BEFORE adding new filters
247- After closing all subscriptions, re-subscribe: 249- After closing all subscriptions, re-subscribe:
248 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since 250 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
@@ -268,6 +270,7 @@ flowchart TB
268``` 270```
269 271
270**Key points:** 272**Key points:**
273
271- Daily timer is a full fresh sync, NOT consolidation 274- Daily timer is a full fresh sync, NOT consolidation
272- Clears both PendingSyncIndex and confirmed state 275- Clears both PendingSyncIndex and confirmed state
273- Layer 1: `build_announcement_filter(None)` - full history 276- Layer 1: `build_announcement_filter(None)` - full history
@@ -288,7 +291,7 @@ fn derive_relay_targets(
288 repo_index: &HashMap<String, RepoSyncNeeds> 291 repo_index: &HashMap<String, RepoSyncNeeds>
289) -> HashMap<String, RelaySyncNeeds> { 292) -> HashMap<String, RelaySyncNeeds> {
290 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); 293 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
291 294
292 for (repo_ref, needs) in repo_index { 295 for (repo_ref, needs) in repo_index {
293 for relay_url in &needs.relays { 296 for relay_url in &needs.relays {
294 let target = targets.entry(relay_url.clone()).or_default(); 297 let target = targets.entry(relay_url.clone()).or_default();
@@ -296,7 +299,7 @@ fn derive_relay_targets(
296 target.root_events.extend(needs.root_events.iter().cloned()); 299 target.root_events.extend(needs.root_events.iter().cloned());
297 } 300 }
298 } 301 }
299 302
300 targets 303 targets
301} 304}
302``` 305```
@@ -316,7 +319,7 @@ fn compute_actions(
316 confirmed: &HashMap<String, RelayState>, 319 confirmed: &HashMap<String, RelayState>,
317) -> Vec<AddFilters> { 320) -> Vec<AddFilters> {
318 let mut actions = Vec::new(); 321 let mut actions = Vec::new();
319 322
320 for (relay_url, target) in targets { 323 for (relay_url, target) in targets {
321 // Skip disconnected relays - they will get AddFilters on reconnect 324 // Skip disconnected relays - they will get AddFilters on reconnect
322 if let Some(state) = confirmed.get(relay_url) { 325 if let Some(state) = confirmed.get(relay_url) {
@@ -324,7 +327,7 @@ fn compute_actions(
324 continue; 327 continue;
325 } 328 }
326 } 329 }
327 330
328 // Collect all pending items for this relay 331 // Collect all pending items for this relay
329 let pending_repos: HashSet<_> = pending.get(relay_url) 332 let pending_repos: HashSet<_> = pending.get(relay_url)
330 .map(|batches| batches.iter() 333 .map(|batches| batches.iter()
@@ -336,7 +339,7 @@ fn compute_actions(
336 .flat_map(|b| b.items.root_events.iter().cloned()) 339 .flat_map(|b| b.items.root_events.iter().cloned())
337 .collect()) 340 .collect())
338 .unwrap_or_default(); 341 .unwrap_or_default();
339 342
340 // Collect confirmed items for this relay 343 // Collect confirmed items for this relay
341 let confirmed_repos = confirmed.get(relay_url) 344 let confirmed_repos = confirmed.get(relay_url)
342 .map(|c| &c.repos) 345 .map(|c| &c.repos)
@@ -344,7 +347,7 @@ fn compute_actions(
344 let confirmed_events = confirmed.get(relay_url) 347 let confirmed_events = confirmed.get(relay_url)
345 .map(|c| &c.root_events) 348 .map(|c| &c.root_events)
346 .unwrap_or(&HashSet::new()); 349 .unwrap_or(&HashSet::new());
347 350
348 // New = target - pending - confirmed 351 // New = target - pending - confirmed
349 let new_repos: HashSet<_> = target.repos.iter() 352 let new_repos: HashSet<_> = target.repos.iter()
350 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) 353 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
@@ -354,7 +357,7 @@ fn compute_actions(
354 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) 357 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
355 .cloned() 358 .cloned()
356 .collect(); 359 .collect();
357 360
358 if !new_repos.is_empty() || !new_events.is_empty() { 361 if !new_repos.is_empty() || !new_events.is_empty() {
359 let filters = build_filters(&new_repos, &new_events); 362 let filters = build_filters(&new_repos, &new_events);
360 actions.push(AddFilters { 363 actions.push(AddFilters {
@@ -365,7 +368,7 @@ fn compute_actions(
365 }); 368 });
366 } 369 }
367 } 370 }
368 371
369 actions 372 actions
370} 373}
371``` 374```
@@ -373,6 +376,7 @@ fn compute_actions(
373### 3. Filter Building Functions (Three-Layer Strategy) 376### 3. Filter Building Functions (Three-Layer Strategy)
374 377
375The filter strategy uses three layers: 378The filter strategy uses three layers:
379
376- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation 380- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
377- **Layer 2**: Events tagging our repos 381- **Layer 2**: Events tagging our repos
378- **Layer 3**: Events tagging our root events 382- **Layer 3**: Events tagging our root events
@@ -388,7 +392,7 @@ fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
388 Kind::Custom(30617), // Repository announcements 392 Kind::Custom(30617), // Repository announcements
389 Kind::Custom(30618), // Maintainer lists 393 Kind::Custom(30618), // Maintainer lists
390 ]); 394 ]);
391 395
392 match since { 396 match since {
393 Some(ts) => filter.since(ts), 397 Some(ts) => filter.since(ts),
394 None => filter, 398 None => filter,
@@ -404,10 +408,10 @@ fn tagged_one_of_our_repo_event_filters(
404) -> Vec<Filter> { 408) -> Vec<Filter> {
405 let mut filters = Vec::new(); 409 let mut filters = Vec::new();
406 let repo_refs: Vec<_> = repos.iter().collect(); 410 let repo_refs: Vec<_> = repos.iter().collect();
407 411
408 for chunk in repo_refs.chunks(100) { 412 for chunk in repo_refs.chunks(100) {
409 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); 413 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
410 414
411 // Lowercase 'a' tag - standard addressable reference 415 // Lowercase 'a' tag - standard addressable reference
412 let mut f1 = Filter::new() 416 let mut f1 = Filter::new()
413 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); 417 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
@@ -417,18 +421,18 @@ fn tagged_one_of_our_repo_event_filters(
417 // Quote 'q' tag - NIP-10 quote references to addressable events 421 // Quote 'q' tag - NIP-10 quote references to addressable events
418 let mut f3 = Filter::new() 422 let mut f3 = Filter::new()
419 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); 423 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
420 424
421 if let Some(ts) = since { 425 if let Some(ts) = since {
422 f1 = f1.since(ts); 426 f1 = f1.since(ts);
423 f2 = f2.since(ts); 427 f2 = f2.since(ts);
424 f3 = f3.since(ts); 428 f3 = f3.since(ts);
425 } 429 }
426 430
427 filters.push(f1); 431 filters.push(f1);
428 filters.push(f2); 432 filters.push(f2);
429 filters.push(f3); 433 filters.push(f3);
430 } 434 }
431 435
432 filters 436 filters
433} 437}
434 438
@@ -441,10 +445,10 @@ fn tagged_one_of_our_root_event_filters(
441) -> Vec<Filter> { 445) -> Vec<Filter> {
442 let mut filters = Vec::new(); 446 let mut filters = Vec::new();
443 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); 447 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
444 448
445 for chunk in event_ids.chunks(100) { 449 for chunk in event_ids.chunks(100) {
446 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); 450 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
447 451
448 // Lowercase 'e' tag - standard event reference 452 // Lowercase 'e' tag - standard event reference
449 let mut f1 = Filter::new() 453 let mut f1 = Filter::new()
450 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); 454 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
@@ -454,18 +458,18 @@ fn tagged_one_of_our_root_event_filters(
454 // Quote 'q' tag - NIP-10 quote references to events 458 // Quote 'q' tag - NIP-10 quote references to events
455 let mut f3 = Filter::new() 459 let mut f3 = Filter::new()
456 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); 460 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
457 461
458 if let Some(ts) = since { 462 if let Some(ts) = since {
459 f1 = f1.since(ts); 463 f1 = f1.since(ts);
460 f2 = f2.since(ts); 464 f2 = f2.since(ts);
461 f3 = f3.since(ts); 465 f3 = f3.since(ts);
462 } 466 }
463 467
464 filters.push(f1); 468 filters.push(f1);
465 filters.push(f2); 469 filters.push(f2);
466 filters.push(f3); 470 filters.push(f3);
467 } 471 }
468 472
469 filters 473 filters
470} 474}
471 475
@@ -493,7 +497,7 @@ fn build_layer2_and_layer3_filters(
493impl SyncManager { 497impl SyncManager {
494 async fn handle_add_filters(&mut self, action: AddFilters) { 498 async fn handle_add_filters(&mut self, action: AddFilters) {
495 let AddFilters { relay_url, repos, root_events, filters } = action; 499 let AddFilters { relay_url, repos, root_events, filters } = action;
496 500
497 // Auto-spawn connection if needed 501 // Auto-spawn connection if needed
498 let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); 502 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
499 match state { 503 match state {
@@ -522,11 +526,11 @@ impl SyncManager {
522 // Already connected - proceed with subscription 526 // Already connected - proceed with subscription
523 } 527 }
524 } 528 }
525 529
526 // Subscribe and collect subscription IDs 530 // Subscribe and collect subscription IDs
527 let conn = self.connections.get(&relay_url).unwrap(); 531 let conn = self.connections.get(&relay_url).unwrap();
528 let mut sub_ids = HashSet::new(); 532 let mut sub_ids = HashSet::new();
529 533
530 for filter in filters { 534 for filter in filters {
531 match conn.client.subscribe(filter, None).await { 535 match conn.client.subscribe(filter, None).await {
532 Ok(output) => { 536 Ok(output) => {
@@ -539,14 +543,14 @@ impl SyncManager {
539 } 543 }
540 } 544 }
541 } 545 }
542 546
543 // Create pending batch 547 // Create pending batch
544 let batch = PendingBatch { 548 let batch = PendingBatch {
545 batch_id: self.next_batch_id(), 549 batch_id: self.next_batch_id(),
546 items: PendingItems { repos, root_events }, 550 items: PendingItems { repos, root_events },
547 outstanding_subs: sub_ids, 551 outstanding_subs: sub_ids,
548 }; 552 };
549 553
550 // Add to pending index 554 // Add to pending index
551 self.pending_sync_index.write().await 555 self.pending_sync_index.write().await
552 .entry(relay_url) 556 .entry(relay_url)
@@ -563,19 +567,19 @@ impl SyncManager {
563 /// Called when connection to a relay is lost 567 /// Called when connection to a relay is lost
564 async fn handle_disconnect(&mut self, relay_url: &str) { 568 async fn handle_disconnect(&mut self, relay_url: &str) {
565 let mut index = self.relay_sync_index.write().await; 569 let mut index = self.relay_sync_index.write().await;
566 570
567 if let Some(state) = index.get_mut(relay_url) { 571 if let Some(state) = index.get_mut(relay_url) {
568 state.connection_status = ConnectionStatus::Disconnected; 572 state.connection_status = ConnectionStatus::Disconnected;
569 state.disconnected_at = Some(Timestamp::now()); 573 state.disconnected_at = Some(Timestamp::now());
570 state.connection = None; 574 state.connection = None;
571 } 575 }
572 576
573 // Clear pending batches - these items were not confirmed 577 // Clear pending batches - these items were not confirmed
574 self.pending_sync_index.write().await.remove(relay_url); 578 self.pending_sync_index.write().await.remove(relay_url);
575 579
576 // Remove from active connections map 580 // Remove from active connections map
577 self.connections.remove(relay_url); 581 self.connections.remove(relay_url);
578 582
579 // Health tracker records failure for backoff 583 // Health tracker records failure for backoff
580 self.health_tracker.record_failure(relay_url); 584 self.health_tracker.record_failure(relay_url);
581 } 585 }
@@ -599,39 +603,39 @@ impl SyncManager {
599 Some(s) => s, 603 Some(s) => s,
600 None => return, // Relay was removed while disconnected 604 None => return, // Relay was removed while disconnected
601 }; 605 };
602 606
603 // Determine if this is a fresh sync or quick reconnect 607 // Determine if this is a fresh sync or quick reconnect
604 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); 608 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
605 let last_connected = state.last_connected; 609 let last_connected = state.last_connected;
606 610
607 if is_fresh_sync && state.last_connected.is_some() { 611 if is_fresh_sync && state.last_connected.is_some() {
608 // Stale reconnect (>15min) - clear state 612 // Stale reconnect (>15min) - clear state
609 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); 613 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
610 state.clear_sync_state(); 614 state.clear_sync_state();
611 } 615 }
612 616
613 // Update connection state 617 // Update connection state
614 state.connection_status = ConnectionStatus::Connected; 618 state.connection_status = ConnectionStatus::Connected;
615 state.last_connected = Some(Timestamp::now()); 619 state.last_connected = Some(Timestamp::now());
616 state.disconnected_at = None; 620 state.disconnected_at = None;
617 621
618 // Record success in health tracker 622 // Record success in health tracker
619 self.health_tracker.record_success(relay_url); 623 self.health_tracker.record_success(relay_url);
620 624
621 drop(index); // Release lock 625 drop(index); // Release lock
622 626
623 let conn = match self.connections.get(relay_url) { 627 let conn = match self.connections.get(relay_url) {
624 Some(c) => c, 628 Some(c) => c,
625 None => return, 629 None => return,
626 }; 630 };
627 631
628 if is_fresh_sync { 632 if is_fresh_sync {
629 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions 633 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
630 634
631 // Step 1: Subscribe Layer 1 (announcements) without since 635 // Step 1: Subscribe Layer 1 (announcements) without since
632 let layer1 = build_announcement_filter(None); 636 let layer1 = build_announcement_filter(None);
633 let _ = conn.client.subscribe(layer1, None).await; 637 let _ = conn.client.subscribe(layer1, None).await;
634 638
635 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) 639 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
636 self.recompute_actions_for_relay(relay_url).await; 640 self.recompute_actions_for_relay(relay_url).await;
637 } else { 641 } else {
@@ -639,19 +643,19 @@ impl SyncManager {
639 let since = last_connected 643 let since = last_connected
640 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) 644 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
641 .unwrap_or(Timestamp::from(0)); 645 .unwrap_or(Timestamp::from(0));
642 646
643 // Step 1: Subscribe Layer 1 (announcements) with since 647 // Step 1: Subscribe Layer 1 (announcements) with since
644 let layer1 = build_announcement_filter(Some(since)); 648 let layer1 = build_announcement_filter(Some(since));
645 let _ = conn.client.subscribe(layer1, None).await; 649 let _ = conn.client.subscribe(layer1, None).await;
646 650
647 // Step 2: Rebuild Layer 2+3 for confirmed items with since 651 // Step 2: Rebuild Layer 2+3 for confirmed items with since
648 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; 652 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
649 653
650 // Step 3: Check for NEW items via compute_actions 654 // Step 3: Check for NEW items via compute_actions
651 self.recompute_actions_for_relay(relay_url).await; 655 self.recompute_actions_for_relay(relay_url).await;
652 } 656 }
653 } 657 }
654 658
655 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). 659 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
656 /// Used by: 660 /// Used by:
657 /// - Quick reconnect: rebuild confirmed items with since filter 661 /// - Quick reconnect: rebuild confirmed items with since filter
@@ -662,46 +666,46 @@ impl SyncManager {
662 Some(s) => s, 666 Some(s) => s,
663 None => return, 667 None => return,
664 }; 668 };
665 669
666 // Build Layer 2+3 filters WITH since 670 // Build Layer 2+3 filters WITH since
667 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); 671 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
668 drop(confirmed); 672 drop(confirmed);
669 673
670 // Subscribe directly - no PendingBatch for catch-up (items already confirmed) 674 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
671 let conn = match self.connections.get(relay_url) { 675 let conn = match self.connections.get(relay_url) {
672 Some(c) => c, 676 Some(c) => c,
673 None => return, 677 None => return,
674 }; 678 };
675 679
676 for filter in filters { 680 for filter in filters {
677 let _ = conn.client.subscribe(filter, None).await; 681 let _ = conn.client.subscribe(filter, None).await;
678 } 682 }
679 } 683 }
680 684
681 /// Rerun compute_actions for a specific relay and process resulting AddFilters. 685 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
682 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. 686 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
683 async fn recompute_actions_for_relay(&mut self, relay_url: &str) { 687 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
684 let repo_index = self.repo_sync_index.read().await; 688 let repo_index = self.repo_sync_index.read().await;
685 let targets = derive_relay_targets(&repo_index); 689 let targets = derive_relay_targets(&repo_index);
686 drop(repo_index); 690 drop(repo_index);
687 691
688 // Filter to just this relay 692 // Filter to just this relay
689 let target = match targets.get(relay_url) { 693 let target = match targets.get(relay_url) {
690 Some(t) => t.clone(), 694 Some(t) => t.clone(),
691 None => return, // No repos reference this relay anymore 695 None => return, // No repos reference this relay anymore
692 }; 696 };
693 697
694 let pending = self.pending_sync_index.read().await; 698 let pending = self.pending_sync_index.read().await;
695 let confirmed = self.relay_sync_index.read().await; 699 let confirmed = self.relay_sync_index.read().await;
696 700
697 let mut single_relay_targets = HashMap::new(); 701 let mut single_relay_targets = HashMap::new();
698 single_relay_targets.insert(relay_url.to_string(), target); 702 single_relay_targets.insert(relay_url.to_string(), target);
699 703
700 let actions = compute_actions(&single_relay_targets, &pending, &confirmed); 704 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
701 705
702 drop(pending); 706 drop(pending);
703 drop(confirmed); 707 drop(confirmed);
704 708
705 // Process AddFilters 709 // Process AddFilters
706 for action in actions { 710 for action in actions {
707 self.handle_add_filters(action).await; 711 self.handle_add_filters(action).await;
@@ -719,30 +723,30 @@ impl SyncManager {
719 // Random 23-25 hours 723 // Random 23-25 hours
720 let hours = 23.0 + rand::random::<f64>() * 2.0; 724 let hours = 23.0 + rand::random::<f64>() * 2.0;
721 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; 725 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
722 726
723 let relay_urls: Vec<_> = self.relay_sync_index.read().await 727 let relay_urls: Vec<_> = self.relay_sync_index.read().await
724 .keys() 728 .keys()
725 .cloned() 729 .cloned()
726 .collect(); 730 .collect();
727 731
728 for relay_url in relay_urls { 732 for relay_url in relay_urls {
729 self.daily_sync(&relay_url).await; 733 self.daily_sync(&relay_url).await;
730 } 734 }
731 } 735 }
732 } 736 }
733 737
734 /// Perform daily fresh sync for a relay 738 /// Perform daily fresh sync for a relay
735 async fn daily_sync(&mut self, relay_url: &str) { 739 async fn daily_sync(&mut self, relay_url: &str) {
736 tracing::info!("Daily sync triggered for {}", relay_url); 740 tracing::info!("Daily sync triggered for {}", relay_url);
737 741
738 // Close all subscriptions 742 // Close all subscriptions
739 if let Some(conn) = self.connections.get(relay_url) { 743 if let Some(conn) = self.connections.get(relay_url) {
740 conn.client.unsubscribe_all().await; 744 conn.client.unsubscribe_all().await;
741 } 745 }
742 746
743 // Clear PendingSyncIndex 747 // Clear PendingSyncIndex
744 self.pending_sync_index.write().await.remove(relay_url); 748 self.pending_sync_index.write().await.remove(relay_url);
745 749
746 // Clear confirmed state - triggers fresh sync 750 // Clear confirmed state - triggers fresh sync
747 { 751 {
748 let mut index = self.relay_sync_index.write().await; 752 let mut index = self.relay_sync_index.write().await;
@@ -750,7 +754,7 @@ impl SyncManager {
750 state.clear_sync_state(); 754 state.clear_sync_state();
751 } 755 }
752 } 756 }
753 757
754 // Recompute actions - will generate AddFilters for everything 758 // Recompute actions - will generate AddFilters for everything
755 self.recompute_actions_for_relay(relay_url).await; 759 self.recompute_actions_for_relay(relay_url).await;
756 } 760 }
@@ -767,33 +771,33 @@ impl SyncManager {
767 /// Called from handle_add_filters BEFORE adding new filters. 771 /// Called from handle_add_filters BEFORE adding new filters.
768 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { 772 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
769 let current_count = self.get_filter_count(relay_url).await; 773 let current_count = self.get_filter_count(relay_url).await;
770 774
771 if current_count + new_filter_count > 70 { 775 if current_count + new_filter_count > 70 {
772 self.consolidate(relay_url).await; 776 self.consolidate(relay_url).await;
773 } 777 }
774 } 778 }
775 779
776 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. 780 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
777 /// Does NOT clear state - just reduces filter count. 781 /// Does NOT clear state - just reduces filter count.
778 async fn consolidate(&mut self, relay_url: &str) { 782 async fn consolidate(&mut self, relay_url: &str) {
779 tracing::info!("Consolidating filters for {} (count > 70)", relay_url); 783 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
780 784
781 // Wait for all pending batches to complete first 785 // Wait for all pending batches to complete first
782 self.wait_pending_complete(relay_url).await; 786 self.wait_pending_complete(relay_url).await;
783 787
784 // Close Layer 2+3 subscriptions only - Layer 1 remains active 788 // Close Layer 2+3 subscriptions only - Layer 1 remains active
785 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately 789 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
786 // For simplicity, we close all and re-add Layer 1 790 // For simplicity, we close all and re-add Layer 1
787 if let Some(conn) = self.connections.get(relay_url) { 791 if let Some(conn) = self.connections.get(relay_url) {
788 conn.client.unsubscribe_all().await; 792 conn.client.unsubscribe_all().await;
789 } 793 }
790 794
791 // Re-subscribe Layer 1 with since (maintains announcements stream) 795 // Re-subscribe Layer 1 with since (maintains announcements stream)
792 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); 796 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
793 let conn = self.connections.get(relay_url).unwrap(); 797 let conn = self.connections.get(relay_url).unwrap();
794 let layer1 = build_announcement_filter(Some(since)); 798 let layer1 = build_announcement_filter(Some(since));
795 let _ = conn.client.subscribe(layer1, None).await; 799 let _ = conn.client.subscribe(layer1, None).await;
796 800
797 // Rebuild Layer 2+3 only 801 // Rebuild Layer 2+3 only
798 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; 802 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
799 } 803 }
@@ -806,7 +810,7 @@ impl SyncManager {
806impl SyncManager { 810impl SyncManager {
807 async fn handle_add_filters(&mut self, action: AddFilters) { 811 async fn handle_add_filters(&mut self, action: AddFilters) {
808 let AddFilters { relay_url, repos, root_events, filters } = action; 812 let AddFilters { relay_url, repos, root_events, filters } = action;
809 813
810 // Auto-spawn connection if needed (unchanged) 814 // Auto-spawn connection if needed (unchanged)
811 let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); 815 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
812 match state { 816 match state {
@@ -834,14 +838,14 @@ impl SyncManager {
834 // Already connected - proceed 838 // Already connected - proceed
835 } 839 }
836 } 840 }
837 841
838 // CHECK CONSOLIDATION BEFORE ADDING 842 // CHECK CONSOLIDATION BEFORE ADDING
839 self.maybe_consolidate(&relay_url, filters.len()).await; 843 self.maybe_consolidate(&relay_url, filters.len()).await;
840 844
841 // Subscribe and collect subscription IDs 845 // Subscribe and collect subscription IDs
842 let conn = self.connections.get(&relay_url).unwrap(); 846 let conn = self.connections.get(&relay_url).unwrap();
843 let mut sub_ids = HashSet::new(); 847 let mut sub_ids = HashSet::new();
844 848
845 for filter in filters { 849 for filter in filters {
846 match conn.client.subscribe(filter, None).await { 850 match conn.client.subscribe(filter, None).await {
847 Ok(output) => { 851 Ok(output) => {
@@ -854,14 +858,14 @@ impl SyncManager {
854 } 858 }
855 } 859 }
856 } 860 }
857 861
858 // Create pending batch (unchanged) 862 // Create pending batch (unchanged)
859 let batch = PendingBatch { 863 let batch = PendingBatch {
860 batch_id: self.next_batch_id(), 864 batch_id: self.next_batch_id(),
861 items: PendingItems { repos, root_events }, 865 items: PendingItems { repos, root_events },
862 outstanding_subs: sub_ids, 866 outstanding_subs: sub_ids,
863 }; 867 };
864 868
865 self.pending_sync_index.write().await 869 self.pending_sync_index.write().await
866 .entry(relay_url) 870 .entry(relay_url)
867 .or_default() 871 .or_default()
@@ -881,25 +885,25 @@ impl SyncManager {
881 let confirmed = self.relay_sync_index.read().await; 885 let confirmed = self.relay_sync_index.read().await;
882 let relays_to_disconnect: Vec<_> = confirmed.iter() 886 let relays_to_disconnect: Vec<_> = confirmed.iter()
883 .filter(|(_, state)| { 887 .filter(|(_, state)| {
884 !state.is_bootstrap && 888 !state.is_bootstrap &&
885 state.repos.is_empty() && 889 state.repos.is_empty() &&
886 state.root_events.is_empty() 890 state.root_events.is_empty()
887 }) 891 })
888 .map(|(url, _)| url.clone()) 892 .map(|(url, _)| url.clone())
889 .collect(); 893 .collect();
890 drop(confirmed); 894 drop(confirmed);
891 895
892 for relay_url in relays_to_disconnect { 896 for relay_url in relays_to_disconnect {
893 self.disconnect_relay(&relay_url).await; 897 self.disconnect_relay(&relay_url).await;
894 } 898 }
895 } 899 }
896 900
897 async fn disconnect_relay(&mut self, relay_url: &str) { 901 async fn disconnect_relay(&mut self, relay_url: &str) {
898 tracing::info!("Disconnecting relay {} (no repos)", relay_url); 902 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
899 903
900 self.relay_sync_index.write().await.remove(relay_url); 904 self.relay_sync_index.write().await.remove(relay_url);
901 self.pending_sync_index.write().await.remove(relay_url); 905 self.pending_sync_index.write().await.remove(relay_url);
902 906
903 if let Some(conn) = self.connections.remove(relay_url) { 907 if let Some(conn) = self.connections.remove(relay_url) {
904 let _ = conn.client.disconnect().await; 908 let _ = conn.client.disconnect().await;
905 } 909 }
@@ -917,31 +921,31 @@ flowchart TB
917 SS[SelfSubscriber] 921 SS[SelfSubscriber]
918 OWN[Own Relay] 922 OWN[Own Relay]
919 end 923 end
920 924
921 subgraph RepoSyncIndex - What We Want 925 subgraph RepoSyncIndex - What We Want
922 RSI[HashMap: Repo to Relays+Events] 926 RSI[HashMap: Repo to Relays+Events]
923 end 927 end
924 928
925 subgraph Derived Target 929 subgraph Derived Target
926 DT[derive_relay_targets fn] 930 DT[derive_relay_targets fn]
927 TGT[Per-relay: repos + events we should sync] 931 TGT[Per-relay: repos + events we should sync]
928 end 932 end
929 933
930 subgraph compute_actions - Decision Point 934 subgraph compute_actions - Decision Point
931 CA[Three-way diff: target - pending - confirmed] 935 CA[Three-way diff: target - pending - confirmed]
932 end 936 end
933 937
934 subgraph PendingSyncIndex - In Flight 938 subgraph PendingSyncIndex - In Flight
935 PSI[Vec PendingBatch per relay] 939 PSI[Vec PendingBatch per relay]
936 end 940 end
937 941
938 subgraph RelaySyncIndex - Confirmed State 942 subgraph RelaySyncIndex - Confirmed State
939 RLI[RelayState per relay] 943 RLI[RelayState per relay]
940 CONN[connection_status] 944 CONN[connection_status]
941 REPOS[repos + root_events] 945 REPOS[repos + root_events]
942 TIMES[last_connected + disconnected_at] 946 TIMES[last_connected + disconnected_at]
943 end 947 end
944 948
945 SS -->|subscribe| OWN 949 SS -->|subscribe| OWN
946 OWN -->|events| SS 950 OWN -->|events| SS
947 SS -->|batch fires| RSI 951 SS -->|batch fires| RSI
@@ -959,10 +963,10 @@ flowchart TB
959 AF -->|spawn if needed| CONN 963 AF -->|spawn if needed| CONN
960 SUB --> PSI 964 SUB --> PSI
961 PSI -->|EOSE| REPOS 965 PSI -->|EOSE| REPOS
962 966
963 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] 967 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
964 DISC -->|any reconnect| HC[handle_connect_or_reconnect] 968 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
965 969
966 subgraph handle_connect_or_reconnect 970 subgraph handle_connect_or_reconnect
967 HC --> FRESH_CHECK{is_fresh_sync?} 971 HC --> FRESH_CHECK{is_fresh_sync?}
968 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] 972 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
@@ -977,25 +981,25 @@ flowchart TB
977 981
978## Key Design Decisions 982## Key Design Decisions
979 983
980| Decision | Choice | Rationale | 984| Decision | Choice | Rationale |
981|----------|--------|-----------| 985| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- |
982| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | 986| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
983| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | 987| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
984| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | 988| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
985| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | 989| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
986| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | 990| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
987| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | 991| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
988| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | 992| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
989| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | 993| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
990| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | 994| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
991| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | 995| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
992| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | 996| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
993| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | 997| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
994| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | 998| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
995| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | 999| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
996| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | 1000| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
997| Connection spawning | Via AddFilters handler | Single path for new relay discovery | 1001| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
998| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | 1002| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
999 1003
1000--- 1004---
1001 1005
@@ -1017,16 +1021,16 @@ src/sync/
1017 1021
1018## Comparison: v3 vs v4 1022## Comparison: v3 vs v4
1019 1023
1020| Aspect | v3 | v4 | 1024| Aspect | v3 | v4 |
1021|--------|----|----| 1025| ------------------------ | ----------------------------------------- | --------------------------------------------- |
1022| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | 1026| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1023| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | 1027| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1024| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | 1028| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1025| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | 1029| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1026| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | 1030| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1027| Since filter application | Applied in handle_reconnect | build_all_filters with optional since | 1031| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1028| PSI clearing | On disconnect | On disconnect (confirmed) | 1032| PSI clearing | On disconnect | On disconnect (confirmed) |
1029| Daily timer | Consolidation-style | Fresh sync (different from consolidation) | 1033| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
1030 1034
1031--- 1035---
1032 1036
@@ -1065,28 +1069,28 @@ impl SelfSubscriber {
1065 tokio::time::sleep(Duration::from_secs(5)).await; 1069 tokio::time::sleep(Duration::from_secs(5)).await;
1066 continue; 1070 continue;
1067 } 1071 }
1068 1072
1069 // Run event loop until disconnection 1073 // Run event loop until disconnection
1070 self.event_loop().await; 1074 self.event_loop().await;
1071 1075
1072 // Loop will retry connection 1076 // Loop will retry connection
1073 } 1077 }
1074 } 1078 }
1075 1079
1076 async fn connect_and_subscribe(&mut self) -> Result<(), Error> { 1080 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
1077 let client = Client::new(Keys::generate()); 1081 let client = Client::new(Keys::generate());
1078 client.add_relay(&self.own_relay_url).await?; 1082 client.add_relay(&self.own_relay_url).await?;
1079 client.connect().await; 1083 client.connect().await;
1080 1084
1081 // Build filter - add since only on reconnect 1085 // Build filter - add since only on reconnect
1082 let filter = Filter::new().kinds([ 1086 let filter = Filter::new().kinds([
1083 Kind::Custom(30617), // Repository announcements 1087 Kind::Custom(30617), // Repository announcements
1084 Kind::GitPatch, // 1617 1088 Kind::GitPatch, // 1617
1085 Kind::Custom(1618), // PRs 1089 Kind::Custom(1618), // PRs
1086 Kind::Custom(1619), // PR updates 1090 Kind::Custom(1619), // PR updates
1087 Kind::GitIssue, // 1621 1091 Kind::GitIssue, // 1621
1088 ]); 1092 ]);
1089 1093
1090 let filter = if let Some(ts) = self.last_connected { 1094 let filter = if let Some(ts) = self.last_connected {
1091 // Reconnection: use since filter 1095 // Reconnection: use since filter
1092 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer 1096 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
@@ -1095,10 +1099,10 @@ impl SelfSubscriber {
1095 // Initial connect: no since filter - get full history 1099 // Initial connect: no since filter - get full history
1096 filter 1100 filter
1097 }; 1101 };
1098 1102
1099 // Update last_connected AFTER computing since 1103 // Update last_connected AFTER computing since
1100 self.last_connected = Some(Timestamp::now()); 1104 self.last_connected = Some(Timestamp::now());
1101 1105
1102 client.subscribe(filter, None).await?; 1106 client.subscribe(filter, None).await?;
1103 self.client = Some(client); 1107 self.client = Some(client);
1104 Ok(()) 1108 Ok(())
@@ -1115,18 +1119,18 @@ impl SelfSubscriber {
1115 let mut pending_events: Vec<Event> = Vec::new(); 1119 let mut pending_events: Vec<Event> = Vec::new();
1116 let mut batch_timer: Option<Instant> = None; 1120 let mut batch_timer: Option<Instant> = None;
1117 let batch_window = Duration::from_secs(5); 1121 let batch_window = Duration::from_secs(5);
1118 1122
1119 loop { 1123 loop {
1120 let timeout = batch_timer 1124 let timeout = batch_timer
1121 .map(|t| batch_window.saturating_sub(t.elapsed())) 1125 .map(|t| batch_window.saturating_sub(t.elapsed()))
1122 .unwrap_or(Duration::from_secs(60)); 1126 .unwrap_or(Duration::from_secs(60));
1123 1127
1124 tokio::select! { 1128 tokio::select! {
1125 notification = client.notifications().recv() => { 1129 notification = client.notifications().recv() => {
1126 match notification { 1130 match notification {
1127 Ok(RelayPoolNotification::Event { event, .. }) => { 1131 Ok(RelayPoolNotification::Event { event, .. }) => {
1128 pending_events.push(*event); 1132 pending_events.push(*event);
1129 1133
1130 // Start timer on first event - does NOT reset 1134 // Start timer on first event - does NOT reset
1131 if batch_timer.is_none() { 1135 if batch_timer.is_none() {
1132 batch_timer = Some(Instant::now()); 1136 batch_timer = Some(Instant::now());
@@ -1147,7 +1151,7 @@ impl SelfSubscriber {
1147 } 1151 }
1148 } 1152 }
1149 } 1153 }
1150 1154
1151 async fn process_batch(&self, events: Vec<Event>) { 1155 async fn process_batch(&self, events: Vec<Event>) {
1152 // 1. Update RepoSyncIndex 1156 // 1. Update RepoSyncIndex
1153 for event in events { 1157 for event in events {
@@ -1157,26 +1161,26 @@ impl SelfSubscriber {
1157 _ => {} 1161 _ => {}
1158 } 1162 }
1159 } 1163 }
1160 1164
1161 // 2. Derive targets and compute actions 1165 // 2. Derive targets and compute actions
1162 let repo_index = self.repo_sync_index.read().await; 1166 let repo_index = self.repo_sync_index.read().await;
1163 let targets = derive_relay_targets(&repo_index); 1167 let targets = derive_relay_targets(&repo_index);
1164 1168
1165 let pending = self.pending_sync_index.read().await; 1169 let pending = self.pending_sync_index.read().await;
1166 let confirmed = self.relay_sync_index.read().await; 1170 let confirmed = self.relay_sync_index.read().await;
1167 1171
1168 let actions = compute_actions(&targets, &pending, &confirmed); 1172 let actions = compute_actions(&targets, &pending, &confirmed);
1169 1173
1170 drop(repo_index); 1174 drop(repo_index);
1171 drop(pending); 1175 drop(pending);
1172 drop(confirmed); 1176 drop(confirmed);
1173 1177
1174 // 3. Send actions to SyncManager 1178 // 3. Send actions to SyncManager
1175 for action in actions { 1179 for action in actions {
1176 let _ = self.action_tx.send(action).await; 1180 let _ = self.action_tx.send(action).await;
1177 } 1181 }
1178 } 1182 }
1179 1183
1180 async fn handle_announcement(&self, event: &Event) { 1184 async fn handle_announcement(&self, event: &Event) {
1181 // Extract repo_ref from event - 30617:pubkey:identifier 1185 // Extract repo_ref from event - 30617:pubkey:identifier
1182 let d_tag = event.tags.iter() 1186 let d_tag = event.tags.iter()
@@ -1188,9 +1192,9 @@ impl SelfSubscriber {
1188 } 1192 }
1189 }) 1193 })
1190 .unwrap_or_default(); 1194 .unwrap_or_default();
1191 1195
1192 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); 1196 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1193 1197
1194 // Extract relay URLs from 'r' tags 1198 // Extract relay URLs from 'r' tags
1195 let relays: HashSet<String> = event.tags.iter() 1199 let relays: HashSet<String> = event.tags.iter()
1196 .filter_map(|tag| { 1200 .filter_map(|tag| {
@@ -1201,13 +1205,13 @@ impl SelfSubscriber {
1201 } 1205 }
1202 }) 1206 })
1203 .collect(); 1207 .collect();
1204 1208
1205 // Update RepoSyncIndex 1209 // Update RepoSyncIndex
1206 let mut index = self.repo_sync_index.write().await; 1210 let mut index = self.repo_sync_index.write().await;
1207 let needs = index.entry(repo_ref).or_default(); 1211 let needs = index.entry(repo_ref).or_default();
1208 needs.relays = relays; 1212 needs.relays = relays;
1209 } 1213 }
1210 1214
1211 async fn handle_root_event(&self, event: &Event) { 1215 async fn handle_root_event(&self, event: &Event) {
1212 // Extract repo_ref from 'a' tag 1216 // Extract repo_ref from 'a' tag
1213 let repo_ref = event.tags.iter() 1217 let repo_ref = event.tags.iter()
@@ -1218,7 +1222,7 @@ impl SelfSubscriber {
1218 None 1222 None
1219 } 1223 }
1220 }); 1224 });
1221 1225
1222 if let Some(repo_ref) = repo_ref { 1226 if let Some(repo_ref) = repo_ref {
1223 let mut index = self.repo_sync_index.write().await; 1227 let mut index = self.repo_sync_index.write().await;
1224 let needs = index.entry(repo_ref).or_default(); 1228 let needs = index.entry(repo_ref).or_default();
@@ -1246,6 +1250,7 @@ let sync_manager = Arc::new(Mutex::new(self));
1246This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. 1250This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber.
1247 1251
1248**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: 1252**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses:
1253
1249- `DashMap` for thread-safe concurrent access without external locking 1254- `DashMap` for thread-safe concurrent access without external locking
1250- Three states: `Healthy`, `Degraded`, `Dead` 1255- Three states: `Healthy`, `Degraded`, `Dead`
1251- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff 1256- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff
@@ -1254,14 +1259,14 @@ This allows background tasks (daily timer, disconnect checker) to acquire the lo
1254 1259
1255### Implementation Constants 1260### Implementation Constants
1256 1261
1257| Constant | Value | Purpose | 1262| Constant | Value | Purpose |
1258|----------|-------|---------| 1263| --------------------------------- | ---------- | ------------------------------------------------ |
1259| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | 1264| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation |
1260| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | 1265| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation |
1261| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | 1266| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync |
1262| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | 1267| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect |
1263| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | 1268| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead |
1264| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | 1269| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff |
1265 1270
1266### Daily Timer Randomization 1271### Daily Timer Randomization
1267 1272
@@ -1308,6 +1313,7 @@ src/sync/
1308``` 1313```
1309 1314
1310Key differences from spec: 1315Key differences from spec:
1316
1311- No separate `state.rs` - types are defined in `mod.rs` 1317- No separate `state.rs` - types are defined in `mod.rs`
1312- No separate `actions.rs` - moved to `algorithms.rs` 1318- No separate `actions.rs` - moved to `algorithms.rs`
1313- No separate `consolidation.rs` - consolidation logic in `mod.rs` 1319- No separate `consolidation.rs` - consolidation logic in `mod.rs`
@@ -1321,4 +1327,4 @@ Key differences from spec:
1321 1327
13223. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. 13283. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches.
1323 1329
13244. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. \ No newline at end of file 13304. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API.