upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync.md
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:45:56 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:51:42 +0000
commit30a9acbd35bd4d4fd10fef725314046c65a76fae (patch)
tree32f3fa50eb2f20bf88a07f490d31c86481f28725 /docs/explanation/grasp-02-proactive-sync.md
parent18ad93f8d0b8ce172c9c227385a21af66a507950 (diff)
docs: simplify grasp-02 doc
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync.md1048
1 files changed, 124 insertions, 924 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
index dd508b3..d6665cc 100644
--- a/docs/explanation/grasp-02-proactive-sync.md
+++ b/docs/explanation/grasp-02-proactive-sync.md
@@ -1,8 +1,8 @@
1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design 1# GRASP-02: Proactive Sync - Design & Implementation
2 2
3## Overview 3## Overview
4 4
5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles: 5This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles:
6 6
71. **Self-subscription as the only mechanism** - No database initialization at startup 71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create 82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
@@ -72,7 +72,7 @@ impl RelayState {
72 match self.disconnected_at { 72 match self.disconnected_at {
73 Some(disconnected) => { 73 Some(disconnected) => {
74 let now = Timestamp::now(); 74 let now = Timestamp::now();
75 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes 75 now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes
76 } 76 }
77 None => false, // Still connected or never connected 77 None => false, // Still connected or never connected
78 } 78 }
@@ -132,7 +132,7 @@ stateDiagram-v2
132 132
133## Flow Scenarios 133## Flow Scenarios
134 134
135### Scenario 1: Initial Connect via handle_connect_or_reconnect 135### Scenario 1: Initial Connect
136 136
137```mermaid 137```mermaid
138flowchart TB 138flowchart TB
@@ -166,7 +166,7 @@ flowchart TB
166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since 166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
167- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking 167- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
168 168
169### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes 169### Scenario 2: Quick Reconnect (less than 15 minutes)
170 170
171```mermaid 171```mermaid
172flowchart TB 172flowchart TB
@@ -178,7 +178,7 @@ flowchart TB
178 178
179 subgraph handle_connect_or_reconnect - Quick Reconnect 179 subgraph handle_connect_or_reconnect - Quick Reconnect
180 HC --> CHECK{is_fresh_sync?} 180 HC --> CHECK{is_fresh_sync?}
181 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] 181 CHECK --> |no - last_connected exists AND less than 15min| SINCE[since = last_connected - 15min]
182 SINCE --> L1[build_announcement_filter - with since] 182 SINCE --> L1[build_announcement_filter - with since]
183 L1 --> L23[rebuild_layer2_and_layer3 - with since] 183 L1 --> L23[rebuild_layer2_and_layer3 - with since]
184 L23 --> RCA[recompute_actions_for_relay] 184 L23 --> RCA[recompute_actions_for_relay]
@@ -200,7 +200,7 @@ flowchart TB
200 3. `recompute_actions_for_relay` - check for new items 200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection 201- since = last_connected - 15min ensures we catch events during disconnection
202 202
203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes 203### Scenario 3: Stale Reconnect (greater than 15 minutes)
204 204
205```mermaid 205```mermaid
206flowchart TB 206flowchart TB
@@ -208,7 +208,7 @@ flowchart TB
208 208
209 subgraph handle_connect_or_reconnect - Stale Reconnect 209 subgraph handle_connect_or_reconnect - Stale Reconnect
210 HC --> CHECK{is_fresh_sync?} 210 HC --> CHECK{is_fresh_sync?}
211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] 211 CHECK --> |yes - disconnected greater than 15min| CLEAR[clear_sync_state]
212 CLEAR --> L1[build_announcement_filter - no since] 212 CLEAR --> L1[build_announcement_filter - no since]
213 L1 --> RCA[recompute_actions_for_relay] 213 L1 --> RCA[recompute_actions_for_relay]
214 end 214 end
@@ -228,7 +228,7 @@ flowchart TB
228- Layer 1: `build_announcement_filter(None)` - full history 228- Layer 1: `build_announcement_filter(None)` - full history
229- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything 229- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
230 230
231### Scenario 4: Consolidation - Triggered on Filter Add 231### Scenario 4: Consolidation (Triggered on Filter Add)
232 232
233```mermaid 233```mermaid
234flowchart TB 234flowchart TB
@@ -252,7 +252,7 @@ flowchart TB
252- `since = now - 15min` prevents re-fetching old events 252- `since = now - 15min` prevents re-fetching old events
253- Keeps confirmed state, just reduces filter count 253- Keeps confirmed state, just reduces filter count
254 254
255### Scenario 5: Daily Timer - 23 to 25h Random 255### Scenario 5: Daily Timer (23-25h Random)
256 256
257```mermaid 257```mermaid
258flowchart TB 258flowchart TB
@@ -281,635 +281,127 @@ flowchart TB
281 281
282## Core Algorithms 282## Core Algorithms
283 283
284### 1. derive_relay_targets 284### derive_relay_targets
285 285
286Transform RepoSyncIndex into per-relay sync targets: 286Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay.
287 287
288```rust 288**Implementation:** [`derive_relay_targets()`](../../src/sync/algorithms.rs:61)
289/// Inverts RepoSyncIndex to get per-relay view
290fn derive_relay_targets(
291 repo_index: &HashMap<String, RepoSyncNeeds>
292) -> HashMap<String, RelaySyncNeeds> {
293 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
294
295 for (repo_ref, needs) in repo_index {
296 for relay_url in &needs.relays {
297 let target = targets.entry(relay_url.clone()).or_default();
298 target.repos.insert(repo_ref.clone());
299 target.root_events.extend(needs.root_events.iter().cloned());
300 }
301 }
302
303 targets
304}
305```
306
307### 2. compute_actions (Three-Way Diff)
308
309**This is the ONLY decision point for what NEW subscriptions to create.**
310
311```rust
312/// Computes AddFilters for items that are:
313/// - In targets (what we want)
314/// - NOT in pending (already in-flight)
315/// - NOT in confirmed (already confirmed)
316fn compute_actions(
317 targets: &HashMap<String, RelaySyncNeeds>,
318 pending: &HashMap<String, Vec<PendingBatch>>,
319 confirmed: &HashMap<String, RelayState>,
320) -> Vec<AddFilters> {
321 let mut actions = Vec::new();
322
323 for (relay_url, target) in targets {
324 // Skip disconnected relays - they will get AddFilters on reconnect
325 if let Some(state) = confirmed.get(relay_url) {
326 if state.connection_status != ConnectionStatus::Connected {
327 continue;
328 }
329 }
330
331 // Collect all pending items for this relay
332 let pending_repos: HashSet<_> = pending.get(relay_url)
333 .map(|batches| batches.iter()
334 .flat_map(|b| b.items.repos.iter().cloned())
335 .collect())
336 .unwrap_or_default();
337 let pending_events: HashSet<_> = pending.get(relay_url)
338 .map(|batches| batches.iter()
339 .flat_map(|b| b.items.root_events.iter().cloned())
340 .collect())
341 .unwrap_or_default();
342
343 // Collect confirmed items for this relay
344 let confirmed_repos = confirmed.get(relay_url)
345 .map(|c| &c.repos)
346 .unwrap_or(&HashSet::new());
347 let confirmed_events = confirmed.get(relay_url)
348 .map(|c| &c.root_events)
349 .unwrap_or(&HashSet::new());
350
351 // New = target - pending - confirmed
352 let new_repos: HashSet<_> = target.repos.iter()
353 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
354 .cloned()
355 .collect();
356 let new_events: HashSet<_> = target.root_events.iter()
357 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
358 .cloned()
359 .collect();
360
361 if !new_repos.is_empty() || !new_events.is_empty() {
362 let filters = build_filters(&new_repos, &new_events);
363 actions.push(AddFilters {
364 relay_url: relay_url.clone(),
365 repos: new_repos,
366 root_events: new_events,
367 filters,
368 });
369 }
370 }
371
372 actions
373}
374```
375
376### 3. Filter Building Functions (Three-Layer Strategy)
377
378The filter strategy uses three layers:
379
380- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
381- **Layer 2**: Events tagging our repos
382- **Layer 3**: Events tagging our root events
383
384**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
385
386```rust
387/// Layer 1: Announcements filter (kinds 30617 + 30618)
388/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
389/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
390fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
391 let filter = Filter::new().kinds([
392 Kind::Custom(30617), // Repository announcements
393 Kind::Custom(30618), // Maintainer lists
394 ]);
395
396 match since {
397 Some(ts) => filter.since(ts),
398 None => filter,
399 }
400}
401
402/// Layer 2: Events tagging one of our repos
403/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
404/// Batched per 100 repo refs.
405fn tagged_one_of_our_repo_event_filters(
406 repos: &HashSet<String>,
407 since: Option<Timestamp>,
408) -> Vec<Filter> {
409 let mut filters = Vec::new();
410 let repo_refs: Vec<_> = repos.iter().collect();
411
412 for chunk in repo_refs.chunks(100) {
413 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
414
415 // Lowercase 'a' tag - standard addressable reference
416 let mut f1 = Filter::new()
417 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
418 // Uppercase 'A' tag - some clients use this
419 let mut f2 = Filter::new()
420 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
421 // Quote 'q' tag - NIP-10 quote references to addressable events
422 let mut f3 = Filter::new()
423 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
424
425 if let Some(ts) = since {
426 f1 = f1.since(ts);
427 f2 = f2.since(ts);
428 f3 = f3.since(ts);
429 }
430
431 filters.push(f1);
432 filters.push(f2);
433 filters.push(f3);
434 }
435
436 filters
437}
438
439/// Layer 3: Events tagging one of our root events
440/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
441/// Batched per 100 event IDs.
442fn tagged_one_of_our_root_event_filters(
443 root_events: &HashSet<EventId>,
444 since: Option<Timestamp>,
445) -> Vec<Filter> {
446 let mut filters = Vec::new();
447 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
448
449 for chunk in event_ids.chunks(100) {
450 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
451
452 // Lowercase 'e' tag - standard event reference
453 let mut f1 = Filter::new()
454 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
455 // Uppercase 'E' tag - some clients use this
456 let mut f2 = Filter::new()
457 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
458 // Quote 'q' tag - NIP-10 quote references to events
459 let mut f3 = Filter::new()
460 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
461
462 if let Some(ts) = since {
463 f1 = f1.since(ts);
464 f2 = f2.since(ts);
465 f3 = f3.since(ts);
466 }
467
468 filters.push(f1);
469 filters.push(f2);
470 filters.push(f3);
471 }
472
473 filters
474}
475
476/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
477/// Used by:
478/// - compute_actions for incremental subscriptions
479/// - consolidation rebuilds (Layer 1 remains active)
480fn build_layer2_and_layer3_filters(
481 repos: &HashSet<String>,
482 root_events: &HashSet<EventId>,
483 since: Option<Timestamp>,
484) -> Vec<Filter> {
485 let mut filters = Vec::new();
486 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
487 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
488 filters
489}
490```
491
492**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
493
494### 4. handle_add_filters (SyncManager)
495
496```rust
497impl SyncManager {
498 async fn handle_add_filters(&mut self, action: AddFilters) {
499 let AddFilters { relay_url, repos, root_events, filters } = action;
500
501 // Auto-spawn connection if needed
502 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
503 match state {
504 None => {
505 // New relay discovered - create entry and spawn connection
506 self.relay_sync_index.write().await.insert(
507 relay_url.clone(),
508 RelayState {
509 repos: HashSet::new(),
510 root_events: HashSet::new(),
511 is_bootstrap: false,
512 connection_status: ConnectionStatus::Connecting,
513 last_connected: None,
514 disconnected_at: None,
515 connection: None,
516 }
517 );
518 self.spawn_connection(&relay_url).await;
519 return; // Subscriptions will happen on connection success
520 }
521 Some(state) if state.connection_status != ConnectionStatus::Connected => {
522 // Not connected - subscriptions will happen on connection success
523 return;
524 }
525 Some(_) => {
526 // Already connected - proceed with subscription
527 }
528 }
529
530 // Subscribe and collect subscription IDs
531 let conn = self.connections.get(&relay_url).unwrap();
532 let mut sub_ids = HashSet::new();
533
534 for filter in filters {
535 match conn.client.subscribe(filter, None).await {
536 Ok(output) => {
537 for sub_id in output.val {
538 sub_ids.insert(sub_id);
539 }
540 }
541 Err(e) => {
542 tracing::warn!("Failed to subscribe: {}", e);
543 }
544 }
545 }
546
547 // Create pending batch
548 let batch = PendingBatch {
549 batch_id: self.next_batch_id(),
550 items: PendingItems { repos, root_events },
551 outstanding_subs: sub_ids,
552 };
553
554 // Add to pending index
555 self.pending_sync_index.write().await
556 .entry(relay_url)
557 .or_default()
558 .push(batch);
559 }
560}
561```
562
563### 5. handle_disconnect
564 289
565```rust 290```rust
566impl SyncManager { 291// Conceptual: inverts repo → relays to relay → repos
567 /// Called when connection to a relay is lost 292fn derive_relay_targets(repo_index: &HashMap<String, RepoSyncNeeds>)
568 async fn handle_disconnect(&mut self, relay_url: &str) { 293 -> HashMap<String, RelaySyncNeeds>
569 let mut index = self.relay_sync_index.write().await;
570
571 if let Some(state) = index.get_mut(relay_url) {
572 state.connection_status = ConnectionStatus::Disconnected;
573 state.disconnected_at = Some(Timestamp::now());
574 state.connection = None;
575 }
576
577 // Clear pending batches - these items were not confirmed
578 self.pending_sync_index.write().await.remove(relay_url);
579
580 // Remove from active connections map
581 self.connections.remove(relay_url);
582
583 // Health tracker records failure for backoff
584 self.health_tracker.record_failure(relay_url);
585 }
586}
587``` 294```
588 295
589### 6. handle_connect_or_reconnect (Unified) 296### compute_actions (Three-Way Diff)
590
591This method handles BOTH initial connection AND reconnection with unified logic:
592
593```rust
594impl SyncManager {
595 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
596 ///
597 /// Decision tree:
598 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
599 /// - Quick reconnect (<15min): since = last_connected - 15min
600 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
601 let mut index = self.relay_sync_index.write().await;
602 let state = match index.get_mut(relay_url) {
603 Some(s) => s,
604 None => return, // Relay was removed while disconnected
605 };
606
607 // Determine if this is a fresh sync or quick reconnect
608 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
609 let last_connected = state.last_connected;
610
611 if is_fresh_sync && state.last_connected.is_some() {
612 // Stale reconnect (>15min) - clear state
613 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
614 state.clear_sync_state();
615 }
616
617 // Update connection state
618 state.connection_status = ConnectionStatus::Connected;
619 state.last_connected = Some(Timestamp::now());
620 state.disconnected_at = None;
621
622 // Record success in health tracker
623 self.health_tracker.record_success(relay_url);
624 297
625 drop(index); // Release lock 298**This is the ONLY decision point for what NEW subscriptions to create.**
626
627 let conn = match self.connections.get(relay_url) {
628 Some(c) => c,
629 None => return,
630 };
631
632 if is_fresh_sync {
633 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
634
635 // Step 1: Subscribe Layer 1 (announcements) without since
636 let layer1 = build_announcement_filter(None);
637 let _ = conn.client.subscribe(layer1, None).await;
638 299
639 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) 300Performs a three-way diff: `target - pending - confirmed = new`
640 self.recompute_actions_for_relay(relay_url).await;
641 } else {
642 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
643 let since = last_connected
644 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
645 .unwrap_or(Timestamp::from(0));
646 301
647 // Step 1: Subscribe Layer 1 (announcements) with since 302- **targets**: What we want (from derive_relay_targets)
648 let layer1 = build_announcement_filter(Some(since)); 303- **pending**: What's already in-flight awaiting EOSE
649 let _ = conn.client.subscribe(layer1, None).await; 304- **confirmed**: What's already confirmed syncing
650 305
651 // Step 2: Rebuild Layer 2+3 for confirmed items with since 306Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect).
652 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
653 307
654 // Step 3: Check for NEW items via compute_actions 308**Implementation:** [`compute_actions()`](../../src/sync/algorithms.rs:96)
655 self.recompute_actions_for_relay(relay_url).await;
656 }
657 }
658 309
659 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). 310---
660 /// Used by:
661 /// - Quick reconnect: rebuild confirmed items with since filter
662 /// - Consolidation: close and rebuild with since filter
663 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
664 let confirmed = self.relay_sync_index.read().await;
665 let state = match confirmed.get(relay_url) {
666 Some(s) => s,
667 None => return,
668 };
669
670 // Build Layer 2+3 filters WITH since
671 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
672 drop(confirmed);
673
674 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
675 let conn = match self.connections.get(relay_url) {
676 Some(c) => c,
677 None => return,
678 };
679
680 for filter in filters {
681 let _ = conn.client.subscribe(filter, None).await;
682 }
683 }
684 311
685 /// Rerun compute_actions for a specific relay and process resulting AddFilters. 312## Filter Building (Three-Layer Strategy)
686 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
687 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
688 let repo_index = self.repo_sync_index.read().await;
689 let targets = derive_relay_targets(&repo_index);
690 drop(repo_index);
691 313
692 // Filter to just this relay 314The filter strategy uses three layers to ensure comprehensive event coverage:
693 let target = match targets.get(relay_url) {
694 Some(t) => t.clone(),
695 None => return, // No repos reference this relay anymore
696 };
697 315
698 let pending = self.pending_sync_index.read().await; 316### Layer 1: Announcements
699 let confirmed = self.relay_sync_index.read().await;
700 317
701 let mut single_relay_targets = HashMap::new(); 318- **Kinds**: 30617 (Repository Announcements), 30618 (Maintainer Lists)
702 single_relay_targets.insert(relay_url.to_string(), target); 319- **When subscribed**: ONCE on connect, NOT rebuilt during consolidation
320- **Function**: [`build_announcement_filter()`](../../src/sync/filters.rs:20)
321- 30618 is ONLY synced from remote relays, not self-subscribed
703 322
704 let actions = compute_actions(&single_relay_targets, &pending, &confirmed); 323### Layer 2: Events Tagging Our Repos
705 324
706 drop(pending); 325- **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage
707 drop(confirmed); 326- **Batching**: Per 100 repo refs
327- **Function**: [`tagged_one_of_our_repo_event_filters()`](../../src/sync/filters.rs:43)
708 328
709 // Process AddFilters 329### Layer 3: Events Tagging Our Root Events
710 for action in actions {
711 self.handle_add_filters(action).await;
712 }
713 }
714}
715```
716 330
717### 7. Daily Timer 331- **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage
332- **Batching**: Per 100 event IDs
333- **Function**: [`tagged_one_of_our_root_event_filters()`](../../src/sync/filters.rs:98)
718 334
719```rust 335### Combined Layer 2+3
720impl SyncManager {
721 async fn run_daily_timer(&self) {
722 loop {
723 // Random 23-25 hours
724 let hours = 23.0 + rand::random::<f64>() * 2.0;
725 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
726
727 let relay_urls: Vec<_> = self.relay_sync_index.read().await
728 .keys()
729 .cloned()
730 .collect();
731
732 for relay_url in relay_urls {
733 self.daily_sync(&relay_url).await;
734 }
735 }
736 }
737
738 /// Perform daily fresh sync for a relay
739 async fn daily_sync(&mut self, relay_url: &str) {
740 tracing::info!("Daily sync triggered for {}", relay_url);
741 336
742 // Close all subscriptions 337The [`build_layer2_and_layer3_filters()`](../../src/sync/filters.rs:152) function combines both layers. Used by:
743 if let Some(conn) = self.connections.get(relay_url) { 338- `compute_actions` for incremental subscriptions
744 conn.client.unsubscribe_all().await; 339- `rebuild_layer2_and_layer3` during reconnection
745 } 340- Consolidation rebuilds (Layer 1 remains active separately)
746 341
747 // Clear PendingSyncIndex 342**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
748 self.pending_sync_index.write().await.remove(relay_url);
749
750 // Clear confirmed state - triggers fresh sync
751 {
752 let mut index = self.relay_sync_index.write().await;
753 if let Some(state) = index.get_mut(relay_url) {
754 state.clear_sync_state();
755 }
756 }
757 343
758 // Recompute actions - will generate AddFilters for everything 344---
759 self.recompute_actions_for_relay(relay_url).await;
760 }
761}
762```
763 345
764### 8. Consolidation (Threshold-Based, Triggered on Add) 346## SyncManager Key Methods
765 347
766Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active. 348The [`SyncManager`](../../src/sync/mod.rs:308) orchestrates all sync operations. Key methods:
767 349
768```rust 350### Connection Lifecycle
769impl SyncManager {
770 /// Check filter count and consolidate if needed.
771 /// Called from handle_add_filters BEFORE adding new filters.
772 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
773 let current_count = self.get_filter_count(relay_url).await;
774
775 if current_count + new_filter_count > 70 {
776 self.consolidate(relay_url).await;
777 }
778 }
779 351
780 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. 352| Method | Purpose |
781 /// Does NOT clear state - just reduces filter count. 353|--------|---------|
782 async fn consolidate(&mut self, relay_url: &str) { 354| `handle_connect_or_reconnect()` | Unified handler for initial connect and reconnect. Determines fresh vs quick reconnect based on `last_connected` and 15-minute rule |
783 tracing::info!("Consolidating filters for {} (count > 70)", relay_url); 355| `handle_disconnect()` | Updates RelayState to Disconnected, sets disconnected_at, clears pending batches, records failure in health tracker |
356| `spawn_relay_connection()` | Creates RelayConnection, subscribes to Layer 1, spawns event loop task |
784 357
785 // Wait for all pending batches to complete first 358### Sync Operations
786 self.wait_pending_complete(relay_url).await;
787 359
788 // Close Layer 2+3 subscriptions only - Layer 1 remains active 360| Method | Purpose |
789 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately 361|--------|---------|
790 // For simplicity, we close all and re-add Layer 1 362| `handle_add_filters()` | Auto-spawns connection if needed, checks consolidation threshold (>70 filters), subscribes and creates PendingBatch |
791 if let Some(conn) = self.connections.get(relay_url) { 363| `handle_eose()` | Processes EOSE for subscription, moves items from pending to confirmed when batch completes |
792 conn.client.unsubscribe_all().await; 364| `recompute_actions_for_relay()` | Runs derive_relay_targets compute_actions for a specific relay to find new items |
793 } 365| `rebuild_layer2_and_layer3()` | Rebuilds subscriptions from confirmed state with optional since filter |
794 366
795 // Re-subscribe Layer 1 with since (maintains announcements stream) 367### Maintenance
796 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
797 let conn = self.connections.get(relay_url).unwrap();
798 let layer1 = build_announcement_filter(Some(since));
799 let _ = conn.client.subscribe(layer1, None).await;
800 368
801 // Rebuild Layer 2+3 only 369| Method | Purpose |
802 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; 370|--------|---------|
803 } 371| `daily_sync()` | Full fresh sync - unsubscribes all, clears state, recomputes actions |
804} 372| `consolidate()` | Reduces filter count by unsubscribing and rebuilding with combined filters |
805``` 373| `check_disconnects()` | Periodic check for empty relays (no repos) to disconnect |
374| `check_reconnects()` | Attempts reconnection for disconnected relays with pending work |
806 375
807**Updated handle_add_filters to check consolidation:** 376---
808 377
809```rust 378## Self-Subscriber
810impl SyncManager {
811 async fn handle_add_filters(&mut self, action: AddFilters) {
812 let AddFilters { relay_url, repos, root_events, filters } = action;
813
814 // Auto-spawn connection if needed (unchanged)
815 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
816 match state {
817 None => {
818 // New relay discovered - create entry and spawn connection
819 self.relay_sync_index.write().await.insert(
820 relay_url.clone(),
821 RelayState {
822 repos: HashSet::new(),
823 root_events: HashSet::new(),
824 is_bootstrap: false,
825 connection_status: ConnectionStatus::Connecting,
826 last_connected: None,
827 disconnected_at: None,
828 connection: None,
829 }
830 );
831 self.spawn_connection(&relay_url).await;
832 return; // Subscriptions will happen on connection success
833 }
834 Some(state) if state.connection_status != ConnectionStatus::Connected => {
835 return; // Not connected - subscriptions will happen on connection success
836 }
837 Some(_) => {
838 // Already connected - proceed
839 }
840 }
841 379
842 // CHECK CONSOLIDATION BEFORE ADDING 380The [`SelfSubscriber`](../../src/sync/self_subscriber.rs:86) monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`.
843 self.maybe_consolidate(&relay_url, filters.len()).await;
844
845 // Subscribe and collect subscription IDs
846 let conn = self.connections.get(&relay_url).unwrap();
847 let mut sub_ids = HashSet::new();
848
849 for filter in filters {
850 match conn.client.subscribe(filter, None).await {
851 Ok(output) => {
852 for sub_id in output.val {
853 sub_ids.insert(sub_id);
854 }
855 }
856 Err(e) => {
857 tracing::warn!("Failed to subscribe: {}", e);
858 }
859 }
860 }
861 381
862 // Create pending batch (unchanged) 382### Event Kinds Monitored
863 let batch = PendingBatch {
864 batch_id: self.next_batch_id(),
865 items: PendingItems { repos, root_events },
866 outstanding_subs: sub_ids,
867 };
868
869 self.pending_sync_index.write().await
870 .entry(relay_url)
871 .or_default()
872 .push(batch);
873 }
874}
875```
876 383
877--- 384- **30617** - Repository Announcements (triggers discovery of repos listing our relay)
385- **1617** - Patches (root events referencing repos)
386- **1618** - Issues
387- **1619** - Replies/Status
388- **1621** - Pull Requests
878 389
879## Disconnect (Relay Removal) Handling 390Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays.
880 391
881```rust 392### Batching Flow
882impl SyncManager {
883 /// Periodically check for relays that should be disconnected
884 async fn check_disconnects(&mut self) {
885 let confirmed = self.relay_sync_index.read().await;
886 let relays_to_disconnect: Vec<_> = confirmed.iter()
887 .filter(|(_, state)| {
888 !state.is_bootstrap &&
889 state.repos.is_empty() &&
890 state.root_events.is_empty()
891 })
892 .map(|(url, _)| url.clone())
893 .collect();
894 drop(confirmed);
895
896 for relay_url in relays_to_disconnect {
897 self.disconnect_relay(&relay_url).await;
898 }
899 }
900 393
901 async fn disconnect_relay(&mut self, relay_url: &str) { 3941. **Receive events** from own relay subscription
902 tracing::info!("Disconnecting relay {} (no repos)", relay_url); 3952. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID
3963. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events
3974. **Process batch**:
398 - Update `RepoSyncIndex` with discovered repos and root events
399 - Call `derive_relay_targets()` → `compute_actions()`
400 - Send `AddFilters` actions to SyncManager
903 401
904 self.relay_sync_index.write().await.remove(relay_url); 402### Reconnection
905 self.pending_sync_index.write().await.remove(relay_url);
906 403
907 if let Some(conn) = self.connections.remove(relay_url) { 404Uses `last_connected` timestamp to apply since filter on reconnect (15-minute buffer), similar to external relay reconnection logic.
908 let _ = conn.client.disconnect().await;
909 }
910 }
911}
912```
913 405
914--- 406---
915 407
@@ -981,25 +473,20 @@ flowchart TB
981 473
982## Key Design Decisions 474## Key Design Decisions
983 475
984| Decision | Choice | Rationale | 476| Decision | Choice | Rationale |
985| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | 477|----------|--------|-----------|
986| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | 478| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
987| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | 479| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
988| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | 480| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
989| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | 481| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
990| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | 482| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
991| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | 483| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
992| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | 484| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
993| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | 485| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
994| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | 486| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
995| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | 487| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
996| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | 488| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
997| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | 489| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
998| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
999| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
1000| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
1001| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
1002| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
1003 490
1004--- 491---
1005 492
@@ -1007,324 +494,37 @@ flowchart TB
1007 494
1008``` 495```
1009src/sync/ 496src/sync/
1010├── mod.rs # SyncManager, main loop 497├── mod.rs # SyncManager, main loop, data structures (RepoSyncNeeds, RelayState, etc.)
1011├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types 498├── algorithms.rs # derive_relay_targets(), compute_actions(), AddFilters
1012├── actions.rs # AddFilters struct, compute_actions, build_filters 499├── filters.rs # build_announcement_filter(), build_layer2_and_layer3_filters()
1013├── self_subscriber.rs # SelfSubscriber, batching logic 500├── health.rs # RelayHealthTracker with exponential backoff
1014├── relay_connection.rs # Per-relay WebSocket connection 501├── relay_connection.rs # RelayConnection, RelayEvent handling
1015├── consolidation.rs # Consolidation logic, daily timer 502├── self_subscriber.rs # SelfSubscriber with batching
1016├── health.rs # Health tracking (reuse from v2) 503└── metrics.rs # SyncMetrics for Prometheus
1017└── metrics.rs # Prometheus metrics (reuse from v2)
1018``` 504```
1019 505
1020--- 506---
1021 507
1022## Comparison: v3 vs v4 508## Health Tracking
1023
1024| Aspect | v3 | v4 |
1025| ------------------------ | ----------------------------------------- | --------------------------------------------- |
1026| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1027| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1028| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1029| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1030| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1031| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1032| PSI clearing | On disconnect | On disconnect (confirmed) |
1033| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
1034
1035---
1036
1037## Self-Subscriber Flow
1038
1039The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions.
1040
1041### State Tracking
1042
1043```rust
1044pub struct SelfSubscriber {
1045 own_relay_url: String,
1046 relay_domain: String,
1047 repo_sync_index: RepoSyncIndex,
1048 pending_sync_index: PendingSyncIndex,
1049 relay_sync_index: RelaySyncIndex,
1050 action_tx: mpsc::Sender<AddFilters>,
1051 /// Timestamp of last successful connection - used for since filter on reconnection
1052 last_connected: Option<Timestamp>,
1053 /// The active client connection
1054 client: Option<Client>,
1055}
1056```
1057
1058### On Startup / Reconnect (Unified)
1059
1060Both initial startup and reconnection use the same `connect_and_subscribe` method:
1061
1062```rust
1063impl SelfSubscriber {
1064 async fn run(mut self) {
1065 loop {
1066 // Connect or reconnect
1067 if let Err(e) = self.connect_and_subscribe().await {
1068 tracing::warn!("Connection failed: {}, will retry", e);
1069 tokio::time::sleep(Duration::from_secs(5)).await;
1070 continue;
1071 }
1072
1073 // Run event loop until disconnection
1074 self.event_loop().await;
1075
1076 // Loop will retry connection
1077 }
1078 }
1079
1080 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
1081 let client = Client::new(Keys::generate());
1082 client.add_relay(&self.own_relay_url).await?;
1083 client.connect().await;
1084
1085 // Build filter - add since only on reconnect
1086 let filter = Filter::new().kinds([
1087 Kind::Custom(30617), // Repository announcements
1088 Kind::GitPatch, // 1617
1089 Kind::Custom(1618), // PRs
1090 Kind::Custom(1619), // PR updates
1091 Kind::GitIssue, // 1621
1092 ]);
1093
1094 let filter = if let Some(ts) = self.last_connected {
1095 // Reconnection: use since filter
1096 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1097 filter.since(since)
1098 } else {
1099 // Initial connect: no since filter - get full history
1100 filter
1101 };
1102
1103 // Update last_connected AFTER computing since
1104 self.last_connected = Some(Timestamp::now());
1105
1106 client.subscribe(filter, None).await?;
1107 self.client = Some(client);
1108 Ok(())
1109 }
1110}
1111```
1112
1113### Event Loop with Batching
1114
1115```rust
1116impl SelfSubscriber {
1117 async fn event_loop(&mut self) {
1118 let client = self.client.as_ref().unwrap();
1119 let mut pending_events: Vec<Event> = Vec::new();
1120 let mut batch_timer: Option<Instant> = None;
1121 let batch_window = Duration::from_secs(5);
1122
1123 loop {
1124 let timeout = batch_timer
1125 .map(|t| batch_window.saturating_sub(t.elapsed()))
1126 .unwrap_or(Duration::from_secs(60));
1127
1128 tokio::select! {
1129 notification = client.notifications().recv() => {
1130 match notification {
1131 Ok(RelayPoolNotification::Event { event, .. }) => {
1132 pending_events.push(*event);
1133
1134 // Start timer on first event - does NOT reset
1135 if batch_timer.is_none() {
1136 batch_timer = Some(Instant::now());
1137 }
1138 }
1139 Ok(RelayPoolNotification::Shutdown) => {
1140 // Connection lost
1141 break;
1142 }
1143 _ => {}
1144 }
1145 }
1146 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1147 // Batch window elapsed
1148 self.process_batch(pending_events.drain(..).collect()).await;
1149 batch_timer = None;
1150 }
1151 }
1152 }
1153 }
1154
1155 async fn process_batch(&self, events: Vec<Event>) {
1156 // 1. Update RepoSyncIndex
1157 for event in events {
1158 match event.kind.as_u16() {
1159 30617 => self.handle_announcement(&event).await,
1160 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1161 _ => {}
1162 }
1163 }
1164
1165 // 2. Derive targets and compute actions
1166 let repo_index = self.repo_sync_index.read().await;
1167 let targets = derive_relay_targets(&repo_index);
1168 509
1169 let pending = self.pending_sync_index.read().await; 510The [`RelayHealthTracker`](../../src/sync/health.rs:93) manages connection health with exponential backoff:
1170 let confirmed = self.relay_sync_index.read().await;
1171 511
1172 let actions = compute_actions(&targets, &pending, &confirmed); 512- **States**: Healthy, Degraded, Dead
513- **Backoff**: `base * 2^(failures-1)`, capped at max_backoff
514- **Dead threshold**: 24 hours of continuous failures
515- **Dead relay retry**: Once per 24 hours
1173 516
1174 drop(repo_index); 517Bootstrap relays are never disconnected by the cleanup system, even if empty.
1175 drop(pending);
1176 drop(confirmed);
1177
1178 // 3. Send actions to SyncManager
1179 for action in actions {
1180 let _ = self.action_tx.send(action).await;
1181 }
1182 }
1183
1184 async fn handle_announcement(&self, event: &Event) {
1185 // Extract repo_ref from event - 30617:pubkey:identifier
1186 let d_tag = event.tags.iter()
1187 .find_map(|tag| {
1188 if tag.kind() == TagKind::D {
1189 tag.content().map(|s| s.to_string())
1190 } else {
1191 None
1192 }
1193 })
1194 .unwrap_or_default();
1195
1196 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1197
1198 // Extract relay URLs from 'r' tags
1199 let relays: HashSet<String> = event.tags.iter()
1200 .filter_map(|tag| {
1201 if tag.kind() == TagKind::Relay {
1202 tag.content().map(|s| s.to_string())
1203 } else {
1204 None
1205 }
1206 })
1207 .collect();
1208
1209 // Update RepoSyncIndex
1210 let mut index = self.repo_sync_index.write().await;
1211 let needs = index.entry(repo_ref).or_default();
1212 needs.relays = relays;
1213 }
1214
1215 async fn handle_root_event(&self, event: &Event) {
1216 // Extract repo_ref from 'a' tag
1217 let repo_ref = event.tags.iter()
1218 .find_map(|tag| {
1219 if tag.kind() == TagKind::A {
1220 tag.content().map(|s| s.to_string())
1221 } else {
1222 None
1223 }
1224 });
1225
1226 if let Some(repo_ref) = repo_ref {
1227 let mut index = self.repo_sync_index.write().await;
1228 let needs = index.entry(repo_ref).or_default();
1229 needs.root_events.insert(event.id);
1230 }
1231 }
1232}
1233```
1234 518
1235--- 519---
1236 520
1237## Implementation Notes 521## Disconnect Handling
1238
1239This section documents the actual implementation details as of December 2024 (Phases 1-10 complete).
1240
1241### Architectural Decisions During Implementation
1242
1243**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup:
1244
1245```rust
1246// 7. Wrap self in Arc<Mutex> for sharing with timer task
1247let sync_manager = Arc::new(Mutex::new(self));
1248```
1249
1250This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber.
1251
1252**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses:
1253
1254- `DashMap` for thread-safe concurrent access without external locking
1255- Three states: `Healthy`, `Degraded`, `Dead`
1256- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff
1257- Dead threshold: 24 hours of continuous failures
1258- Dead relay retry: Once per 24 hours
1259
1260### Implementation Constants
1261
1262| Constant | Value | Purpose |
1263| --------------------------------- | ---------- | ------------------------------------------------ |
1264| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation |
1265| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation |
1266| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync |
1267| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect |
1268| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead |
1269| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff |
1270
1271### Daily Timer Randomization
1272
1273The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running:
1274
1275```rust
1276let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0;
1277```
1278
1279### Bootstrap Relay Protection
1280
1281Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out:
1282
1283```rust
1284.filter(|(_, state)| {
1285 !state.is_bootstrap &&
1286 state.repos.is_empty() &&
1287 state.root_events.is_empty()
1288})
1289```
1290
1291### Graceful Shutdown
1292
1293Shutdown uses a tokio broadcast channel for coordinated termination:
1294
1295```rust
1296let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
1297```
1298
1299Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop.
1300
1301### Actual Module Structure
1302
1303The implemented module structure differs from the original spec:
1304
1305```
1306src/sync/
1307├── mod.rs # SyncManager, main loop, index types, metrics
1308├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters
1309├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters
1310├── health.rs # RelayHealthTracker, HealthState, exponential backoff
1311├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling
1312└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic
1313```
1314
1315Key differences from spec:
1316
1317- No separate `state.rs` - types are defined in `mod.rs`
1318- No separate `actions.rs` - moved to `algorithms.rs`
1319- No separate `consolidation.rs` - consolidation logic in `mod.rs`
1320- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs`
1321
1322### Deviations from Original v4 Spec
1323
13241. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`.
1325 522
13262. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct. 523The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays:
1327 524
13283. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. 525- Finds relays with `repos.is_empty() && root_events.is_empty()`
526- Skips bootstrap relays (`is_bootstrap == true`)
527- Removes from relay_sync_index, pending_sync_index, and connections
528- Disconnects the WebSocket connection
1329 529
13304. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. 530Also triggers reconnection attempts for disconnected relays that have pending work.