upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync-v4.md
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:32:01 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:32:01 +0000
commit18ad93f8d0b8ce172c9c227385a21af66a507950 (patch)
tree275ada806570a2105f4e75388a565f61276209c8 /docs/explanation/grasp-02-proactive-sync-v4.md
parent4e5a937a4ef5288e702ba2bae3daf2a78398b690 (diff)
docs: remove old grasp-02 design doc versions
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v4.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v4.md1330
1 files changed, 0 insertions, 1330 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md
deleted file mode 100644
index dd508b3..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v4.md
+++ /dev/null
@@ -1,1330 +0,0 @@
1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design
2
3## Overview
4
5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions)
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary
12
13---
14
15## Data Model
16
17### RepoSyncIndex (Source of Truth)
18
19```rust
20/// What we WANT to sync - derived from events received via self-subscription.
21/// Updated immediately when self-subscriber batch fires.
22/// Key: repo addressable ref - 30617:pubkey:identifier
23pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
24
25#[derive(Debug, Clone, Default)]
26pub struct RepoSyncNeeds {
27 /// Relay URLs listed in this repo's 30617 announcement
28 pub relays: HashSet<String>,
29 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
30 pub root_events: HashSet<EventId>,
31}
32```
33
34### RelaySyncIndex (Confirmed State + Connection)
35
36```rust
37/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
38/// Key: relay URL
39pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
40
41/// Connection status for a relay
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ConnectionStatus {
44 /// Not currently connected
45 Disconnected,
46 /// Connection attempt in progress
47 Connecting,
48 /// Successfully connected and subscribed
49 Connected,
50}
51
52/// Complete state for a single relay - combines sync needs with connection lifecycle
53#[derive(Debug)]
54pub struct RelayState {
55 /// Repos we have confirmed syncing from this relay
56 pub repos: HashSet<String>,
57 /// Root events we have confirmed tracking
58 pub root_events: HashSet<EventId>,
59 /// If true, never disconnect this relay
60 pub is_bootstrap: bool,
61 /// Current connection status
62 pub connection_status: ConnectionStatus,
63 /// When we last successfully connected - used for since filter on reconnect
64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>,
67}
68
69impl RelayState {
70 /// Check if state should be cleared based on 15-minute rule
71 pub fn should_clear_state(&self) -> bool {
72 match self.disconnected_at {
73 Some(disconnected) => {
74 let now = Timestamp::now();
75 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
76 }
77 None => false, // Still connected or never connected
78 }
79 }
80
81 /// Clear repos and root_events - called when reconnect takes > 15 minutes
82 pub fn clear_sync_state(&mut self) {
83 self.repos.clear();
84 self.root_events.clear();
85 }
86}
87```
88
89### PendingSyncIndex (In-Flight Batches)
90
91```rust
92/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
93/// Each batch has its own ID and can confirm independently.
94/// Key: relay URL
95pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
96
97#[derive(Debug, Clone)]
98pub struct PendingBatch {
99 /// Unique ID for this batch - for debugging/logging
100 pub batch_id: u64,
101 /// The items this batch is syncing
102 pub items: PendingItems,
103 /// Subscription IDs that must ALL receive EOSE before confirming
104 pub outstanding_subs: HashSet<SubscriptionId>,
105}
106
107#[derive(Debug, Clone, Default)]
108pub struct PendingItems {
109 pub repos: HashSet<String>,
110 pub root_events: HashSet<EventId>,
111}
112```
113
114---
115
116## Connection Lifecycle State Machine
117
118```mermaid
119stateDiagram-v2
120 [*] --> Disconnected: discover relay via RepoSyncIndex
121 Disconnected --> Connecting: AddFilters triggers spawn_connection
122 Connecting --> Connected: success
123 Connecting --> Disconnected: failure + record in health tracker
124 Connected --> Disconnected: connection lost
125 Connected --> [*]: intentional disconnect via check_disconnects
126
127 note right of Disconnected: disconnected_at set for 15min rule
128 note right of Connected: last_connected tracked for since filter
129```
130
131---
132
133## Flow Scenarios
134
135### Scenario 1: Initial Connect via handle_connect_or_reconnect
136
137```mermaid
138flowchart TB
139 START[Startup] --> SS[Self-subscribe to own relay]
140 SS --> |no since filter| EVENTS[Receive historical events]
141 EVENTS --> RSI[Update RepoSyncIndex]
142 RSI --> DT[derive_relay_targets]
143 DT --> CA[compute_actions with targets and empty confirmed]
144 CA --> AF[AddFilters for each relay]
145 AF --> SPAWN{Relay connected?}
146 SPAWN --> |no| CONN[spawn_connection]
147 CONN --> HC[handle_connect_or_reconnect]
148 SPAWN --> |yes| SUB
149
150 subgraph handle_connect_or_reconnect - Fresh Sync
151 HC --> CHECK_FRESH{is_fresh_sync?}
152 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
153 L1 --> RCA[recompute_actions_for_relay]
154 end
155
156 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
157 SUB --> PB[Create PendingBatch]
158 PB --> EOSE[Wait for EOSE]
159 EOSE --> CONFIRM[Move items to confirmed repos/root_events]
160```
161
162**Key points:**
163
164- No `since` filter on initial connect - get full history
165- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
167- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
168
169### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes
170
171```mermaid
172flowchart TB
173 DISC[Connection lost] --> MARK[Set disconnected_at = now]
174 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay]
175 CLEAR_PEND --> WAIT[Wait for reconnection]
176 WAIT --> RECONN[Connection restored]
177 RECONN --> HC[handle_connect_or_reconnect]
178
179 subgraph handle_connect_or_reconnect - Quick Reconnect
180 HC --> CHECK{is_fresh_sync?}
181 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
182 SINCE --> L1[build_announcement_filter - with since]
183 L1 --> L23[rebuild_layer2_and_layer3 - with since]
184 L23 --> RCA[recompute_actions_for_relay]
185 end
186
187 RCA --> AF[AddFilters for new items only]
188 AF --> SUB[Subscribe]
189 SUB --> PB[Create PendingBatch]
190 PB --> EOSE[Wait for EOSE]
191 EOSE --> EXTEND[Extend confirmed state]
192```
193
194**Key points:**
195
196- PendingSyncIndex cleared on disconnect (not reconnect)
197- `handle_connect_or_reconnect`:
198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
199 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection
202
203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes
204
205```mermaid
206flowchart TB
207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
208
209 subgraph handle_connect_or_reconnect - Stale Reconnect
210 HC --> CHECK{is_fresh_sync?}
211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
212 CLEAR --> L1[build_announcement_filter - no since]
213 L1 --> RCA[recompute_actions_for_relay]
214 end
215
216 RCA --> CA[compute_actions with empty confirmed]
217 CA --> AF[AddFilters for everything]
218 AF --> SUB[Subscribe - no since filter]
219 SUB --> PB[Create PendingBatch]
220 PB --> EOSE[Wait for EOSE]
221 EOSE --> CONFIRM[Populate confirmed state fresh]
222```
223
224**Key points:**
225
226- `should_clear_state()` returns true → triggers fresh sync
227- Same path as initial connect after clearing state
228- Layer 1: `build_announcement_filter(None)` - full history
229- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
230
231### Scenario 4: Consolidation - Triggered on Filter Add
232
233```mermaid
234flowchart TB
235 AF[handle_add_filters called] --> COUNT{current + new > 70?}
236 COUNT --> |yes| CONSOLIDATE[consolidate]
237 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
238 WAIT_PEND --> CLOSE[unsubscribe_all]
239 CLOSE --> SINCE[since = now - 15min]
240 SINCE --> L1[build_announcement_filter - with since]
241 L1 --> L23[rebuild_layer2_and_layer3 - with since]
242 COUNT --> |no| SUB[Subscribe new filters]
243 SUB --> PB[Create PendingBatch]
244```
245
246**Key points:**
247
248- Consolidation checked in `handle_add_filters` BEFORE adding new filters
249- After closing all subscriptions, re-subscribe:
250 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
251 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
252- `since = now - 15min` prevents re-fetching old events
253- Keeps confirmed state, just reduces filter count
254
255### Scenario 5: Daily Timer - 23 to 25h Random
256
257```mermaid
258flowchart TB
259 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all]
260 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay]
261 CLEAR_PEND --> CLEAR_STATE[clear_sync_state]
262 CLEAR_STATE --> L1[build_announcement_filter - no since]
263 L1 --> RCA[recompute_actions_for_relay]
264 RCA --> CA[compute_actions with empty confirmed]
265 CA --> AF[AddFilters for everything]
266 AF --> SUB[Subscribe - no since filter]
267 SUB --> PB[Create PendingBatch]
268 PB --> EOSE[Wait for EOSE]
269 EOSE --> CONFIRM[Repopulate confirmed state]
270```
271
272**Key points:**
273
274- Daily timer is a full fresh sync, NOT consolidation
275- Clears both PendingSyncIndex and confirmed state
276- Layer 1: `build_announcement_filter(None)` - full history
277- Layer 2+3: via compute_actions with empty confirmed - full history
278- Detects any state drift accumulated over 24 hours
279
280---
281
282## Core Algorithms
283
284### 1. derive_relay_targets
285
286Transform RepoSyncIndex into per-relay sync targets:
287
288```rust
289/// Inverts RepoSyncIndex to get per-relay view
290fn derive_relay_targets(
291 repo_index: &HashMap<String, RepoSyncNeeds>
292) -> HashMap<String, RelaySyncNeeds> {
293 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
294
295 for (repo_ref, needs) in repo_index {
296 for relay_url in &needs.relays {
297 let target = targets.entry(relay_url.clone()).or_default();
298 target.repos.insert(repo_ref.clone());
299 target.root_events.extend(needs.root_events.iter().cloned());
300 }
301 }
302
303 targets
304}
305```
306
307### 2. compute_actions (Three-Way Diff)
308
309**This is the ONLY decision point for what NEW subscriptions to create.**
310
311```rust
312/// Computes AddFilters for items that are:
313/// - In targets (what we want)
314/// - NOT in pending (already in-flight)
315/// - NOT in confirmed (already confirmed)
316fn compute_actions(
317 targets: &HashMap<String, RelaySyncNeeds>,
318 pending: &HashMap<String, Vec<PendingBatch>>,
319 confirmed: &HashMap<String, RelayState>,
320) -> Vec<AddFilters> {
321 let mut actions = Vec::new();
322
323 for (relay_url, target) in targets {
324 // Skip disconnected relays - they will get AddFilters on reconnect
325 if let Some(state) = confirmed.get(relay_url) {
326 if state.connection_status != ConnectionStatus::Connected {
327 continue;
328 }
329 }
330
331 // Collect all pending items for this relay
332 let pending_repos: HashSet<_> = pending.get(relay_url)
333 .map(|batches| batches.iter()
334 .flat_map(|b| b.items.repos.iter().cloned())
335 .collect())
336 .unwrap_or_default();
337 let pending_events: HashSet<_> = pending.get(relay_url)
338 .map(|batches| batches.iter()
339 .flat_map(|b| b.items.root_events.iter().cloned())
340 .collect())
341 .unwrap_or_default();
342
343 // Collect confirmed items for this relay
344 let confirmed_repos = confirmed.get(relay_url)
345 .map(|c| &c.repos)
346 .unwrap_or(&HashSet::new());
347 let confirmed_events = confirmed.get(relay_url)
348 .map(|c| &c.root_events)
349 .unwrap_or(&HashSet::new());
350
351 // New = target - pending - confirmed
352 let new_repos: HashSet<_> = target.repos.iter()
353 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
354 .cloned()
355 .collect();
356 let new_events: HashSet<_> = target.root_events.iter()
357 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
358 .cloned()
359 .collect();
360
361 if !new_repos.is_empty() || !new_events.is_empty() {
362 let filters = build_filters(&new_repos, &new_events);
363 actions.push(AddFilters {
364 relay_url: relay_url.clone(),
365 repos: new_repos,
366 root_events: new_events,
367 filters,
368 });
369 }
370 }
371
372 actions
373}
374```
375
376### 3. Filter Building Functions (Three-Layer Strategy)
377
378The filter strategy uses three layers:
379
380- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
381- **Layer 2**: Events tagging our repos
382- **Layer 3**: Events tagging our root events
383
384**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
385
386```rust
387/// Layer 1: Announcements filter (kinds 30617 + 30618)
388/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
389/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
390fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
391 let filter = Filter::new().kinds([
392 Kind::Custom(30617), // Repository announcements
393 Kind::Custom(30618), // Maintainer lists
394 ]);
395
396 match since {
397 Some(ts) => filter.since(ts),
398 None => filter,
399 }
400}
401
402/// Layer 2: Events tagging one of our repos
403/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
404/// Batched per 100 repo refs.
405fn tagged_one_of_our_repo_event_filters(
406 repos: &HashSet<String>,
407 since: Option<Timestamp>,
408) -> Vec<Filter> {
409 let mut filters = Vec::new();
410 let repo_refs: Vec<_> = repos.iter().collect();
411
412 for chunk in repo_refs.chunks(100) {
413 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
414
415 // Lowercase 'a' tag - standard addressable reference
416 let mut f1 = Filter::new()
417 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
418 // Uppercase 'A' tag - some clients use this
419 let mut f2 = Filter::new()
420 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
421 // Quote 'q' tag - NIP-10 quote references to addressable events
422 let mut f3 = Filter::new()
423 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
424
425 if let Some(ts) = since {
426 f1 = f1.since(ts);
427 f2 = f2.since(ts);
428 f3 = f3.since(ts);
429 }
430
431 filters.push(f1);
432 filters.push(f2);
433 filters.push(f3);
434 }
435
436 filters
437}
438
439/// Layer 3: Events tagging one of our root events
440/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
441/// Batched per 100 event IDs.
442fn tagged_one_of_our_root_event_filters(
443 root_events: &HashSet<EventId>,
444 since: Option<Timestamp>,
445) -> Vec<Filter> {
446 let mut filters = Vec::new();
447 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
448
449 for chunk in event_ids.chunks(100) {
450 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
451
452 // Lowercase 'e' tag - standard event reference
453 let mut f1 = Filter::new()
454 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
455 // Uppercase 'E' tag - some clients use this
456 let mut f2 = Filter::new()
457 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
458 // Quote 'q' tag - NIP-10 quote references to events
459 let mut f3 = Filter::new()
460 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
461
462 if let Some(ts) = since {
463 f1 = f1.since(ts);
464 f2 = f2.since(ts);
465 f3 = f3.since(ts);
466 }
467
468 filters.push(f1);
469 filters.push(f2);
470 filters.push(f3);
471 }
472
473 filters
474}
475
476/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
477/// Used by:
478/// - compute_actions for incremental subscriptions
479/// - consolidation rebuilds (Layer 1 remains active)
480fn build_layer2_and_layer3_filters(
481 repos: &HashSet<String>,
482 root_events: &HashSet<EventId>,
483 since: Option<Timestamp>,
484) -> Vec<Filter> {
485 let mut filters = Vec::new();
486 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
487 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
488 filters
489}
490```
491
492**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
493
494### 4. handle_add_filters (SyncManager)
495
496```rust
497impl SyncManager {
498 async fn handle_add_filters(&mut self, action: AddFilters) {
499 let AddFilters { relay_url, repos, root_events, filters } = action;
500
501 // Auto-spawn connection if needed
502 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
503 match state {
504 None => {
505 // New relay discovered - create entry and spawn connection
506 self.relay_sync_index.write().await.insert(
507 relay_url.clone(),
508 RelayState {
509 repos: HashSet::new(),
510 root_events: HashSet::new(),
511 is_bootstrap: false,
512 connection_status: ConnectionStatus::Connecting,
513 last_connected: None,
514 disconnected_at: None,
515 connection: None,
516 }
517 );
518 self.spawn_connection(&relay_url).await;
519 return; // Subscriptions will happen on connection success
520 }
521 Some(state) if state.connection_status != ConnectionStatus::Connected => {
522 // Not connected - subscriptions will happen on connection success
523 return;
524 }
525 Some(_) => {
526 // Already connected - proceed with subscription
527 }
528 }
529
530 // Subscribe and collect subscription IDs
531 let conn = self.connections.get(&relay_url).unwrap();
532 let mut sub_ids = HashSet::new();
533
534 for filter in filters {
535 match conn.client.subscribe(filter, None).await {
536 Ok(output) => {
537 for sub_id in output.val {
538 sub_ids.insert(sub_id);
539 }
540 }
541 Err(e) => {
542 tracing::warn!("Failed to subscribe: {}", e);
543 }
544 }
545 }
546
547 // Create pending batch
548 let batch = PendingBatch {
549 batch_id: self.next_batch_id(),
550 items: PendingItems { repos, root_events },
551 outstanding_subs: sub_ids,
552 };
553
554 // Add to pending index
555 self.pending_sync_index.write().await
556 .entry(relay_url)
557 .or_default()
558 .push(batch);
559 }
560}
561```
562
563### 5. handle_disconnect
564
565```rust
566impl SyncManager {
567 /// Called when connection to a relay is lost
568 async fn handle_disconnect(&mut self, relay_url: &str) {
569 let mut index = self.relay_sync_index.write().await;
570
571 if let Some(state) = index.get_mut(relay_url) {
572 state.connection_status = ConnectionStatus::Disconnected;
573 state.disconnected_at = Some(Timestamp::now());
574 state.connection = None;
575 }
576
577 // Clear pending batches - these items were not confirmed
578 self.pending_sync_index.write().await.remove(relay_url);
579
580 // Remove from active connections map
581 self.connections.remove(relay_url);
582
583 // Health tracker records failure for backoff
584 self.health_tracker.record_failure(relay_url);
585 }
586}
587```
588
589### 6. handle_connect_or_reconnect (Unified)
590
591This method handles BOTH initial connection AND reconnection with unified logic:
592
593```rust
594impl SyncManager {
595 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
596 ///
597 /// Decision tree:
598 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
599 /// - Quick reconnect (<15min): since = last_connected - 15min
600 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
601 let mut index = self.relay_sync_index.write().await;
602 let state = match index.get_mut(relay_url) {
603 Some(s) => s,
604 None => return, // Relay was removed while disconnected
605 };
606
607 // Determine if this is a fresh sync or quick reconnect
608 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
609 let last_connected = state.last_connected;
610
611 if is_fresh_sync && state.last_connected.is_some() {
612 // Stale reconnect (>15min) - clear state
613 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
614 state.clear_sync_state();
615 }
616
617 // Update connection state
618 state.connection_status = ConnectionStatus::Connected;
619 state.last_connected = Some(Timestamp::now());
620 state.disconnected_at = None;
621
622 // Record success in health tracker
623 self.health_tracker.record_success(relay_url);
624
625 drop(index); // Release lock
626
627 let conn = match self.connections.get(relay_url) {
628 Some(c) => c,
629 None => return,
630 };
631
632 if is_fresh_sync {
633 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
634
635 // Step 1: Subscribe Layer 1 (announcements) without since
636 let layer1 = build_announcement_filter(None);
637 let _ = conn.client.subscribe(layer1, None).await;
638
639 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
640 self.recompute_actions_for_relay(relay_url).await;
641 } else {
642 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
643 let since = last_connected
644 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
645 .unwrap_or(Timestamp::from(0));
646
647 // Step 1: Subscribe Layer 1 (announcements) with since
648 let layer1 = build_announcement_filter(Some(since));
649 let _ = conn.client.subscribe(layer1, None).await;
650
651 // Step 2: Rebuild Layer 2+3 for confirmed items with since
652 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
653
654 // Step 3: Check for NEW items via compute_actions
655 self.recompute_actions_for_relay(relay_url).await;
656 }
657 }
658
659 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
660 /// Used by:
661 /// - Quick reconnect: rebuild confirmed items with since filter
662 /// - Consolidation: close and rebuild with since filter
663 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
664 let confirmed = self.relay_sync_index.read().await;
665 let state = match confirmed.get(relay_url) {
666 Some(s) => s,
667 None => return,
668 };
669
670 // Build Layer 2+3 filters WITH since
671 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
672 drop(confirmed);
673
674 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
675 let conn = match self.connections.get(relay_url) {
676 Some(c) => c,
677 None => return,
678 };
679
680 for filter in filters {
681 let _ = conn.client.subscribe(filter, None).await;
682 }
683 }
684
685 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
686 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
687 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
688 let repo_index = self.repo_sync_index.read().await;
689 let targets = derive_relay_targets(&repo_index);
690 drop(repo_index);
691
692 // Filter to just this relay
693 let target = match targets.get(relay_url) {
694 Some(t) => t.clone(),
695 None => return, // No repos reference this relay anymore
696 };
697
698 let pending = self.pending_sync_index.read().await;
699 let confirmed = self.relay_sync_index.read().await;
700
701 let mut single_relay_targets = HashMap::new();
702 single_relay_targets.insert(relay_url.to_string(), target);
703
704 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
705
706 drop(pending);
707 drop(confirmed);
708
709 // Process AddFilters
710 for action in actions {
711 self.handle_add_filters(action).await;
712 }
713 }
714}
715```
716
717### 7. Daily Timer
718
719```rust
720impl SyncManager {
721 async fn run_daily_timer(&self) {
722 loop {
723 // Random 23-25 hours
724 let hours = 23.0 + rand::random::<f64>() * 2.0;
725 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
726
727 let relay_urls: Vec<_> = self.relay_sync_index.read().await
728 .keys()
729 .cloned()
730 .collect();
731
732 for relay_url in relay_urls {
733 self.daily_sync(&relay_url).await;
734 }
735 }
736 }
737
738 /// Perform daily fresh sync for a relay
739 async fn daily_sync(&mut self, relay_url: &str) {
740 tracing::info!("Daily sync triggered for {}", relay_url);
741
742 // Close all subscriptions
743 if let Some(conn) = self.connections.get(relay_url) {
744 conn.client.unsubscribe_all().await;
745 }
746
747 // Clear PendingSyncIndex
748 self.pending_sync_index.write().await.remove(relay_url);
749
750 // Clear confirmed state - triggers fresh sync
751 {
752 let mut index = self.relay_sync_index.write().await;
753 if let Some(state) = index.get_mut(relay_url) {
754 state.clear_sync_state();
755 }
756 }
757
758 // Recompute actions - will generate AddFilters for everything
759 self.recompute_actions_for_relay(relay_url).await;
760 }
761}
762```
763
764### 8. Consolidation (Threshold-Based, Triggered on Add)
765
766Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active.
767
768```rust
769impl SyncManager {
770 /// Check filter count and consolidate if needed.
771 /// Called from handle_add_filters BEFORE adding new filters.
772 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
773 let current_count = self.get_filter_count(relay_url).await;
774
775 if current_count + new_filter_count > 70 {
776 self.consolidate(relay_url).await;
777 }
778 }
779
780 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
781 /// Does NOT clear state - just reduces filter count.
782 async fn consolidate(&mut self, relay_url: &str) {
783 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
784
785 // Wait for all pending batches to complete first
786 self.wait_pending_complete(relay_url).await;
787
788 // Close Layer 2+3 subscriptions only - Layer 1 remains active
789 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
790 // For simplicity, we close all and re-add Layer 1
791 if let Some(conn) = self.connections.get(relay_url) {
792 conn.client.unsubscribe_all().await;
793 }
794
795 // Re-subscribe Layer 1 with since (maintains announcements stream)
796 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
797 let conn = self.connections.get(relay_url).unwrap();
798 let layer1 = build_announcement_filter(Some(since));
799 let _ = conn.client.subscribe(layer1, None).await;
800
801 // Rebuild Layer 2+3 only
802 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
803 }
804}
805```
806
807**Updated handle_add_filters to check consolidation:**
808
809```rust
810impl SyncManager {
811 async fn handle_add_filters(&mut self, action: AddFilters) {
812 let AddFilters { relay_url, repos, root_events, filters } = action;
813
814 // Auto-spawn connection if needed (unchanged)
815 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
816 match state {
817 None => {
818 // New relay discovered - create entry and spawn connection
819 self.relay_sync_index.write().await.insert(
820 relay_url.clone(),
821 RelayState {
822 repos: HashSet::new(),
823 root_events: HashSet::new(),
824 is_bootstrap: false,
825 connection_status: ConnectionStatus::Connecting,
826 last_connected: None,
827 disconnected_at: None,
828 connection: None,
829 }
830 );
831 self.spawn_connection(&relay_url).await;
832 return; // Subscriptions will happen on connection success
833 }
834 Some(state) if state.connection_status != ConnectionStatus::Connected => {
835 return; // Not connected - subscriptions will happen on connection success
836 }
837 Some(_) => {
838 // Already connected - proceed
839 }
840 }
841
842 // CHECK CONSOLIDATION BEFORE ADDING
843 self.maybe_consolidate(&relay_url, filters.len()).await;
844
845 // Subscribe and collect subscription IDs
846 let conn = self.connections.get(&relay_url).unwrap();
847 let mut sub_ids = HashSet::new();
848
849 for filter in filters {
850 match conn.client.subscribe(filter, None).await {
851 Ok(output) => {
852 for sub_id in output.val {
853 sub_ids.insert(sub_id);
854 }
855 }
856 Err(e) => {
857 tracing::warn!("Failed to subscribe: {}", e);
858 }
859 }
860 }
861
862 // Create pending batch (unchanged)
863 let batch = PendingBatch {
864 batch_id: self.next_batch_id(),
865 items: PendingItems { repos, root_events },
866 outstanding_subs: sub_ids,
867 };
868
869 self.pending_sync_index.write().await
870 .entry(relay_url)
871 .or_default()
872 .push(batch);
873 }
874}
875```
876
877---
878
879## Disconnect (Relay Removal) Handling
880
881```rust
882impl SyncManager {
883 /// Periodically check for relays that should be disconnected
884 async fn check_disconnects(&mut self) {
885 let confirmed = self.relay_sync_index.read().await;
886 let relays_to_disconnect: Vec<_> = confirmed.iter()
887 .filter(|(_, state)| {
888 !state.is_bootstrap &&
889 state.repos.is_empty() &&
890 state.root_events.is_empty()
891 })
892 .map(|(url, _)| url.clone())
893 .collect();
894 drop(confirmed);
895
896 for relay_url in relays_to_disconnect {
897 self.disconnect_relay(&relay_url).await;
898 }
899 }
900
901 async fn disconnect_relay(&mut self, relay_url: &str) {
902 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
903
904 self.relay_sync_index.write().await.remove(relay_url);
905 self.pending_sync_index.write().await.remove(relay_url);
906
907 if let Some(conn) = self.connections.remove(relay_url) {
908 let _ = conn.client.disconnect().await;
909 }
910 }
911}
912```
913
914---
915
916## State Flow Summary
917
918```mermaid
919flowchart TB
920 subgraph Input
921 SS[SelfSubscriber]
922 OWN[Own Relay]
923 end
924
925 subgraph RepoSyncIndex - What We Want
926 RSI[HashMap: Repo to Relays+Events]
927 end
928
929 subgraph Derived Target
930 DT[derive_relay_targets fn]
931 TGT[Per-relay: repos + events we should sync]
932 end
933
934 subgraph compute_actions - Decision Point
935 CA[Three-way diff: target - pending - confirmed]
936 end
937
938 subgraph PendingSyncIndex - In Flight
939 PSI[Vec PendingBatch per relay]
940 end
941
942 subgraph RelaySyncIndex - Confirmed State
943 RLI[RelayState per relay]
944 CONN[connection_status]
945 REPOS[repos + root_events]
946 TIMES[last_connected + disconnected_at]
947 end
948
949 SS -->|subscribe| OWN
950 OWN -->|events| SS
951 SS -->|batch fires| RSI
952 RSI --> DT
953 DT --> TGT
954 TGT --> CA
955 PSI --> CA
956 RLI --> CA
957 CA -->|Layer 2+3 new items| AF[AddFilters]
958 AF -->|check filter count| CONSOL{count + new > 70?}
959 CONSOL -->|yes| CONSOLIDATE[consolidate]
960 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since]
961 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since]
962 CONSOL -->|no| SUB[subscribe]
963 AF -->|spawn if needed| CONN
964 SUB --> PSI
965 PSI -->|EOSE| REPOS
966
967 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
968 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
969
970 subgraph handle_connect_or_reconnect
971 HC --> FRESH_CHECK{is_fresh_sync?}
972 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
973 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
974 L1_FRESH --> RCA1[recompute_actions_for_relay]
975 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
976 L23_QUICK --> RCA2[recompute_actions_for_relay]
977 end
978```
979
980---
981
982## Key Design Decisions
983
984| Decision | Choice | Rationale |
985| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- |
986| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
987| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
988| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
989| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
990| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
991| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
992| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
993| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
994| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
995| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
996| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
997| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
998| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
999| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
1000| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
1001| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
1002| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
1003
1004---
1005
1006## Module Structure
1007
1008```
1009src/sync/
1010├── mod.rs # SyncManager, main loop
1011├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
1012├── actions.rs # AddFilters struct, compute_actions, build_filters
1013├── self_subscriber.rs # SelfSubscriber, batching logic
1014├── relay_connection.rs # Per-relay WebSocket connection
1015├── consolidation.rs # Consolidation logic, daily timer
1016├── health.rs # Health tracking (reuse from v2)
1017└── metrics.rs # Prometheus metrics (reuse from v2)
1018```
1019
1020---
1021
1022## Comparison: v3 vs v4
1023
1024| Aspect | v3 | v4 |
1025| ------------------------ | ----------------------------------------- | --------------------------------------------- |
1026| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1027| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1028| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1029| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1030| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1031| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1032| PSI clearing | On disconnect | On disconnect (confirmed) |
1033| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
1034
1035---
1036
1037## Self-Subscriber Flow
1038
1039The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions.
1040
1041### State Tracking
1042
1043```rust
1044pub struct SelfSubscriber {
1045 own_relay_url: String,
1046 relay_domain: String,
1047 repo_sync_index: RepoSyncIndex,
1048 pending_sync_index: PendingSyncIndex,
1049 relay_sync_index: RelaySyncIndex,
1050 action_tx: mpsc::Sender<AddFilters>,
1051 /// Timestamp of last successful connection - used for since filter on reconnection
1052 last_connected: Option<Timestamp>,
1053 /// The active client connection
1054 client: Option<Client>,
1055}
1056```
1057
1058### On Startup / Reconnect (Unified)
1059
1060Both initial startup and reconnection use the same `connect_and_subscribe` method:
1061
1062```rust
1063impl SelfSubscriber {
1064 async fn run(mut self) {
1065 loop {
1066 // Connect or reconnect
1067 if let Err(e) = self.connect_and_subscribe().await {
1068 tracing::warn!("Connection failed: {}, will retry", e);
1069 tokio::time::sleep(Duration::from_secs(5)).await;
1070 continue;
1071 }
1072
1073 // Run event loop until disconnection
1074 self.event_loop().await;
1075
1076 // Loop will retry connection
1077 }
1078 }
1079
1080 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
1081 let client = Client::new(Keys::generate());
1082 client.add_relay(&self.own_relay_url).await?;
1083 client.connect().await;
1084
1085 // Build filter - add since only on reconnect
1086 let filter = Filter::new().kinds([
1087 Kind::Custom(30617), // Repository announcements
1088 Kind::GitPatch, // 1617
1089 Kind::Custom(1618), // PRs
1090 Kind::Custom(1619), // PR updates
1091 Kind::GitIssue, // 1621
1092 ]);
1093
1094 let filter = if let Some(ts) = self.last_connected {
1095 // Reconnection: use since filter
1096 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1097 filter.since(since)
1098 } else {
1099 // Initial connect: no since filter - get full history
1100 filter
1101 };
1102
1103 // Update last_connected AFTER computing since
1104 self.last_connected = Some(Timestamp::now());
1105
1106 client.subscribe(filter, None).await?;
1107 self.client = Some(client);
1108 Ok(())
1109 }
1110}
1111```
1112
1113### Event Loop with Batching
1114
1115```rust
1116impl SelfSubscriber {
1117 async fn event_loop(&mut self) {
1118 let client = self.client.as_ref().unwrap();
1119 let mut pending_events: Vec<Event> = Vec::new();
1120 let mut batch_timer: Option<Instant> = None;
1121 let batch_window = Duration::from_secs(5);
1122
1123 loop {
1124 let timeout = batch_timer
1125 .map(|t| batch_window.saturating_sub(t.elapsed()))
1126 .unwrap_or(Duration::from_secs(60));
1127
1128 tokio::select! {
1129 notification = client.notifications().recv() => {
1130 match notification {
1131 Ok(RelayPoolNotification::Event { event, .. }) => {
1132 pending_events.push(*event);
1133
1134 // Start timer on first event - does NOT reset
1135 if batch_timer.is_none() {
1136 batch_timer = Some(Instant::now());
1137 }
1138 }
1139 Ok(RelayPoolNotification::Shutdown) => {
1140 // Connection lost
1141 break;
1142 }
1143 _ => {}
1144 }
1145 }
1146 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1147 // Batch window elapsed
1148 self.process_batch(pending_events.drain(..).collect()).await;
1149 batch_timer = None;
1150 }
1151 }
1152 }
1153 }
1154
1155 async fn process_batch(&self, events: Vec<Event>) {
1156 // 1. Update RepoSyncIndex
1157 for event in events {
1158 match event.kind.as_u16() {
1159 30617 => self.handle_announcement(&event).await,
1160 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1161 _ => {}
1162 }
1163 }
1164
1165 // 2. Derive targets and compute actions
1166 let repo_index = self.repo_sync_index.read().await;
1167 let targets = derive_relay_targets(&repo_index);
1168
1169 let pending = self.pending_sync_index.read().await;
1170 let confirmed = self.relay_sync_index.read().await;
1171
1172 let actions = compute_actions(&targets, &pending, &confirmed);
1173
1174 drop(repo_index);
1175 drop(pending);
1176 drop(confirmed);
1177
1178 // 3. Send actions to SyncManager
1179 for action in actions {
1180 let _ = self.action_tx.send(action).await;
1181 }
1182 }
1183
1184 async fn handle_announcement(&self, event: &Event) {
1185 // Extract repo_ref from event - 30617:pubkey:identifier
1186 let d_tag = event.tags.iter()
1187 .find_map(|tag| {
1188 if tag.kind() == TagKind::D {
1189 tag.content().map(|s| s.to_string())
1190 } else {
1191 None
1192 }
1193 })
1194 .unwrap_or_default();
1195
1196 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1197
1198 // Extract relay URLs from 'r' tags
1199 let relays: HashSet<String> = event.tags.iter()
1200 .filter_map(|tag| {
1201 if tag.kind() == TagKind::Relay {
1202 tag.content().map(|s| s.to_string())
1203 } else {
1204 None
1205 }
1206 })
1207 .collect();
1208
1209 // Update RepoSyncIndex
1210 let mut index = self.repo_sync_index.write().await;
1211 let needs = index.entry(repo_ref).or_default();
1212 needs.relays = relays;
1213 }
1214
1215 async fn handle_root_event(&self, event: &Event) {
1216 // Extract repo_ref from 'a' tag
1217 let repo_ref = event.tags.iter()
1218 .find_map(|tag| {
1219 if tag.kind() == TagKind::A {
1220 tag.content().map(|s| s.to_string())
1221 } else {
1222 None
1223 }
1224 });
1225
1226 if let Some(repo_ref) = repo_ref {
1227 let mut index = self.repo_sync_index.write().await;
1228 let needs = index.entry(repo_ref).or_default();
1229 needs.root_events.insert(event.id);
1230 }
1231 }
1232}
1233```
1234
1235---
1236
1237## Implementation Notes
1238
1239This section documents the actual implementation details as of December 2024 (Phases 1-10 complete).
1240
1241### Architectural Decisions During Implementation
1242
1243**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup:
1244
1245```rust
1246// 7. Wrap self in Arc<Mutex> for sharing with timer task
1247let sync_manager = Arc::new(Mutex::new(self));
1248```
1249
1250This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber.
1251
1252**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses:
1253
1254- `DashMap` for thread-safe concurrent access without external locking
1255- Three states: `Healthy`, `Degraded`, `Dead`
1256- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff
1257- Dead threshold: 24 hours of continuous failures
1258- Dead relay retry: Once per 24 hours
1259
1260### Implementation Constants
1261
1262| Constant | Value | Purpose |
1263| --------------------------------- | ---------- | ------------------------------------------------ |
1264| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation |
1265| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation |
1266| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync |
1267| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect |
1268| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead |
1269| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff |
1270
1271### Daily Timer Randomization
1272
1273The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running:
1274
1275```rust
1276let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0;
1277```
1278
1279### Bootstrap Relay Protection
1280
1281Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out:
1282
1283```rust
1284.filter(|(_, state)| {
1285 !state.is_bootstrap &&
1286 state.repos.is_empty() &&
1287 state.root_events.is_empty()
1288})
1289```
1290
1291### Graceful Shutdown
1292
1293Shutdown uses a tokio broadcast channel for coordinated termination:
1294
1295```rust
1296let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
1297```
1298
1299Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop.
1300
1301### Actual Module Structure
1302
1303The implemented module structure differs from the original spec:
1304
1305```
1306src/sync/
1307├── mod.rs # SyncManager, main loop, index types, metrics
1308├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters
1309├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters
1310├── health.rs # RelayHealthTracker, HealthState, exponential backoff
1311├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling
1312└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic
1313```
1314
1315Key differences from spec:
1316
1317- No separate `state.rs` - types are defined in `mod.rs`
1318- No separate `actions.rs` - moved to `algorithms.rs`
1319- No separate `consolidation.rs` - consolidation logic in `mod.rs`
1320- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs`
1321
1322### Deviations from Original v4 Spec
1323
13241. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`.
1325
13262. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct.
1327
13283. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches.
1329
13304. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API.