upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync-v4.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v4.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v4.md1229
1 files changed, 1229 insertions, 0 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md
new file mode 100644
index 0000000..aba88a5
--- /dev/null
+++ b/docs/explanation/grasp-02-proactive-sync-v4.md
@@ -0,0 +1,1229 @@
1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design
2
3## Overview
4
5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions)
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary
12
13---
14
15## Data Model
16
17### RepoSyncIndex (Source of Truth)
18
19```rust
20/// What we WANT to sync - derived from events received via self-subscription.
21/// Updated immediately when self-subscriber batch fires.
22/// Key: repo addressable ref - 30617:pubkey:identifier
23pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
24
25#[derive(Debug, Clone, Default)]
26pub struct RepoSyncNeeds {
27 /// Relay URLs listed in this repo's 30617 announcement
28 pub relays: HashSet<String>,
29 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
30 pub root_events: HashSet<EventId>,
31}
32```
33
34### RelaySyncIndex (Confirmed State + Connection)
35
36```rust
37/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
38/// Key: relay URL
39pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
40
41/// Connection status for a relay
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ConnectionStatus {
44 /// Not currently connected
45 Disconnected,
46 /// Connection attempt in progress
47 Connecting,
48 /// Successfully connected and subscribed
49 Connected,
50}
51
52/// Complete state for a single relay - combines sync needs with connection lifecycle
53#[derive(Debug)]
54pub struct RelayState {
55 /// Repos we have confirmed syncing from this relay
56 pub repos: HashSet<String>,
57 /// Root events we have confirmed tracking
58 pub root_events: HashSet<EventId>,
59 /// If true, never disconnect this relay
60 pub is_bootstrap: bool,
61 /// Current connection status
62 pub connection_status: ConnectionStatus,
63 /// When we last successfully connected - used for since filter on reconnect
64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>,
67 /// The active connection - None if disconnected
68 pub connection: Option<RelayConnection>,
69}
70
71impl RelayState {
72 /// Check if state should be cleared based on 15-minute rule
73 pub fn should_clear_state(&self) -> bool {
74 match self.disconnected_at {
75 Some(disconnected) => {
76 let now = Timestamp::now();
77 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
78 }
79 None => false, // Still connected or never connected
80 }
81 }
82
83 /// Clear repos and root_events - called when reconnect takes > 15 minutes
84 pub fn clear_sync_state(&mut self) {
85 self.repos.clear();
86 self.root_events.clear();
87 }
88}
89```
90
91### PendingSyncIndex (In-Flight Batches)
92
93```rust
94/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
95/// Each batch has its own ID and can confirm independently.
96/// Key: relay URL
97pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
98
99#[derive(Debug, Clone)]
100pub struct PendingBatch {
101 /// Unique ID for this batch - for debugging/logging
102 pub batch_id: u64,
103 /// The items this batch is syncing
104 pub items: PendingItems,
105 /// Subscription IDs that must ALL receive EOSE before confirming
106 pub outstanding_subs: HashSet<SubscriptionId>,
107}
108
109#[derive(Debug, Clone, Default)]
110pub struct PendingItems {
111 pub repos: HashSet<String>,
112 pub root_events: HashSet<EventId>,
113}
114```
115
116---
117
118## Connection Lifecycle State Machine
119
120```mermaid
121stateDiagram-v2
122 [*] --> Disconnected: discover relay via RepoSyncIndex
123 Disconnected --> Connecting: AddFilters triggers spawn_connection
124 Connecting --> Connected: success
125 Connecting --> Disconnected: failure + record in health tracker
126 Connected --> Disconnected: connection lost
127 Connected --> [*]: intentional disconnect via check_disconnects
128
129 note right of Disconnected: disconnected_at set for 15min rule
130 note right of Connected: last_connected tracked for since filter
131```
132
133---
134
135## Flow Scenarios
136
137### Scenario 1: Initial Connect via handle_connect_or_reconnect
138
139```mermaid
140flowchart TB
141 START[Startup] --> SS[Self-subscribe to own relay]
142 SS --> |no since filter| EVENTS[Receive historical events]
143 EVENTS --> RSI[Update RepoSyncIndex]
144 RSI --> DT[derive_relay_targets]
145 DT --> CA[compute_actions with targets and empty confirmed]
146 CA --> AF[AddFilters for each relay]
147 AF --> SPAWN{Relay connected?}
148 SPAWN --> |no| CONN[spawn_connection]
149 CONN --> HC[handle_connect_or_reconnect]
150 SPAWN --> |yes| SUB
151
152 subgraph handle_connect_or_reconnect - Fresh Sync
153 HC --> CHECK_FRESH{is_fresh_sync?}
154 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
155 L1 --> RCA[recompute_actions_for_relay]
156 end
157
158 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
159 SUB --> PB[Create PendingBatch]
160 PB --> EOSE[Wait for EOSE]
161 EOSE --> CONFIRM[Move items to confirmed repos/root_events]
162```
163
164**Key points:**
165- No `since` filter on initial connect - get full history
166- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
167- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
168- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
169
170### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes
171
172```mermaid
173flowchart TB
174 DISC[Connection lost] --> MARK[Set disconnected_at = now]
175 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay]
176 CLEAR_PEND --> WAIT[Wait for reconnection]
177 WAIT --> RECONN[Connection restored]
178 RECONN --> HC[handle_connect_or_reconnect]
179
180 subgraph handle_connect_or_reconnect - Quick Reconnect
181 HC --> CHECK{is_fresh_sync?}
182 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
183 SINCE --> L1[build_announcement_filter - with since]
184 L1 --> L23[rebuild_layer2_and_layer3 - with since]
185 L23 --> RCA[recompute_actions_for_relay]
186 end
187
188 RCA --> AF[AddFilters for new items only]
189 AF --> SUB[Subscribe]
190 SUB --> PB[Create PendingBatch]
191 PB --> EOSE[Wait for EOSE]
192 EOSE --> EXTEND[Extend confirmed state]
193```
194
195**Key points:**
196- PendingSyncIndex cleared on disconnect (not reconnect)
197- `handle_connect_or_reconnect`:
198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
199 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection
202
203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes
204
205```mermaid
206flowchart TB
207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
208
209 subgraph handle_connect_or_reconnect - Stale Reconnect
210 HC --> CHECK{is_fresh_sync?}
211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
212 CLEAR --> L1[build_announcement_filter - no since]
213 L1 --> RCA[recompute_actions_for_relay]
214 end
215
216 RCA --> CA[compute_actions with empty confirmed]
217 CA --> AF[AddFilters for everything]
218 AF --> SUB[Subscribe - no since filter]
219 SUB --> PB[Create PendingBatch]
220 PB --> EOSE[Wait for EOSE]
221 EOSE --> CONFIRM[Populate confirmed state fresh]
222```
223
224**Key points:**
225- `should_clear_state()` returns true → triggers fresh sync
226- Same path as initial connect after clearing state
227- Layer 1: `build_announcement_filter(None)` - full history
228- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
229
230### Scenario 4: Consolidation - Triggered on Filter Add
231
232```mermaid
233flowchart TB
234 AF[handle_add_filters called] --> COUNT{current + new > 70?}
235 COUNT --> |yes| CONSOLIDATE[consolidate]
236 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
237 WAIT_PEND --> CLOSE[unsubscribe_all]
238 CLOSE --> SINCE[since = now - 15min]
239 SINCE --> L1[build_announcement_filter - with since]
240 L1 --> L23[rebuild_layer2_and_layer3 - with since]
241 COUNT --> |no| SUB[Subscribe new filters]
242 SUB --> PB[Create PendingBatch]
243```
244
245**Key points:**
246- Consolidation checked in `handle_add_filters` BEFORE adding new filters
247- After closing all subscriptions, re-subscribe:
248 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
249 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
250- `since = now - 15min` prevents re-fetching old events
251- Keeps confirmed state, just reduces filter count
252
253### Scenario 5: Daily Timer - 23 to 25h Random
254
255```mermaid
256flowchart TB
257 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all]
258 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay]
259 CLEAR_PEND --> CLEAR_STATE[clear_sync_state]
260 CLEAR_STATE --> L1[build_announcement_filter - no since]
261 L1 --> RCA[recompute_actions_for_relay]
262 RCA --> CA[compute_actions with empty confirmed]
263 CA --> AF[AddFilters for everything]
264 AF --> SUB[Subscribe - no since filter]
265 SUB --> PB[Create PendingBatch]
266 PB --> EOSE[Wait for EOSE]
267 EOSE --> CONFIRM[Repopulate confirmed state]
268```
269
270**Key points:**
271- Daily timer is a full fresh sync, NOT consolidation
272- Clears both PendingSyncIndex and confirmed state
273- Layer 1: `build_announcement_filter(None)` - full history
274- Layer 2+3: via compute_actions with empty confirmed - full history
275- Detects any state drift accumulated over 24 hours
276
277---
278
279## Core Algorithms
280
281### 1. derive_relay_targets
282
283Transform RepoSyncIndex into per-relay sync targets:
284
285```rust
286/// Inverts RepoSyncIndex to get per-relay view
287fn derive_relay_targets(
288 repo_index: &HashMap<String, RepoSyncNeeds>
289) -> HashMap<String, RelaySyncNeeds> {
290 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
291
292 for (repo_ref, needs) in repo_index {
293 for relay_url in &needs.relays {
294 let target = targets.entry(relay_url.clone()).or_default();
295 target.repos.insert(repo_ref.clone());
296 target.root_events.extend(needs.root_events.iter().cloned());
297 }
298 }
299
300 targets
301}
302```
303
304### 2. compute_actions (Three-Way Diff)
305
306**This is the ONLY decision point for what NEW subscriptions to create.**
307
308```rust
309/// Computes AddFilters for items that are:
310/// - In targets (what we want)
311/// - NOT in pending (already in-flight)
312/// - NOT in confirmed (already confirmed)
313fn compute_actions(
314 targets: &HashMap<String, RelaySyncNeeds>,
315 pending: &HashMap<String, Vec<PendingBatch>>,
316 confirmed: &HashMap<String, RelayState>,
317) -> Vec<AddFilters> {
318 let mut actions = Vec::new();
319
320 for (relay_url, target) in targets {
321 // Skip disconnected relays - they will get AddFilters on reconnect
322 if let Some(state) = confirmed.get(relay_url) {
323 if state.connection_status != ConnectionStatus::Connected {
324 continue;
325 }
326 }
327
328 // Collect all pending items for this relay
329 let pending_repos: HashSet<_> = pending.get(relay_url)
330 .map(|batches| batches.iter()
331 .flat_map(|b| b.items.repos.iter().cloned())
332 .collect())
333 .unwrap_or_default();
334 let pending_events: HashSet<_> = pending.get(relay_url)
335 .map(|batches| batches.iter()
336 .flat_map(|b| b.items.root_events.iter().cloned())
337 .collect())
338 .unwrap_or_default();
339
340 // Collect confirmed items for this relay
341 let confirmed_repos = confirmed.get(relay_url)
342 .map(|c| &c.repos)
343 .unwrap_or(&HashSet::new());
344 let confirmed_events = confirmed.get(relay_url)
345 .map(|c| &c.root_events)
346 .unwrap_or(&HashSet::new());
347
348 // New = target - pending - confirmed
349 let new_repos: HashSet<_> = target.repos.iter()
350 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
351 .cloned()
352 .collect();
353 let new_events: HashSet<_> = target.root_events.iter()
354 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
355 .cloned()
356 .collect();
357
358 if !new_repos.is_empty() || !new_events.is_empty() {
359 let filters = build_filters(&new_repos, &new_events);
360 actions.push(AddFilters {
361 relay_url: relay_url.clone(),
362 repos: new_repos,
363 root_events: new_events,
364 filters,
365 });
366 }
367 }
368
369 actions
370}
371```
372
373### 3. Filter Building Functions (Three-Layer Strategy)
374
375The filter strategy uses three layers:
376- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
377- **Layer 2**: Events tagging our repos
378- **Layer 3**: Events tagging our root events
379
380**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
381
382```rust
383/// Layer 1: Announcements filter (kinds 30617 + 30618)
384/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
385/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
386fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
387 let filter = Filter::new().kinds([
388 Kind::Custom(30617), // Repository announcements
389 Kind::Custom(30618), // Maintainer lists
390 ]);
391
392 match since {
393 Some(ts) => filter.since(ts),
394 None => filter,
395 }
396}
397
398/// Layer 2: Events tagging one of our repos
399/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
400/// Batched per 100 repo refs.
401fn tagged_one_of_our_repo_event_filters(
402 repos: &HashSet<String>,
403 since: Option<Timestamp>,
404) -> Vec<Filter> {
405 let mut filters = Vec::new();
406 let repo_refs: Vec<_> = repos.iter().collect();
407
408 for chunk in repo_refs.chunks(100) {
409 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
410
411 // Lowercase 'a' tag - standard addressable reference
412 let mut f1 = Filter::new()
413 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
414 // Uppercase 'A' tag - some clients use this
415 let mut f2 = Filter::new()
416 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
417 // Quote 'q' tag - NIP-10 quote references to addressable events
418 let mut f3 = Filter::new()
419 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
420
421 if let Some(ts) = since {
422 f1 = f1.since(ts);
423 f2 = f2.since(ts);
424 f3 = f3.since(ts);
425 }
426
427 filters.push(f1);
428 filters.push(f2);
429 filters.push(f3);
430 }
431
432 filters
433}
434
435/// Layer 3: Events tagging one of our root events
436/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
437/// Batched per 100 event IDs.
438fn tagged_one_of_our_root_event_filters(
439 root_events: &HashSet<EventId>,
440 since: Option<Timestamp>,
441) -> Vec<Filter> {
442 let mut filters = Vec::new();
443 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
444
445 for chunk in event_ids.chunks(100) {
446 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
447
448 // Lowercase 'e' tag - standard event reference
449 let mut f1 = Filter::new()
450 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
451 // Uppercase 'E' tag - some clients use this
452 let mut f2 = Filter::new()
453 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
454 // Quote 'q' tag - NIP-10 quote references to events
455 let mut f3 = Filter::new()
456 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
457
458 if let Some(ts) = since {
459 f1 = f1.since(ts);
460 f2 = f2.since(ts);
461 f3 = f3.since(ts);
462 }
463
464 filters.push(f1);
465 filters.push(f2);
466 filters.push(f3);
467 }
468
469 filters
470}
471
472/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
473/// Used by:
474/// - compute_actions for incremental subscriptions
475/// - consolidation rebuilds (Layer 1 remains active)
476fn build_layer2_and_layer3_filters(
477 repos: &HashSet<String>,
478 root_events: &HashSet<EventId>,
479 since: Option<Timestamp>,
480) -> Vec<Filter> {
481 let mut filters = Vec::new();
482 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
483 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
484 filters
485}
486```
487
488**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
489
490### 4. handle_add_filters (SyncManager)
491
492```rust
493impl SyncManager {
494 async fn handle_add_filters(&mut self, action: AddFilters) {
495 let AddFilters { relay_url, repos, root_events, filters } = action;
496
497 // Auto-spawn connection if needed
498 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
499 match state {
500 None => {
501 // New relay discovered - create entry and spawn connection
502 self.relay_sync_index.write().await.insert(
503 relay_url.clone(),
504 RelayState {
505 repos: HashSet::new(),
506 root_events: HashSet::new(),
507 is_bootstrap: false,
508 connection_status: ConnectionStatus::Connecting,
509 last_connected: None,
510 disconnected_at: None,
511 connection: None,
512 }
513 );
514 self.spawn_connection(&relay_url).await;
515 return; // Subscriptions will happen on connection success
516 }
517 Some(state) if state.connection_status != ConnectionStatus::Connected => {
518 // Not connected - subscriptions will happen on connection success
519 return;
520 }
521 Some(_) => {
522 // Already connected - proceed with subscription
523 }
524 }
525
526 // Subscribe and collect subscription IDs
527 let conn = self.connections.get(&relay_url).unwrap();
528 let mut sub_ids = HashSet::new();
529
530 for filter in filters {
531 match conn.client.subscribe(filter, None).await {
532 Ok(output) => {
533 for sub_id in output.val {
534 sub_ids.insert(sub_id);
535 }
536 }
537 Err(e) => {
538 tracing::warn!("Failed to subscribe: {}", e);
539 }
540 }
541 }
542
543 // Create pending batch
544 let batch = PendingBatch {
545 batch_id: self.next_batch_id(),
546 items: PendingItems { repos, root_events },
547 outstanding_subs: sub_ids,
548 };
549
550 // Add to pending index
551 self.pending_sync_index.write().await
552 .entry(relay_url)
553 .or_default()
554 .push(batch);
555 }
556}
557```
558
559### 5. handle_disconnect
560
561```rust
562impl SyncManager {
563 /// Called when connection to a relay is lost
564 async fn handle_disconnect(&mut self, relay_url: &str) {
565 let mut index = self.relay_sync_index.write().await;
566
567 if let Some(state) = index.get_mut(relay_url) {
568 state.connection_status = ConnectionStatus::Disconnected;
569 state.disconnected_at = Some(Timestamp::now());
570 state.connection = None;
571 }
572
573 // Clear pending batches - these items were not confirmed
574 self.pending_sync_index.write().await.remove(relay_url);
575
576 // Remove from active connections map
577 self.connections.remove(relay_url);
578
579 // Health tracker records failure for backoff
580 self.health_tracker.record_failure(relay_url);
581 }
582}
583```
584
585### 6. handle_connect_or_reconnect (Unified)
586
587This method handles BOTH initial connection AND reconnection with unified logic:
588
589```rust
590impl SyncManager {
591 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
592 ///
593 /// Decision tree:
594 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
595 /// - Quick reconnect (<15min): since = last_connected - 15min
596 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
597 let mut index = self.relay_sync_index.write().await;
598 let state = match index.get_mut(relay_url) {
599 Some(s) => s,
600 None => return, // Relay was removed while disconnected
601 };
602
603 // Determine if this is a fresh sync or quick reconnect
604 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
605 let last_connected = state.last_connected;
606
607 if is_fresh_sync && state.last_connected.is_some() {
608 // Stale reconnect (>15min) - clear state
609 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
610 state.clear_sync_state();
611 }
612
613 // Update connection state
614 state.connection_status = ConnectionStatus::Connected;
615 state.last_connected = Some(Timestamp::now());
616 state.disconnected_at = None;
617
618 // Record success in health tracker
619 self.health_tracker.record_success(relay_url);
620
621 drop(index); // Release lock
622
623 let conn = match self.connections.get(relay_url) {
624 Some(c) => c,
625 None => return,
626 };
627
628 if is_fresh_sync {
629 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
630
631 // Step 1: Subscribe Layer 1 (announcements) without since
632 let layer1 = build_announcement_filter(None);
633 let _ = conn.client.subscribe(layer1, None).await;
634
635 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
636 self.recompute_actions_for_relay(relay_url).await;
637 } else {
638 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
639 let since = last_connected
640 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
641 .unwrap_or(Timestamp::from(0));
642
643 // Step 1: Subscribe Layer 1 (announcements) with since
644 let layer1 = build_announcement_filter(Some(since));
645 let _ = conn.client.subscribe(layer1, None).await;
646
647 // Step 2: Rebuild Layer 2+3 for confirmed items with since
648 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
649
650 // Step 3: Check for NEW items via compute_actions
651 self.recompute_actions_for_relay(relay_url).await;
652 }
653 }
654
655 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
656 /// Used by:
657 /// - Quick reconnect: rebuild confirmed items with since filter
658 /// - Consolidation: close and rebuild with since filter
659 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
660 let confirmed = self.relay_sync_index.read().await;
661 let state = match confirmed.get(relay_url) {
662 Some(s) => s,
663 None => return,
664 };
665
666 // Build Layer 2+3 filters WITH since
667 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
668 drop(confirmed);
669
670 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
671 let conn = match self.connections.get(relay_url) {
672 Some(c) => c,
673 None => return,
674 };
675
676 for filter in filters {
677 let _ = conn.client.subscribe(filter, None).await;
678 }
679 }
680
681 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
682 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
683 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
684 let repo_index = self.repo_sync_index.read().await;
685 let targets = derive_relay_targets(&repo_index);
686 drop(repo_index);
687
688 // Filter to just this relay
689 let target = match targets.get(relay_url) {
690 Some(t) => t.clone(),
691 None => return, // No repos reference this relay anymore
692 };
693
694 let pending = self.pending_sync_index.read().await;
695 let confirmed = self.relay_sync_index.read().await;
696
697 let mut single_relay_targets = HashMap::new();
698 single_relay_targets.insert(relay_url.to_string(), target);
699
700 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
701
702 drop(pending);
703 drop(confirmed);
704
705 // Process AddFilters
706 for action in actions {
707 self.handle_add_filters(action).await;
708 }
709 }
710}
711```
712
713### 7. Daily Timer
714
715```rust
716impl SyncManager {
717 async fn run_daily_timer(&self) {
718 loop {
719 // Random 23-25 hours
720 let hours = 23.0 + rand::random::<f64>() * 2.0;
721 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
722
723 let relay_urls: Vec<_> = self.relay_sync_index.read().await
724 .keys()
725 .cloned()
726 .collect();
727
728 for relay_url in relay_urls {
729 self.daily_sync(&relay_url).await;
730 }
731 }
732 }
733
734 /// Perform daily fresh sync for a relay
735 async fn daily_sync(&mut self, relay_url: &str) {
736 tracing::info!("Daily sync triggered for {}", relay_url);
737
738 // Close all subscriptions
739 if let Some(conn) = self.connections.get(relay_url) {
740 conn.client.unsubscribe_all().await;
741 }
742
743 // Clear PendingSyncIndex
744 self.pending_sync_index.write().await.remove(relay_url);
745
746 // Clear confirmed state - triggers fresh sync
747 {
748 let mut index = self.relay_sync_index.write().await;
749 if let Some(state) = index.get_mut(relay_url) {
750 state.clear_sync_state();
751 }
752 }
753
754 // Recompute actions - will generate AddFilters for everything
755 self.recompute_actions_for_relay(relay_url).await;
756 }
757}
758```
759
760### 8. Consolidation (Threshold-Based, Triggered on Add)
761
762Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active.
763
764```rust
765impl SyncManager {
766 /// Check filter count and consolidate if needed.
767 /// Called from handle_add_filters BEFORE adding new filters.
768 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
769 let current_count = self.get_filter_count(relay_url).await;
770
771 if current_count + new_filter_count > 70 {
772 self.consolidate(relay_url).await;
773 }
774 }
775
776 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
777 /// Does NOT clear state - just reduces filter count.
778 async fn consolidate(&mut self, relay_url: &str) {
779 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
780
781 // Wait for all pending batches to complete first
782 self.wait_pending_complete(relay_url).await;
783
784 // Close Layer 2+3 subscriptions only - Layer 1 remains active
785 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
786 // For simplicity, we close all and re-add Layer 1
787 if let Some(conn) = self.connections.get(relay_url) {
788 conn.client.unsubscribe_all().await;
789 }
790
791 // Re-subscribe Layer 1 with since (maintains announcements stream)
792 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
793 let conn = self.connections.get(relay_url).unwrap();
794 let layer1 = build_announcement_filter(Some(since));
795 let _ = conn.client.subscribe(layer1, None).await;
796
797 // Rebuild Layer 2+3 only
798 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
799 }
800}
801```
802
803**Updated handle_add_filters to check consolidation:**
804
805```rust
806impl SyncManager {
807 async fn handle_add_filters(&mut self, action: AddFilters) {
808 let AddFilters { relay_url, repos, root_events, filters } = action;
809
810 // Auto-spawn connection if needed (unchanged)
811 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
812 match state {
813 None => {
814 // New relay discovered - create entry and spawn connection
815 self.relay_sync_index.write().await.insert(
816 relay_url.clone(),
817 RelayState {
818 repos: HashSet::new(),
819 root_events: HashSet::new(),
820 is_bootstrap: false,
821 connection_status: ConnectionStatus::Connecting,
822 last_connected: None,
823 disconnected_at: None,
824 connection: None,
825 }
826 );
827 self.spawn_connection(&relay_url).await;
828 return; // Subscriptions will happen on connection success
829 }
830 Some(state) if state.connection_status != ConnectionStatus::Connected => {
831 return; // Not connected - subscriptions will happen on connection success
832 }
833 Some(_) => {
834 // Already connected - proceed
835 }
836 }
837
838 // CHECK CONSOLIDATION BEFORE ADDING
839 self.maybe_consolidate(&relay_url, filters.len()).await;
840
841 // Subscribe and collect subscription IDs
842 let conn = self.connections.get(&relay_url).unwrap();
843 let mut sub_ids = HashSet::new();
844
845 for filter in filters {
846 match conn.client.subscribe(filter, None).await {
847 Ok(output) => {
848 for sub_id in output.val {
849 sub_ids.insert(sub_id);
850 }
851 }
852 Err(e) => {
853 tracing::warn!("Failed to subscribe: {}", e);
854 }
855 }
856 }
857
858 // Create pending batch (unchanged)
859 let batch = PendingBatch {
860 batch_id: self.next_batch_id(),
861 items: PendingItems { repos, root_events },
862 outstanding_subs: sub_ids,
863 };
864
865 self.pending_sync_index.write().await
866 .entry(relay_url)
867 .or_default()
868 .push(batch);
869 }
870}
871```
872
873---
874
875## Disconnect (Relay Removal) Handling
876
877```rust
878impl SyncManager {
879 /// Periodically check for relays that should be disconnected
880 async fn check_disconnects(&mut self) {
881 let confirmed = self.relay_sync_index.read().await;
882 let relays_to_disconnect: Vec<_> = confirmed.iter()
883 .filter(|(_, state)| {
884 !state.is_bootstrap &&
885 state.repos.is_empty() &&
886 state.root_events.is_empty()
887 })
888 .map(|(url, _)| url.clone())
889 .collect();
890 drop(confirmed);
891
892 for relay_url in relays_to_disconnect {
893 self.disconnect_relay(&relay_url).await;
894 }
895 }
896
897 async fn disconnect_relay(&mut self, relay_url: &str) {
898 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
899
900 self.relay_sync_index.write().await.remove(relay_url);
901 self.pending_sync_index.write().await.remove(relay_url);
902
903 if let Some(conn) = self.connections.remove(relay_url) {
904 let _ = conn.client.disconnect().await;
905 }
906 }
907}
908```
909
910---
911
912## State Flow Summary
913
914```mermaid
915flowchart TB
916 subgraph Input
917 SS[SelfSubscriber]
918 OWN[Own Relay]
919 end
920
921 subgraph RepoSyncIndex - What We Want
922 RSI[HashMap: Repo to Relays+Events]
923 end
924
925 subgraph Derived Target
926 DT[derive_relay_targets fn]
927 TGT[Per-relay: repos + events we should sync]
928 end
929
930 subgraph compute_actions - Decision Point
931 CA[Three-way diff: target - pending - confirmed]
932 end
933
934 subgraph PendingSyncIndex - In Flight
935 PSI[Vec PendingBatch per relay]
936 end
937
938 subgraph RelaySyncIndex - Confirmed State
939 RLI[RelayState per relay]
940 CONN[connection_status]
941 REPOS[repos + root_events]
942 TIMES[last_connected + disconnected_at]
943 end
944
945 SS -->|subscribe| OWN
946 OWN -->|events| SS
947 SS -->|batch fires| RSI
948 RSI --> DT
949 DT --> TGT
950 TGT --> CA
951 PSI --> CA
952 RLI --> CA
953 CA -->|Layer 2+3 new items| AF[AddFilters]
954 AF -->|check filter count| CONSOL{count + new > 70?}
955 CONSOL -->|yes| CONSOLIDATE[consolidate]
956 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since]
957 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since]
958 CONSOL -->|no| SUB[subscribe]
959 AF -->|spawn if needed| CONN
960 SUB --> PSI
961 PSI -->|EOSE| REPOS
962
963 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
964 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
965
966 subgraph handle_connect_or_reconnect
967 HC --> FRESH_CHECK{is_fresh_sync?}
968 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
969 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
970 L1_FRESH --> RCA1[recompute_actions_for_relay]
971 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
972 L23_QUICK --> RCA2[recompute_actions_for_relay]
973 end
974```
975
976---
977
978## Key Design Decisions
979
980| Decision | Choice | Rationale |
981|----------|--------|-----------|
982| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
983| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
984| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
985| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
986| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
987| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
988| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
989| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
990| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
991| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
992| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
993| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
994| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
995| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
996| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
997| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
998| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
999
1000---
1001
1002## Module Structure
1003
1004```
1005src/sync/
1006├── mod.rs # SyncManager, main loop
1007├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
1008├── actions.rs # AddFilters struct, compute_actions, build_filters
1009├── self_subscriber.rs # SelfSubscriber, batching logic
1010├── relay_connection.rs # Per-relay WebSocket connection
1011├── consolidation.rs # Consolidation logic, daily timer
1012├── health.rs # Health tracking (reuse from v2)
1013└── metrics.rs # Prometheus metrics (reuse from v2)
1014```
1015
1016---
1017
1018## Comparison: v3 vs v4
1019
1020| Aspect | v3 | v4 |
1021|--------|----|----|
1022| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1023| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1024| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1025| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1026| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1027| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1028| PSI clearing | On disconnect | On disconnect (confirmed) |
1029| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
1030
1031---
1032
1033## Self-Subscriber Flow
1034
1035The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions.
1036
1037### State Tracking
1038
1039```rust
1040pub struct SelfSubscriber {
1041 own_relay_url: String,
1042 relay_domain: String,
1043 repo_sync_index: RepoSyncIndex,
1044 pending_sync_index: PendingSyncIndex,
1045 relay_sync_index: RelaySyncIndex,
1046 action_tx: mpsc::Sender<AddFilters>,
1047 /// Timestamp of last successful connection - used for since filter on reconnection
1048 last_connected: Option<Timestamp>,
1049 /// The active client connection
1050 client: Option<Client>,
1051}
1052```
1053
1054### On Startup / Reconnect (Unified)
1055
1056Both initial startup and reconnection use the same `connect_and_subscribe` method:
1057
1058```rust
1059impl SelfSubscriber {
1060 async fn run(mut self) {
1061 loop {
1062 // Connect or reconnect
1063 if let Err(e) = self.connect_and_subscribe().await {
1064 tracing::warn!("Connection failed: {}, will retry", e);
1065 tokio::time::sleep(Duration::from_secs(5)).await;
1066 continue;
1067 }
1068
1069 // Run event loop until disconnection
1070 self.event_loop().await;
1071
1072 // Loop will retry connection
1073 }
1074 }
1075
1076 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
1077 let client = Client::new(Keys::generate());
1078 client.add_relay(&self.own_relay_url).await?;
1079 client.connect().await;
1080
1081 // Build filter - add since only on reconnect
1082 let filter = Filter::new().kinds([
1083 Kind::Custom(30617), // Repository announcements
1084 Kind::GitPatch, // 1617
1085 Kind::Custom(1618), // PRs
1086 Kind::Custom(1619), // PR updates
1087 Kind::GitIssue, // 1621
1088 ]);
1089
1090 let filter = if let Some(ts) = self.last_connected {
1091 // Reconnection: use since filter
1092 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1093 filter.since(since)
1094 } else {
1095 // Initial connect: no since filter - get full history
1096 filter
1097 };
1098
1099 // Update last_connected AFTER computing since
1100 self.last_connected = Some(Timestamp::now());
1101
1102 client.subscribe(filter, None).await?;
1103 self.client = Some(client);
1104 Ok(())
1105 }
1106}
1107```
1108
1109### Event Loop with Batching
1110
1111```rust
1112impl SelfSubscriber {
1113 async fn event_loop(&mut self) {
1114 let client = self.client.as_ref().unwrap();
1115 let mut pending_events: Vec<Event> = Vec::new();
1116 let mut batch_timer: Option<Instant> = None;
1117 let batch_window = Duration::from_secs(5);
1118
1119 loop {
1120 let timeout = batch_timer
1121 .map(|t| batch_window.saturating_sub(t.elapsed()))
1122 .unwrap_or(Duration::from_secs(60));
1123
1124 tokio::select! {
1125 notification = client.notifications().recv() => {
1126 match notification {
1127 Ok(RelayPoolNotification::Event { event, .. }) => {
1128 pending_events.push(*event);
1129
1130 // Start timer on first event - does NOT reset
1131 if batch_timer.is_none() {
1132 batch_timer = Some(Instant::now());
1133 }
1134 }
1135 Ok(RelayPoolNotification::Shutdown) => {
1136 // Connection lost
1137 break;
1138 }
1139 _ => {}
1140 }
1141 }
1142 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1143 // Batch window elapsed
1144 self.process_batch(pending_events.drain(..).collect()).await;
1145 batch_timer = None;
1146 }
1147 }
1148 }
1149 }
1150
1151 async fn process_batch(&self, events: Vec<Event>) {
1152 // 1. Update RepoSyncIndex
1153 for event in events {
1154 match event.kind.as_u16() {
1155 30617 => self.handle_announcement(&event).await,
1156 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1157 _ => {}
1158 }
1159 }
1160
1161 // 2. Derive targets and compute actions
1162 let repo_index = self.repo_sync_index.read().await;
1163 let targets = derive_relay_targets(&repo_index);
1164
1165 let pending = self.pending_sync_index.read().await;
1166 let confirmed = self.relay_sync_index.read().await;
1167
1168 let actions = compute_actions(&targets, &pending, &confirmed);
1169
1170 drop(repo_index);
1171 drop(pending);
1172 drop(confirmed);
1173
1174 // 3. Send actions to SyncManager
1175 for action in actions {
1176 let _ = self.action_tx.send(action).await;
1177 }
1178 }
1179
1180 async fn handle_announcement(&self, event: &Event) {
1181 // Extract repo_ref from event - 30617:pubkey:identifier
1182 let d_tag = event.tags.iter()
1183 .find_map(|tag| {
1184 if tag.kind() == TagKind::D {
1185 tag.content().map(|s| s.to_string())
1186 } else {
1187 None
1188 }
1189 })
1190 .unwrap_or_default();
1191
1192 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1193
1194 // Extract relay URLs from 'r' tags
1195 let relays: HashSet<String> = event.tags.iter()
1196 .filter_map(|tag| {
1197 if tag.kind() == TagKind::Relay {
1198 tag.content().map(|s| s.to_string())
1199 } else {
1200 None
1201 }
1202 })
1203 .collect();
1204
1205 // Update RepoSyncIndex
1206 let mut index = self.repo_sync_index.write().await;
1207 let needs = index.entry(repo_ref).or_default();
1208 needs.relays = relays;
1209 }
1210
1211 async fn handle_root_event(&self, event: &Event) {
1212 // Extract repo_ref from 'a' tag
1213 let repo_ref = event.tags.iter()
1214 .find_map(|tag| {
1215 if tag.kind() == TagKind::A {
1216 tag.content().map(|s| s.to_string())
1217 } else {
1218 None
1219 }
1220 });
1221
1222 if let Some(repo_ref) = repo_ref {
1223 let mut index = self.repo_sync_index.write().await;
1224 let needs = index.entry(repo_ref).or_default();
1225 needs.root_events.insert(event.id);
1226 }
1227 }
1228}
1229``` \ No newline at end of file