upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 09:07:54 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 09:07:54 +0000
commit2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (patch)
tree013aaa9fd64fd2ef4d1a1c47a7fc348f0e6a92de /docs
parent7e68b71558c8f6d3f2aa1d3bf18e77eec335343d (diff)
improve sync design
Diffstat (limited to 'docs')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v3.md871
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v4.md1229
-rw-r--r--docs/explanation/state-structure-redesign-proposal.md373
3 files changed, 2473 insertions, 0 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v3.md b/docs/explanation/grasp-02-proactive-sync-v3.md
new file mode 100644
index 0000000..30b3102
--- /dev/null
+++ b/docs/explanation/grasp-02-proactive-sync-v3.md
@@ -0,0 +1,871 @@
1# GRASP-02: Proactive Sync v3 - Event-Driven Design
2
3## Overview
4
5This document presents v3 of the proactive sync design. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **Batch-based pending tracking** - Each batch confirms independently
93. **Single action type** - AddFilters only, auto-spawn connections
104. **Three-way state model** - RepoSyncIndex (want) → PendingSyncIndex (in-flight) → RelaySyncIndex (confirmed)
11
12---
13
14## Data Model
15
16### RepoSyncIndex (Source of Truth)
17
18```rust
19/// What we WANT to sync - derived from events received via self-subscription.
20/// Updated immediately when self-subscriber batch fires.
21/// Key: repo addressable ref ("30617:pubkey:identifier")
22pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
23
24#[derive(Debug, Clone, Default)]
25pub struct RepoSyncNeeds {
26 /// Relay URLs listed in this repo's 30617 announcement
27 pub relays: HashSet<String>,
28 /// Root event IDs (1617/1618/1619/1621) that reference this repo
29 pub root_events: HashSet<EventId>,
30}
31```
32
33### RelaySyncIndex (Confirmed State + Connection)
34
35```rust
36/// What we've CONFIRMED syncing - includes connection state for integrated lifecycle.
37/// Key: relay URL
38pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
39
40/// Connection status for a relay
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum ConnectionStatus {
43 /// Not currently connected
44 Disconnected,
45 /// Connection attempt in progress
46 Connecting,
47 /// Successfully connected and subscribed
48 Connected,
49}
50
51/// Complete state for a single relay - combines sync needs with connection lifecycle
52#[derive(Debug)]
53pub struct RelayState {
54 /// Repos we've confirmed syncing from this relay
55 pub repos: HashSet<String>,
56 /// Root events we've confirmed tracking
57 pub root_events: HashSet<EventId>,
58 /// If true, never disconnect this relay
59 pub is_bootstrap: bool,
60 /// Current connection status
61 pub connection_status: ConnectionStatus,
62 /// When we last successfully connected (for since filter on reconnect)
63 pub last_connected: Option<Timestamp>,
64 /// When we disconnected (for 15-minute state retention rule)
65 pub disconnected_at: Option<Timestamp>,
66 /// The active connection (None if disconnected)
67 pub connection: Option<RelayConnection>,
68}
69
70impl RelayState {
71 /// Check if state should be cleared based on 15-minute rule
72 pub fn should_clear_state(&self) -> bool {
73 match self.disconnected_at {
74 Some(disconnected) => {
75 let now = Timestamp::now();
76 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
77 }
78 None => false, // Still connected or never connected
79 }
80 }
81
82 /// Clear repos and root_events (called when reconnect takes > 15 minutes)
83 pub fn clear_sync_state(&mut self) {
84 self.repos.clear();
85 self.root_events.clear();
86 }
87}
88```
89
90### PendingSyncIndex (In-Flight Batches)
91
92```rust
93/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
94/// Each batch has its own ID and can confirm independently.
95/// Key: relay URL
96pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
97
98#[derive(Debug, Clone)]
99pub struct PendingBatch {
100 /// Unique ID for this batch (for debugging/logging)
101 pub batch_id: u64,
102 /// The items this batch is syncing
103 pub items: PendingItems,
104 /// Subscription IDs that must ALL receive EOSE before confirming
105 pub outstanding_subs: HashSet<SubscriptionId>,
106}
107
108#[derive(Debug, Clone, Default)]
109pub struct PendingItems {
110 pub repos: HashSet<String>,
111 pub root_events: HashSet<EventId>,
112}
113```
114
115---
116
117## State Flow
118
119```mermaid
120flowchart TB
121 subgraph Input
122 SS[SelfSubscriber]
123 OWN[Own Relay]
124 end
125
126 subgraph RepoSyncIndex - Want
127 RSI[HashMap of Repo to Relays+Events]
128 end
129
130 subgraph Derived Target
131 DT[derive_relay_targets fn]
132 TGT[Per-relay: repos + events we should sync]
133 end
134
135 subgraph PendingSyncIndex - In Flight
136 PSI[Vec of PendingBatch per relay]
137 end
138
139 subgraph RelaySyncIndex - State + Connection
140 RLI[RelayState per relay]
141 CONN[connection: Option of RelayConnection]
142 STATUS[connection_status: Connected/Disconnected/Connecting]
143 REPOS[repos + root_events]
144 end
145
146 SS -->|subscribe| OWN
147 OWN -->|events| SS
148 SS -->|batch fires| RSI
149 RSI --> DT
150 DT --> TGT
151 TGT -->|diff: target - pending - confirmed| DIFF[Compute new items]
152 PSI --> DIFF
153 RLI --> DIFF
154 DIFF -->|skip if disconnected| CHECK{Connected?}
155 CHECK -->|yes| AF[AddFilters]
156 CHECK -->|no| QUEUE[Queued in RelayState.repos]
157 AF -->|subscribe| CONN
158 AF -->|create batch| PSI
159 CONN -->|EOSE| PSI
160 PSI -->|batch complete| REPOS
161 CONN -->|disconnect event| DISC[Mark Disconnected + set disconnected_at]
162 DISC -->|reconnect| RECONN[On Reconnect]
163 RECONN -->|check 15min rule| RULE{disconnected > 15min?}
164 RULE -->|yes| CLEAR[Clear repos/root_events]
165 RULE -->|no| RETAIN[Keep retained state]
166 CLEAR --> REGEN[Regenerate AddFilters from RepoSyncIndex]
167 RETAIN --> RESUB[Resubscribe with since filter]
168```
169
170### Connection Lifecycle Integration
171
172The `RelayState` struct now owns both the connection and sync state:
173
174```rust
175// On disconnect (detected via RelayPoolNotification::Shutdown or handle_notifications returning)
176fn handle_disconnect(&mut self, relay_url: &str) {
177 if let Some(state) = self.relay_sync_index.write().await.get_mut(relay_url) {
178 state.connection_status = ConnectionStatus::Disconnected;
179 state.disconnected_at = Some(Timestamp::now());
180 state.connection = None;
181
182 // Clear any pending batches for this relay
183 self.pending_sync_index.write().await.remove(relay_url);
184 }
185}
186
187// On reconnect
188async fn handle_reconnect(&mut self, relay_url: &str) -> Result<(), Error> {
189 let mut index = self.relay_sync_index.write().await;
190 let state = index.get_mut(relay_url).ok_or("Relay not in index")?;
191
192 // Apply 15-minute state retention rule
193 if state.should_clear_state() {
194 tracing::info!("Reconnect after >15min for {}, clearing state", relay_url);
195 state.clear_sync_state();
196 }
197
198 // Create new connection
199 state.connection_status = ConnectionStatus::Connecting;
200 let connection = RelayConnection::new(relay_url.to_string());
201
202 // Connect with since filter if we have last_connected
203 let since = state.last_connected.map(|ts| {
204 Timestamp::from(ts.as_u64().saturating_sub(900)) // -15 min buffer
205 });
206
207 connection.connect_and_subscribe_with_since(since).await?;
208
209 state.connection = Some(connection);
210 state.connection_status = ConnectionStatus::Connected;
211 state.last_connected = Some(Timestamp::now());
212 state.disconnected_at = None;
213
214 drop(index); // Release lock
215
216 // Regenerate AddFilters from current state (either retained or fresh from RepoSyncIndex)
217 self.regenerate_filters_for_relay(relay_url).await;
218
219 Ok(())
220}
221
222/// Regenerate AddFilters for a relay after reconnection
223async fn regenerate_filters_for_relay(&mut self, relay_url: &str) {
224 let repo_index = self.repo_sync_index.read().await;
225 let targets = derive_relay_targets(&repo_index);
226
227 if let Some(target) = targets.get(relay_url) {
228 // Build filters for everything this relay should sync
229 let filters = build_filters(&target.repos, &target.root_events);
230
231 // Create and process AddFilters action
232 let action = AddFilters {
233 relay_url: relay_url.to_string(),
234 repos: target.repos.clone(),
235 root_events: target.root_events.clone(),
236 filters,
237 };
238
239 self.handle_add_filters(action).await;
240 }
241}
242```
243
244---
245
246## Action Type
247
248```rust
249/// Action sent from SelfSubscriber to SyncManager.
250/// SyncManager auto-spawns relay connections if they don't exist.
251pub struct AddFilters {
252 pub relay_url: String,
253 /// Items this action covers (for pending tracking)
254 pub repos: HashSet<String>,
255 pub root_events: HashSet<EventId>,
256 /// Pre-batched filters (each with <= 100 tags)
257 pub filters: Vec<Filter>,
258}
259```
260
261---
262
263## Core Algorithms
264
265### 1. derive_relay_targets
266
267Transform RepoSyncIndex into per-relay sync targets:
268
269```rust
270fn derive_relay_targets(
271 repo_index: &HashMap<String, RepoSyncNeeds>
272) -> HashMap<String, RelaySyncNeeds> {
273 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
274
275 for (repo_ref, needs) in repo_index {
276 for relay_url in &needs.relays {
277 let target = targets.entry(relay_url.clone()).or_default();
278 target.repos.insert(repo_ref.clone());
279 target.root_events.extend(needs.root_events.iter().cloned());
280 }
281 }
282
283 targets
284}
285```
286
287### 2. compute_actions (Three-Way Diff)
288
289```rust
290fn compute_actions(
291 targets: &HashMap<String, RelaySyncNeeds>,
292 pending: &HashMap<String, Vec<PendingBatch>>,
293 confirmed: &HashMap<String, RelayState>,
294) -> Vec<AddFilters> {
295 let mut actions = Vec::new();
296
297 for (relay_url, target) in targets {
298 // Skip disconnected relays - they'll get AddFilters on reconnect
299 if let Some(state) = confirmed.get(relay_url) {
300 if state.connection_status != ConnectionStatus::Connected {
301 continue;
302 }
303 }
304
305 // Collect all pending items for this relay
306 let pending_repos: HashSet<_> = pending.get(relay_url)
307 .map(|batches| batches.iter()
308 .flat_map(|b| b.items.repos.iter().cloned())
309 .collect())
310 .unwrap_or_default();
311 let pending_events: HashSet<_> = pending.get(relay_url)
312 .map(|batches| batches.iter()
313 .flat_map(|b| b.items.root_events.iter().cloned())
314 .collect())
315 .unwrap_or_default();
316
317 // Collect confirmed items for this relay
318 let confirmed_repos = confirmed.get(relay_url)
319 .map(|c| &c.repos)
320 .unwrap_or(&HashSet::new());
321 let confirmed_events = confirmed.get(relay_url)
322 .map(|c| &c.root_events)
323 .unwrap_or(&HashSet::new());
324
325 // New = target - pending - confirmed
326 let new_repos: HashSet<_> = target.repos.iter()
327 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
328 .cloned()
329 .collect();
330 let new_events: HashSet<_> = target.root_events.iter()
331 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
332 .cloned()
333 .collect();
334
335 if !new_repos.is_empty() || !new_events.is_empty() {
336 let filters = build_filters(&new_repos, &new_events);
337 actions.push(AddFilters {
338 relay_url: relay_url.clone(),
339 repos: new_repos,
340 root_events: new_events,
341 filters,
342 });
343 }
344 }
345
346 actions
347}
348```
349
350### 3. handle_add_filters (SyncManager)
351
352```rust
353impl SyncManager {
354 async fn handle_add_filters(&mut self, action: AddFilters) {
355 let AddFilters { relay_url, repos, root_events, filters } = action;
356
357 // Auto-spawn connection if needed
358 if !self.connections.contains_key(&relay_url) {
359 self.spawn_connection(&relay_url).await;
360 }
361
362 let conn = self.connections.get(&relay_url).unwrap();
363
364 // Subscribe and collect subscription IDs
365 // nostr-sdk 0.44: subscribe returns Output<Vec<SubscriptionId>>
366 // since we're only subscribed to one relay per connection
367 let mut sub_ids = HashSet::new();
368 for filter in filters {
369 // cloned filter for each subscription call
370 match conn.client.subscribe(filter, None).await {
371 Ok(output) => {
372 // Output contains subscription IDs for each relay
373 for sub_id in output.val {
374 sub_ids.insert(sub_id);
375 }
376 }
377 Err(e) => {
378 tracing::warn!("Failed to subscribe: {}", e);
379 }
380 }
381 }
382
383 // Create pending batch
384 let batch = PendingBatch {
385 batch_id: self.next_batch_id(),
386 items: PendingItems { repos, root_events },
387 outstanding_subs: sub_ids,
388 };
389
390 // Add to pending index
391 self.pending_sync_index.write().await
392 .entry(relay_url)
393 .or_default()
394 .push(batch);
395 }
396}
397```
398
399### 4. handle_eose (Batch Completion)
400
401```rust
402impl SyncManager {
403 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
404 let mut pending = self.pending_sync_index.write().await;
405
406 if let Some(batches) = pending.get_mut(relay_url) {
407 // Find which batch this subscription belongs to
408 for batch in batches.iter_mut() {
409 if batch.outstanding_subs.remove(&sub_id) {
410 // Check if batch is now complete
411 if batch.outstanding_subs.is_empty() {
412 // Move items to confirmed
413 let items = batch.items.clone();
414 drop(pending); // Release lock before acquiring another
415
416 let mut confirmed = self.relay_sync_index.write().await;
417 let relay_confirmed = confirmed
418 .entry(relay_url.to_string())
419 .or_default();
420 relay_confirmed.repos.extend(items.repos);
421 relay_confirmed.root_events.extend(items.root_events);
422
423 tracing::info!(
424 "Batch {} complete for {} - confirmed {} repos, {} events",
425 batch.batch_id, relay_url,
426 items.repos.len(), items.root_events.len()
427 );
428 }
429 break;
430 }
431 }
432
433 // Clean up completed batches
434 if let Some(batches) = pending.get_mut(relay_url) {
435 batches.retain(|b| !b.outstanding_subs.is_empty());
436 }
437 }
438 }
439}
440```
441
442---
443
444## Self-Subscriber Flow
445
446### State Tracking
447
448```rust
449pub struct SelfSubscriber {
450 own_relay_url: String,
451 relay_domain: String,
452 repo_sync_index: RepoSyncIndex,
453 pending_sync_index: PendingSyncIndex,
454 relay_sync_index: RelaySyncIndex,
455 action_tx: mpsc::Sender<AddFilters>,
456 /// Timestamp of last successful connection - used for since filter on reconnection
457 last_connected: Option<Timestamp>,
458 /// Is this the first connection attempt since startup?
459 is_initial_connect: bool,
460}
461```
462
463### On Startup
464
465```rust
466impl SelfSubscriber {
467 async fn run(mut self) {
468 // Connect to own relay
469 let client = Client::new(Keys::generate());
470 client.add_relay(&self.own_relay_url).await?;
471 client.connect().await;
472
473 // Track connection time
474 self.last_connected = Some(Timestamp::now());
475
476 // Subscribe WITHOUT since filter (get all historical) on first connect
477 let filter = Filter::new().kinds([
478 Kind::Custom(30617), // Repository announcements
479 Kind::GitPatch, // 1617
480 Kind::Custom(1618), // PRs
481 Kind::Custom(1619), // PR updates
482 Kind::GitIssue, // 1621
483 ]);
484
485 client.subscribe(filter, None).await?;
486 self.is_initial_connect = false;
487
488 // Run event loop with batching
489 self.event_loop(&client).await;
490 }
491}
492```
493
494### On Reconnection
495
496```rust
497impl SelfSubscriber {
498 async fn reconnect(&mut self, client: &Client) -> Result<(), Error> {
499 // Reconnect to own relay
500 client.connect().await;
501
502 // On reconnection ONLY, use since filter based on last_connected
503 let since = match self.last_connected {
504 Some(ts) => Timestamp::from(ts.as_u64().saturating_sub(900)), // -15 minutes buffer
505 None => Timestamp::from(0), // Shouldn't happen, but fall back to full sync
506 };
507
508 // Update last_connected AFTER computing since
509 self.last_connected = Some(Timestamp::now());
510
511 let filter = Filter::new()
512 .kinds([
513 Kind::Custom(30617),
514 Kind::GitPatch,
515 Kind::Custom(1618),
516 Kind::Custom(1619),
517 Kind::GitIssue,
518 ])
519 .since(since);
520
521 client.subscribe(filter, None).await?;
522 Ok(())
523 }
524}
525```
526
527### Batching Logic
528
529```rust
530impl SelfSubscriber {
531 async fn event_loop(&self, client: &Client) {
532 let mut pending_events: Vec<Event> = Vec::new();
533 let mut batch_timer: Option<Instant> = None;
534 let batch_window = Duration::from_secs(5);
535
536 loop {
537 let timeout = batch_timer
538 .map(|t| batch_window.saturating_sub(t.elapsed()))
539 .unwrap_or(Duration::from_secs(60));
540
541 tokio::select! {
542 notification = client.notifications().recv() => {
543 if let Ok(RelayPoolNotification::Event { event, .. }) = notification {
544 pending_events.push(*event);
545
546 // Start timer on first event (does NOT reset)
547 if batch_timer.is_none() {
548 batch_timer = Some(Instant::now());
549 }
550 }
551 }
552 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
553 // Batch window elapsed
554 self.process_batch(pending_events.drain(..).collect()).await;
555 batch_timer = None;
556 }
557 }
558 }
559 }
560
561 async fn process_batch(&self, events: Vec<Event>) {
562 // 1. Update RepoSyncIndex
563 for event in events {
564 match event.kind.as_u16() {
565 30617 => self.handle_announcement(&event).await,
566 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
567 _ => {}
568 }
569 }
570
571 // 2. Derive targets and compute actions
572 let repo_index = self.repo_sync_index.read().await;
573 let targets = derive_relay_targets(&repo_index);
574
575 let pending = self.pending_sync_index.read().await;
576 let confirmed = self.relay_sync_index.read().await;
577
578 let actions = compute_actions(&targets, &pending, &confirmed);
579
580 drop(repo_index);
581 drop(pending);
582 drop(confirmed);
583
584 // 3. Send actions to SyncManager
585 for action in actions {
586 let _ = self.action_tx.send(action).await;
587 }
588 }
589}
590```
591
592---
593
594## Bootstrap Relay
595
596```rust
597impl SyncManager {
598 async fn initialize_bootstrap(&mut self) {
599 if let Some(url) = &self.config.bootstrap_relay_url {
600 // Pre-mark as bootstrap (never removed)
601 self.relay_sync_index.write().await.insert(
602 url.clone(),
603 RelaySyncNeeds {
604 repos: HashSet::new(),
605 root_events: HashSet::new(),
606 is_bootstrap: true,
607 }
608 );
609
610 // Send Layer 1 filter
611 let filters = vec![
612 Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)])
613 ];
614
615 self.handle_add_filters(AddFilters {
616 relay_url: url.clone(),
617 repos: HashSet::new(), // Layer 1 doesn't track specific repos
618 root_events: HashSet::new(),
619 filters,
620 }).await;
621 }
622 }
623}
624```
625
626---
627
628## Disconnect Handling
629
630Direct in SyncManager (not via action):
631
632```rust
633impl SyncManager {
634 async fn check_disconnects(&mut self) {
635 let confirmed = self.relay_sync_index.read().await;
636
637 for (relay_url, state) in confirmed.iter() {
638 if state.is_bootstrap {
639 continue; // Never disconnect bootstrap
640 }
641
642 if state.repos.is_empty() && state.root_events.is_empty() {
643 // No repos - disconnect
644 self.disconnect_relay(relay_url).await;
645 }
646 }
647 }
648
649 async fn disconnect_relay(&mut self, relay_url: &str) {
650 self.relay_sync_index.write().await.remove(relay_url);
651 self.pending_sync_index.write().await.remove(relay_url);
652
653 if let Some(conn) = self.connections.remove(relay_url) {
654 conn.disconnect().await;
655 }
656 }
657}
658```
659
660---
661
662## Relay Connection Lifecycle
663
664### State Machine for External Relays
665
666```mermaid
667stateDiagram-v2
668 [*] --> Connecting: spawn_connection
669 Connecting --> Connected: success
670 Connecting --> Backoff: failure
671 Connected --> Disconnected: connection lost
672 Connected --> [*]: intentional disconnect
673 Disconnected --> Backoff: record_failure
674 Backoff --> Connecting: backoff elapsed
675 Backoff --> Dead: 24h continuous failures
676 Dead --> Connecting: daily retry
677```
678
679### Health Integration
680
681Uses `RelayHealthTracker` from [`src/sync/health.rs`](../../src/sync/health.rs):
682
683```rust
684impl SyncManager {
685 /// Spawn a connection with health tracking
686 async fn spawn_connection(&mut self, relay_url: &str) {
687 // Check if we should attempt connection
688 if !self.health_tracker.should_attempt_connection(relay_url) {
689 let remaining = self.health_tracker.get_remaining_backoff(relay_url);
690 tracing::debug!(
691 "Skipping connection to {} - backoff {:?}",
692 relay_url,
693 remaining
694 );
695 return;
696 }
697
698 match self.try_connect(relay_url).await {
699 Ok(conn) => {
700 self.health_tracker.record_success(relay_url);
701 self.connections.insert(relay_url.to_string(), conn);
702 }
703 Err(e) => {
704 self.health_tracker.record_failure(relay_url);
705 tracing::warn!("Connection to {} failed: {}", relay_url, e);
706 }
707 }
708 }
709}
710```
711
712### Reconnection Loop
713
714Each relay connection runs its own reconnection loop:
715
716```rust
717impl RelayConnection {
718 async fn run_with_reconnection(
719 mut self,
720 health_tracker: Arc<RelayHealthTracker>,
721 event_tx: mpsc::Sender<RelayEvent>,
722 ) {
723 loop {
724 // Check backoff before attempting
725 if !health_tracker.should_attempt_connection(&self.url) {
726 if let Some(remaining) = health_tracker.get_remaining_backoff(&self.url) {
727 tokio::time::sleep(remaining).await;
728 continue;
729 }
730 }
731
732 // Attempt connection
733 match self.connect_and_subscribe().await {
734 Ok(()) => {
735 health_tracker.record_success(&self.url);
736
737 // Track when we connected for since filter on reconnect
738 let connected_at = Timestamp::now();
739
740 // Run event loop until disconnection
741 self.run_event_loop(&event_tx).await;
742
743 // Connection lost - will reconnect with since filter
744 health_tracker.record_failure(&self.url);
745
746 // On reconnect, use since = connected_at - 15 minutes
747 self.set_reconnect_since(connected_at);
748 }
749 Err(e) => {
750 health_tracker.record_failure(&self.url);
751 tracing::warn!("Connection to {} failed: {}", self.url, e);
752 }
753 }
754
755 // Get backoff duration and wait
756 let state = health_tracker.get_state(&self.url);
757 if state == HealthState::Dead {
758 // Dead relays retry once per 24 hours
759 tokio::time::sleep(Duration::from_secs(24 * 3600)).await;
760 }
761 // Otherwise, loop will check should_attempt_connection
762 }
763 }
764}
765```
766
767### Backoff Configuration
768
769From existing [`RelayHealthTracker`](../../src/sync/health.rs:91):
770
771| Parameter | Value | Notes |
772|-----------|-------|-------|
773| Base backoff | 5 seconds | First failure |
774| Backoff multiplier | 2x | Exponential increase |
775| Max backoff | 1 hour (configurable) | `sync_max_backoff_secs` |
776| Dead threshold | 24 hours | Continuous failures |
777| Dead retry interval | 24 hours | Once per day |
778
779---
780
781## Consolidation
782
783### Threshold-Based (70 filters)
784
785```rust
786impl SyncManager {
787 async fn maybe_consolidate(&mut self, relay_url: &str) {
788 let filter_count = self.get_filter_count(relay_url).await;
789
790 if filter_count > 70 {
791 self.consolidate(relay_url).await;
792 }
793 }
794
795 async fn consolidate(&mut self, relay_url: &str) {
796 // 1. Wait for all pending batches to complete
797 self.wait_pending_complete(relay_url).await;
798
799 // 2. Close all subscriptions
800 self.close_all_subs(relay_url).await;
801
802 // 3. Rebuild filters from confirmed state
803 let confirmed = self.relay_sync_index.read().await;
804 let state = confirmed.get(relay_url)?;
805 let filters = build_filters(&state.repos, &state.root_events);
806
807 // 4. Resubscribe with since = now - 15 minutes
808 let since = Timestamp::now() - 900;
809 for filter in filters {
810 self.subscribe(relay_url, filter.since(since)).await;
811 }
812 }
813}
814```
815
816### Daily Timer (23-25h Random)
817
818```rust
819impl SyncManager {
820 async fn run_daily_consolidation(&self) {
821 loop {
822 let hours = 23 + rand::random::<f64>() * 2.0;
823 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
824
825 for relay_url in self.connections.keys() {
826 self.consolidate(relay_url).await;
827 }
828 }
829 }
830}
831```
832
833---
834
835## Key Design Decisions
836
837| Decision | Choice | Rationale |
838|----------|--------|-----------|
839| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
840| Since filter | Only on reconnection | Initial subscribe gets full history |
841| Pending tracking | Per-batch with batch ID | Independent confirmation, no blocking |
842| EOSE requirement | All subs in batch must complete | Single repo may need multiple filter subs |
843| Action type | Struct not enum | Only one action type needed |
844| Relay spawning | Auto-spawn on AddFilters | Simplifies action logic |
845| Disconnect | Direct in SyncManager | Not worth an action type |
846| Consolidation | 70 filters + daily timer | Threshold for growth, timer for staleness |
847| Timestamps | In-memory only | Not critical for correctness |
848| Health tracking | Reuse existing RelayHealthTracker | Already implements exponential backoff, dead relay detection |
849| Reconnection backoff | Exponential to 1h max | Prevents hammering failed relays |
850| Dead relay policy | 24h threshold, daily retry | Balance between giving up and resource waste |
851| last_connected tracking | Per-connection in-memory | Enables 15-minute buffer on reconnect |
852| Connection ownership | Inside RelayState | Ties connection lifecycle to sync state, simpler than separate maps |
853| State retention rule | Clear if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
854| Skip disconnected | compute_actions skips disconnected | Prevents queuing AddFilters for offline relays |
855| Reconnect triggers | handle_notifications returns or Shutdown | nostr-sdk signals disconnect via event loop exit |
856| On-reconnect flow | Regenerate AddFilters from RepoSyncIndex | Fresh subscriptions for what we actually need |
857
858---
859
860## Module Structure
861
862```
863src/sync/
864├── mod.rs # SyncManager, main loop
865├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
866├── actions.rs # AddFilters struct, compute_actions
867├── self_subscriber.rs # SelfSubscriber, batching logic
868├── relay_connection.rs # Per-relay WebSocket connection
869├── consolidation.rs # Consolidation logic, daily timer
870├── health.rs # Health tracking (reuse from v2)
871└── metrics.rs # Prometheus metrics (reuse from v2) \ No newline at end of file
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md
new file mode 100644
index 0000000..aba88a5
--- /dev/null
+++ b/docs/explanation/grasp-02-proactive-sync-v4.md
@@ -0,0 +1,1229 @@
1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design
2
3## Overview
4
5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions)
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary
12
13---
14
15## Data Model
16
17### RepoSyncIndex (Source of Truth)
18
19```rust
20/// What we WANT to sync - derived from events received via self-subscription.
21/// Updated immediately when self-subscriber batch fires.
22/// Key: repo addressable ref - 30617:pubkey:identifier
23pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
24
25#[derive(Debug, Clone, Default)]
26pub struct RepoSyncNeeds {
27 /// Relay URLs listed in this repo's 30617 announcement
28 pub relays: HashSet<String>,
29 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
30 pub root_events: HashSet<EventId>,
31}
32```
33
34### RelaySyncIndex (Confirmed State + Connection)
35
36```rust
37/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
38/// Key: relay URL
39pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
40
41/// Connection status for a relay
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ConnectionStatus {
44 /// Not currently connected
45 Disconnected,
46 /// Connection attempt in progress
47 Connecting,
48 /// Successfully connected and subscribed
49 Connected,
50}
51
52/// Complete state for a single relay - combines sync needs with connection lifecycle
53#[derive(Debug)]
54pub struct RelayState {
55 /// Repos we have confirmed syncing from this relay
56 pub repos: HashSet<String>,
57 /// Root events we have confirmed tracking
58 pub root_events: HashSet<EventId>,
59 /// If true, never disconnect this relay
60 pub is_bootstrap: bool,
61 /// Current connection status
62 pub connection_status: ConnectionStatus,
63 /// When we last successfully connected - used for since filter on reconnect
64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>,
67 /// The active connection - None if disconnected
68 pub connection: Option<RelayConnection>,
69}
70
71impl RelayState {
72 /// Check if state should be cleared based on 15-minute rule
73 pub fn should_clear_state(&self) -> bool {
74 match self.disconnected_at {
75 Some(disconnected) => {
76 let now = Timestamp::now();
77 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
78 }
79 None => false, // Still connected or never connected
80 }
81 }
82
83 /// Clear repos and root_events - called when reconnect takes > 15 minutes
84 pub fn clear_sync_state(&mut self) {
85 self.repos.clear();
86 self.root_events.clear();
87 }
88}
89```
90
91### PendingSyncIndex (In-Flight Batches)
92
93```rust
94/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
95/// Each batch has its own ID and can confirm independently.
96/// Key: relay URL
97pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
98
99#[derive(Debug, Clone)]
100pub struct PendingBatch {
101 /// Unique ID for this batch - for debugging/logging
102 pub batch_id: u64,
103 /// The items this batch is syncing
104 pub items: PendingItems,
105 /// Subscription IDs that must ALL receive EOSE before confirming
106 pub outstanding_subs: HashSet<SubscriptionId>,
107}
108
109#[derive(Debug, Clone, Default)]
110pub struct PendingItems {
111 pub repos: HashSet<String>,
112 pub root_events: HashSet<EventId>,
113}
114```
115
116---
117
118## Connection Lifecycle State Machine
119
120```mermaid
121stateDiagram-v2
122 [*] --> Disconnected: discover relay via RepoSyncIndex
123 Disconnected --> Connecting: AddFilters triggers spawn_connection
124 Connecting --> Connected: success
125 Connecting --> Disconnected: failure + record in health tracker
126 Connected --> Disconnected: connection lost
127 Connected --> [*]: intentional disconnect via check_disconnects
128
129 note right of Disconnected: disconnected_at set for 15min rule
130 note right of Connected: last_connected tracked for since filter
131```
132
133---
134
135## Flow Scenarios
136
137### Scenario 1: Initial Connect via handle_connect_or_reconnect
138
139```mermaid
140flowchart TB
141 START[Startup] --> SS[Self-subscribe to own relay]
142 SS --> |no since filter| EVENTS[Receive historical events]
143 EVENTS --> RSI[Update RepoSyncIndex]
144 RSI --> DT[derive_relay_targets]
145 DT --> CA[compute_actions with targets and empty confirmed]
146 CA --> AF[AddFilters for each relay]
147 AF --> SPAWN{Relay connected?}
148 SPAWN --> |no| CONN[spawn_connection]
149 CONN --> HC[handle_connect_or_reconnect]
150 SPAWN --> |yes| SUB
151
152 subgraph handle_connect_or_reconnect - Fresh Sync
153 HC --> CHECK_FRESH{is_fresh_sync?}
154 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
155 L1 --> RCA[recompute_actions_for_relay]
156 end
157
158 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
159 SUB --> PB[Create PendingBatch]
160 PB --> EOSE[Wait for EOSE]
161 EOSE --> CONFIRM[Move items to confirmed repos/root_events]
162```
163
164**Key points:**
165- No `since` filter on initial connect - get full history
166- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
167- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
168- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
169
170### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes
171
172```mermaid
173flowchart TB
174 DISC[Connection lost] --> MARK[Set disconnected_at = now]
175 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay]
176 CLEAR_PEND --> WAIT[Wait for reconnection]
177 WAIT --> RECONN[Connection restored]
178 RECONN --> HC[handle_connect_or_reconnect]
179
180 subgraph handle_connect_or_reconnect - Quick Reconnect
181 HC --> CHECK{is_fresh_sync?}
182 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
183 SINCE --> L1[build_announcement_filter - with since]
184 L1 --> L23[rebuild_layer2_and_layer3 - with since]
185 L23 --> RCA[recompute_actions_for_relay]
186 end
187
188 RCA --> AF[AddFilters for new items only]
189 AF --> SUB[Subscribe]
190 SUB --> PB[Create PendingBatch]
191 PB --> EOSE[Wait for EOSE]
192 EOSE --> EXTEND[Extend confirmed state]
193```
194
195**Key points:**
196- PendingSyncIndex cleared on disconnect (not reconnect)
197- `handle_connect_or_reconnect`:
198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
199 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection
202
203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes
204
205```mermaid
206flowchart TB
207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
208
209 subgraph handle_connect_or_reconnect - Stale Reconnect
210 HC --> CHECK{is_fresh_sync?}
211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
212 CLEAR --> L1[build_announcement_filter - no since]
213 L1 --> RCA[recompute_actions_for_relay]
214 end
215
216 RCA --> CA[compute_actions with empty confirmed]
217 CA --> AF[AddFilters for everything]
218 AF --> SUB[Subscribe - no since filter]
219 SUB --> PB[Create PendingBatch]
220 PB --> EOSE[Wait for EOSE]
221 EOSE --> CONFIRM[Populate confirmed state fresh]
222```
223
224**Key points:**
225- `should_clear_state()` returns true → triggers fresh sync
226- Same path as initial connect after clearing state
227- Layer 1: `build_announcement_filter(None)` - full history
228- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
229
230### Scenario 4: Consolidation - Triggered on Filter Add
231
232```mermaid
233flowchart TB
234 AF[handle_add_filters called] --> COUNT{current + new > 70?}
235 COUNT --> |yes| CONSOLIDATE[consolidate]
236 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
237 WAIT_PEND --> CLOSE[unsubscribe_all]
238 CLOSE --> SINCE[since = now - 15min]
239 SINCE --> L1[build_announcement_filter - with since]
240 L1 --> L23[rebuild_layer2_and_layer3 - with since]
241 COUNT --> |no| SUB[Subscribe new filters]
242 SUB --> PB[Create PendingBatch]
243```
244
245**Key points:**
246- Consolidation checked in `handle_add_filters` BEFORE adding new filters
247- After closing all subscriptions, re-subscribe:
248 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
249 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
250- `since = now - 15min` prevents re-fetching old events
251- Keeps confirmed state, just reduces filter count
252
253### Scenario 5: Daily Timer - 23 to 25h Random
254
255```mermaid
256flowchart TB
257 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all]
258 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay]
259 CLEAR_PEND --> CLEAR_STATE[clear_sync_state]
260 CLEAR_STATE --> L1[build_announcement_filter - no since]
261 L1 --> RCA[recompute_actions_for_relay]
262 RCA --> CA[compute_actions with empty confirmed]
263 CA --> AF[AddFilters for everything]
264 AF --> SUB[Subscribe - no since filter]
265 SUB --> PB[Create PendingBatch]
266 PB --> EOSE[Wait for EOSE]
267 EOSE --> CONFIRM[Repopulate confirmed state]
268```
269
270**Key points:**
271- Daily timer is a full fresh sync, NOT consolidation
272- Clears both PendingSyncIndex and confirmed state
273- Layer 1: `build_announcement_filter(None)` - full history
274- Layer 2+3: via compute_actions with empty confirmed - full history
275- Detects any state drift accumulated over 24 hours
276
277---
278
279## Core Algorithms
280
281### 1. derive_relay_targets
282
283Transform RepoSyncIndex into per-relay sync targets:
284
285```rust
286/// Inverts RepoSyncIndex to get per-relay view
287fn derive_relay_targets(
288 repo_index: &HashMap<String, RepoSyncNeeds>
289) -> HashMap<String, RelaySyncNeeds> {
290 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
291
292 for (repo_ref, needs) in repo_index {
293 for relay_url in &needs.relays {
294 let target = targets.entry(relay_url.clone()).or_default();
295 target.repos.insert(repo_ref.clone());
296 target.root_events.extend(needs.root_events.iter().cloned());
297 }
298 }
299
300 targets
301}
302```
303
304### 2. compute_actions (Three-Way Diff)
305
306**This is the ONLY decision point for what NEW subscriptions to create.**
307
308```rust
309/// Computes AddFilters for items that are:
310/// - In targets (what we want)
311/// - NOT in pending (already in-flight)
312/// - NOT in confirmed (already confirmed)
313fn compute_actions(
314 targets: &HashMap<String, RelaySyncNeeds>,
315 pending: &HashMap<String, Vec<PendingBatch>>,
316 confirmed: &HashMap<String, RelayState>,
317) -> Vec<AddFilters> {
318 let mut actions = Vec::new();
319
320 for (relay_url, target) in targets {
321 // Skip disconnected relays - they will get AddFilters on reconnect
322 if let Some(state) = confirmed.get(relay_url) {
323 if state.connection_status != ConnectionStatus::Connected {
324 continue;
325 }
326 }
327
328 // Collect all pending items for this relay
329 let pending_repos: HashSet<_> = pending.get(relay_url)
330 .map(|batches| batches.iter()
331 .flat_map(|b| b.items.repos.iter().cloned())
332 .collect())
333 .unwrap_or_default();
334 let pending_events: HashSet<_> = pending.get(relay_url)
335 .map(|batches| batches.iter()
336 .flat_map(|b| b.items.root_events.iter().cloned())
337 .collect())
338 .unwrap_or_default();
339
340 // Collect confirmed items for this relay
341 let confirmed_repos = confirmed.get(relay_url)
342 .map(|c| &c.repos)
343 .unwrap_or(&HashSet::new());
344 let confirmed_events = confirmed.get(relay_url)
345 .map(|c| &c.root_events)
346 .unwrap_or(&HashSet::new());
347
348 // New = target - pending - confirmed
349 let new_repos: HashSet<_> = target.repos.iter()
350 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
351 .cloned()
352 .collect();
353 let new_events: HashSet<_> = target.root_events.iter()
354 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
355 .cloned()
356 .collect();
357
358 if !new_repos.is_empty() || !new_events.is_empty() {
359 let filters = build_filters(&new_repos, &new_events);
360 actions.push(AddFilters {
361 relay_url: relay_url.clone(),
362 repos: new_repos,
363 root_events: new_events,
364 filters,
365 });
366 }
367 }
368
369 actions
370}
371```
372
373### 3. Filter Building Functions (Three-Layer Strategy)
374
375The filter strategy uses three layers:
376- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
377- **Layer 2**: Events tagging our repos
378- **Layer 3**: Events tagging our root events
379
380**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
381
382```rust
383/// Layer 1: Announcements filter (kinds 30617 + 30618)
384/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
385/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
386fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
387 let filter = Filter::new().kinds([
388 Kind::Custom(30617), // Repository announcements
389 Kind::Custom(30618), // Maintainer lists
390 ]);
391
392 match since {
393 Some(ts) => filter.since(ts),
394 None => filter,
395 }
396}
397
398/// Layer 2: Events tagging one of our repos
399/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
400/// Batched per 100 repo refs.
401fn tagged_one_of_our_repo_event_filters(
402 repos: &HashSet<String>,
403 since: Option<Timestamp>,
404) -> Vec<Filter> {
405 let mut filters = Vec::new();
406 let repo_refs: Vec<_> = repos.iter().collect();
407
408 for chunk in repo_refs.chunks(100) {
409 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
410
411 // Lowercase 'a' tag - standard addressable reference
412 let mut f1 = Filter::new()
413 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
414 // Uppercase 'A' tag - some clients use this
415 let mut f2 = Filter::new()
416 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
417 // Quote 'q' tag - NIP-10 quote references to addressable events
418 let mut f3 = Filter::new()
419 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
420
421 if let Some(ts) = since {
422 f1 = f1.since(ts);
423 f2 = f2.since(ts);
424 f3 = f3.since(ts);
425 }
426
427 filters.push(f1);
428 filters.push(f2);
429 filters.push(f3);
430 }
431
432 filters
433}
434
435/// Layer 3: Events tagging one of our root events
436/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
437/// Batched per 100 event IDs.
438fn tagged_one_of_our_root_event_filters(
439 root_events: &HashSet<EventId>,
440 since: Option<Timestamp>,
441) -> Vec<Filter> {
442 let mut filters = Vec::new();
443 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
444
445 for chunk in event_ids.chunks(100) {
446 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
447
448 // Lowercase 'e' tag - standard event reference
449 let mut f1 = Filter::new()
450 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
451 // Uppercase 'E' tag - some clients use this
452 let mut f2 = Filter::new()
453 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
454 // Quote 'q' tag - NIP-10 quote references to events
455 let mut f3 = Filter::new()
456 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
457
458 if let Some(ts) = since {
459 f1 = f1.since(ts);
460 f2 = f2.since(ts);
461 f3 = f3.since(ts);
462 }
463
464 filters.push(f1);
465 filters.push(f2);
466 filters.push(f3);
467 }
468
469 filters
470}
471
472/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
473/// Used by:
474/// - compute_actions for incremental subscriptions
475/// - consolidation rebuilds (Layer 1 remains active)
476fn build_layer2_and_layer3_filters(
477 repos: &HashSet<String>,
478 root_events: &HashSet<EventId>,
479 since: Option<Timestamp>,
480) -> Vec<Filter> {
481 let mut filters = Vec::new();
482 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
483 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
484 filters
485}
486```
487
488**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
489
490### 4. handle_add_filters (SyncManager)
491
492```rust
493impl SyncManager {
494 async fn handle_add_filters(&mut self, action: AddFilters) {
495 let AddFilters { relay_url, repos, root_events, filters } = action;
496
497 // Auto-spawn connection if needed
498 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
499 match state {
500 None => {
501 // New relay discovered - create entry and spawn connection
502 self.relay_sync_index.write().await.insert(
503 relay_url.clone(),
504 RelayState {
505 repos: HashSet::new(),
506 root_events: HashSet::new(),
507 is_bootstrap: false,
508 connection_status: ConnectionStatus::Connecting,
509 last_connected: None,
510 disconnected_at: None,
511 connection: None,
512 }
513 );
514 self.spawn_connection(&relay_url).await;
515 return; // Subscriptions will happen on connection success
516 }
517 Some(state) if state.connection_status != ConnectionStatus::Connected => {
518 // Not connected - subscriptions will happen on connection success
519 return;
520 }
521 Some(_) => {
522 // Already connected - proceed with subscription
523 }
524 }
525
526 // Subscribe and collect subscription IDs
527 let conn = self.connections.get(&relay_url).unwrap();
528 let mut sub_ids = HashSet::new();
529
530 for filter in filters {
531 match conn.client.subscribe(filter, None).await {
532 Ok(output) => {
533 for sub_id in output.val {
534 sub_ids.insert(sub_id);
535 }
536 }
537 Err(e) => {
538 tracing::warn!("Failed to subscribe: {}", e);
539 }
540 }
541 }
542
543 // Create pending batch
544 let batch = PendingBatch {
545 batch_id: self.next_batch_id(),
546 items: PendingItems { repos, root_events },
547 outstanding_subs: sub_ids,
548 };
549
550 // Add to pending index
551 self.pending_sync_index.write().await
552 .entry(relay_url)
553 .or_default()
554 .push(batch);
555 }
556}
557```
558
559### 5. handle_disconnect
560
561```rust
562impl SyncManager {
563 /// Called when connection to a relay is lost
564 async fn handle_disconnect(&mut self, relay_url: &str) {
565 let mut index = self.relay_sync_index.write().await;
566
567 if let Some(state) = index.get_mut(relay_url) {
568 state.connection_status = ConnectionStatus::Disconnected;
569 state.disconnected_at = Some(Timestamp::now());
570 state.connection = None;
571 }
572
573 // Clear pending batches - these items were not confirmed
574 self.pending_sync_index.write().await.remove(relay_url);
575
576 // Remove from active connections map
577 self.connections.remove(relay_url);
578
579 // Health tracker records failure for backoff
580 self.health_tracker.record_failure(relay_url);
581 }
582}
583```
584
585### 6. handle_connect_or_reconnect (Unified)
586
587This method handles BOTH initial connection AND reconnection with unified logic:
588
589```rust
590impl SyncManager {
591 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
592 ///
593 /// Decision tree:
594 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
595 /// - Quick reconnect (<15min): since = last_connected - 15min
596 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
597 let mut index = self.relay_sync_index.write().await;
598 let state = match index.get_mut(relay_url) {
599 Some(s) => s,
600 None => return, // Relay was removed while disconnected
601 };
602
603 // Determine if this is a fresh sync or quick reconnect
604 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
605 let last_connected = state.last_connected;
606
607 if is_fresh_sync && state.last_connected.is_some() {
608 // Stale reconnect (>15min) - clear state
609 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
610 state.clear_sync_state();
611 }
612
613 // Update connection state
614 state.connection_status = ConnectionStatus::Connected;
615 state.last_connected = Some(Timestamp::now());
616 state.disconnected_at = None;
617
618 // Record success in health tracker
619 self.health_tracker.record_success(relay_url);
620
621 drop(index); // Release lock
622
623 let conn = match self.connections.get(relay_url) {
624 Some(c) => c,
625 None => return,
626 };
627
628 if is_fresh_sync {
629 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
630
631 // Step 1: Subscribe Layer 1 (announcements) without since
632 let layer1 = build_announcement_filter(None);
633 let _ = conn.client.subscribe(layer1, None).await;
634
635 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
636 self.recompute_actions_for_relay(relay_url).await;
637 } else {
638 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
639 let since = last_connected
640 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
641 .unwrap_or(Timestamp::from(0));
642
643 // Step 1: Subscribe Layer 1 (announcements) with since
644 let layer1 = build_announcement_filter(Some(since));
645 let _ = conn.client.subscribe(layer1, None).await;
646
647 // Step 2: Rebuild Layer 2+3 for confirmed items with since
648 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
649
650 // Step 3: Check for NEW items via compute_actions
651 self.recompute_actions_for_relay(relay_url).await;
652 }
653 }
654
655 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
656 /// Used by:
657 /// - Quick reconnect: rebuild confirmed items with since filter
658 /// - Consolidation: close and rebuild with since filter
659 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
660 let confirmed = self.relay_sync_index.read().await;
661 let state = match confirmed.get(relay_url) {
662 Some(s) => s,
663 None => return,
664 };
665
666 // Build Layer 2+3 filters WITH since
667 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
668 drop(confirmed);
669
670 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
671 let conn = match self.connections.get(relay_url) {
672 Some(c) => c,
673 None => return,
674 };
675
676 for filter in filters {
677 let _ = conn.client.subscribe(filter, None).await;
678 }
679 }
680
681 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
682 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
683 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
684 let repo_index = self.repo_sync_index.read().await;
685 let targets = derive_relay_targets(&repo_index);
686 drop(repo_index);
687
688 // Filter to just this relay
689 let target = match targets.get(relay_url) {
690 Some(t) => t.clone(),
691 None => return, // No repos reference this relay anymore
692 };
693
694 let pending = self.pending_sync_index.read().await;
695 let confirmed = self.relay_sync_index.read().await;
696
697 let mut single_relay_targets = HashMap::new();
698 single_relay_targets.insert(relay_url.to_string(), target);
699
700 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
701
702 drop(pending);
703 drop(confirmed);
704
705 // Process AddFilters
706 for action in actions {
707 self.handle_add_filters(action).await;
708 }
709 }
710}
711```
712
713### 7. Daily Timer
714
715```rust
716impl SyncManager {
717 async fn run_daily_timer(&self) {
718 loop {
719 // Random 23-25 hours
720 let hours = 23.0 + rand::random::<f64>() * 2.0;
721 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
722
723 let relay_urls: Vec<_> = self.relay_sync_index.read().await
724 .keys()
725 .cloned()
726 .collect();
727
728 for relay_url in relay_urls {
729 self.daily_sync(&relay_url).await;
730 }
731 }
732 }
733
734 /// Perform daily fresh sync for a relay
735 async fn daily_sync(&mut self, relay_url: &str) {
736 tracing::info!("Daily sync triggered for {}", relay_url);
737
738 // Close all subscriptions
739 if let Some(conn) = self.connections.get(relay_url) {
740 conn.client.unsubscribe_all().await;
741 }
742
743 // Clear PendingSyncIndex
744 self.pending_sync_index.write().await.remove(relay_url);
745
746 // Clear confirmed state - triggers fresh sync
747 {
748 let mut index = self.relay_sync_index.write().await;
749 if let Some(state) = index.get_mut(relay_url) {
750 state.clear_sync_state();
751 }
752 }
753
754 // Recompute actions - will generate AddFilters for everything
755 self.recompute_actions_for_relay(relay_url).await;
756 }
757}
758```
759
760### 8. Consolidation (Threshold-Based, Triggered on Add)
761
762Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active.
763
764```rust
765impl SyncManager {
766 /// Check filter count and consolidate if needed.
767 /// Called from handle_add_filters BEFORE adding new filters.
768 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
769 let current_count = self.get_filter_count(relay_url).await;
770
771 if current_count + new_filter_count > 70 {
772 self.consolidate(relay_url).await;
773 }
774 }
775
776 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
777 /// Does NOT clear state - just reduces filter count.
778 async fn consolidate(&mut self, relay_url: &str) {
779 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
780
781 // Wait for all pending batches to complete first
782 self.wait_pending_complete(relay_url).await;
783
784 // Close Layer 2+3 subscriptions only - Layer 1 remains active
785 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
786 // For simplicity, we close all and re-add Layer 1
787 if let Some(conn) = self.connections.get(relay_url) {
788 conn.client.unsubscribe_all().await;
789 }
790
791 // Re-subscribe Layer 1 with since (maintains announcements stream)
792 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
793 let conn = self.connections.get(relay_url).unwrap();
794 let layer1 = build_announcement_filter(Some(since));
795 let _ = conn.client.subscribe(layer1, None).await;
796
797 // Rebuild Layer 2+3 only
798 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
799 }
800}
801```
802
803**Updated handle_add_filters to check consolidation:**
804
805```rust
806impl SyncManager {
807 async fn handle_add_filters(&mut self, action: AddFilters) {
808 let AddFilters { relay_url, repos, root_events, filters } = action;
809
810 // Auto-spawn connection if needed (unchanged)
811 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
812 match state {
813 None => {
814 // New relay discovered - create entry and spawn connection
815 self.relay_sync_index.write().await.insert(
816 relay_url.clone(),
817 RelayState {
818 repos: HashSet::new(),
819 root_events: HashSet::new(),
820 is_bootstrap: false,
821 connection_status: ConnectionStatus::Connecting,
822 last_connected: None,
823 disconnected_at: None,
824 connection: None,
825 }
826 );
827 self.spawn_connection(&relay_url).await;
828 return; // Subscriptions will happen on connection success
829 }
830 Some(state) if state.connection_status != ConnectionStatus::Connected => {
831 return; // Not connected - subscriptions will happen on connection success
832 }
833 Some(_) => {
834 // Already connected - proceed
835 }
836 }
837
838 // CHECK CONSOLIDATION BEFORE ADDING
839 self.maybe_consolidate(&relay_url, filters.len()).await;
840
841 // Subscribe and collect subscription IDs
842 let conn = self.connections.get(&relay_url).unwrap();
843 let mut sub_ids = HashSet::new();
844
845 for filter in filters {
846 match conn.client.subscribe(filter, None).await {
847 Ok(output) => {
848 for sub_id in output.val {
849 sub_ids.insert(sub_id);
850 }
851 }
852 Err(e) => {
853 tracing::warn!("Failed to subscribe: {}", e);
854 }
855 }
856 }
857
858 // Create pending batch (unchanged)
859 let batch = PendingBatch {
860 batch_id: self.next_batch_id(),
861 items: PendingItems { repos, root_events },
862 outstanding_subs: sub_ids,
863 };
864
865 self.pending_sync_index.write().await
866 .entry(relay_url)
867 .or_default()
868 .push(batch);
869 }
870}
871```
872
873---
874
875## Disconnect (Relay Removal) Handling
876
877```rust
878impl SyncManager {
879 /// Periodically check for relays that should be disconnected
880 async fn check_disconnects(&mut self) {
881 let confirmed = self.relay_sync_index.read().await;
882 let relays_to_disconnect: Vec<_> = confirmed.iter()
883 .filter(|(_, state)| {
884 !state.is_bootstrap &&
885 state.repos.is_empty() &&
886 state.root_events.is_empty()
887 })
888 .map(|(url, _)| url.clone())
889 .collect();
890 drop(confirmed);
891
892 for relay_url in relays_to_disconnect {
893 self.disconnect_relay(&relay_url).await;
894 }
895 }
896
897 async fn disconnect_relay(&mut self, relay_url: &str) {
898 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
899
900 self.relay_sync_index.write().await.remove(relay_url);
901 self.pending_sync_index.write().await.remove(relay_url);
902
903 if let Some(conn) = self.connections.remove(relay_url) {
904 let _ = conn.client.disconnect().await;
905 }
906 }
907}
908```
909
910---
911
912## State Flow Summary
913
914```mermaid
915flowchart TB
916 subgraph Input
917 SS[SelfSubscriber]
918 OWN[Own Relay]
919 end
920
921 subgraph RepoSyncIndex - What We Want
922 RSI[HashMap: Repo to Relays+Events]
923 end
924
925 subgraph Derived Target
926 DT[derive_relay_targets fn]
927 TGT[Per-relay: repos + events we should sync]
928 end
929
930 subgraph compute_actions - Decision Point
931 CA[Three-way diff: target - pending - confirmed]
932 end
933
934 subgraph PendingSyncIndex - In Flight
935 PSI[Vec PendingBatch per relay]
936 end
937
938 subgraph RelaySyncIndex - Confirmed State
939 RLI[RelayState per relay]
940 CONN[connection_status]
941 REPOS[repos + root_events]
942 TIMES[last_connected + disconnected_at]
943 end
944
945 SS -->|subscribe| OWN
946 OWN -->|events| SS
947 SS -->|batch fires| RSI
948 RSI --> DT
949 DT --> TGT
950 TGT --> CA
951 PSI --> CA
952 RLI --> CA
953 CA -->|Layer 2+3 new items| AF[AddFilters]
954 AF -->|check filter count| CONSOL{count + new > 70?}
955 CONSOL -->|yes| CONSOLIDATE[consolidate]
956 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since]
957 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since]
958 CONSOL -->|no| SUB[subscribe]
959 AF -->|spawn if needed| CONN
960 SUB --> PSI
961 PSI -->|EOSE| REPOS
962
963 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
964 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
965
966 subgraph handle_connect_or_reconnect
967 HC --> FRESH_CHECK{is_fresh_sync?}
968 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
969 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
970 L1_FRESH --> RCA1[recompute_actions_for_relay]
971 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
972 L23_QUICK --> RCA2[recompute_actions_for_relay]
973 end
974```
975
976---
977
978## Key Design Decisions
979
980| Decision | Choice | Rationale |
981|----------|--------|-----------|
982| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
983| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
984| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
985| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
986| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
987| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
988| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
989| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
990| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
991| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
992| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
993| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
994| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
995| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
996| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
997| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
998| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
999
1000---
1001
1002## Module Structure
1003
1004```
1005src/sync/
1006├── mod.rs # SyncManager, main loop
1007├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
1008├── actions.rs # AddFilters struct, compute_actions, build_filters
1009├── self_subscriber.rs # SelfSubscriber, batching logic
1010├── relay_connection.rs # Per-relay WebSocket connection
1011├── consolidation.rs # Consolidation logic, daily timer
1012├── health.rs # Health tracking (reuse from v2)
1013└── metrics.rs # Prometheus metrics (reuse from v2)
1014```
1015
1016---
1017
1018## Comparison: v3 vs v4
1019
1020| Aspect | v3 | v4 |
1021|--------|----|----|
1022| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1023| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1024| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1025| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1026| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1027| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1028| PSI clearing | On disconnect | On disconnect (confirmed) |
1029| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
1030
1031---
1032
1033## Self-Subscriber Flow
1034
1035The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions.
1036
1037### State Tracking
1038
1039```rust
1040pub struct SelfSubscriber {
1041 own_relay_url: String,
1042 relay_domain: String,
1043 repo_sync_index: RepoSyncIndex,
1044 pending_sync_index: PendingSyncIndex,
1045 relay_sync_index: RelaySyncIndex,
1046 action_tx: mpsc::Sender<AddFilters>,
1047 /// Timestamp of last successful connection - used for since filter on reconnection
1048 last_connected: Option<Timestamp>,
1049 /// The active client connection
1050 client: Option<Client>,
1051}
1052```
1053
1054### On Startup / Reconnect (Unified)
1055
1056Both initial startup and reconnection use the same `connect_and_subscribe` method:
1057
1058```rust
1059impl SelfSubscriber {
1060 async fn run(mut self) {
1061 loop {
1062 // Connect or reconnect
1063 if let Err(e) = self.connect_and_subscribe().await {
1064 tracing::warn!("Connection failed: {}, will retry", e);
1065 tokio::time::sleep(Duration::from_secs(5)).await;
1066 continue;
1067 }
1068
1069 // Run event loop until disconnection
1070 self.event_loop().await;
1071
1072 // Loop will retry connection
1073 }
1074 }
1075
1076 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
1077 let client = Client::new(Keys::generate());
1078 client.add_relay(&self.own_relay_url).await?;
1079 client.connect().await;
1080
1081 // Build filter - add since only on reconnect
1082 let filter = Filter::new().kinds([
1083 Kind::Custom(30617), // Repository announcements
1084 Kind::GitPatch, // 1617
1085 Kind::Custom(1618), // PRs
1086 Kind::Custom(1619), // PR updates
1087 Kind::GitIssue, // 1621
1088 ]);
1089
1090 let filter = if let Some(ts) = self.last_connected {
1091 // Reconnection: use since filter
1092 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1093 filter.since(since)
1094 } else {
1095 // Initial connect: no since filter - get full history
1096 filter
1097 };
1098
1099 // Update last_connected AFTER computing since
1100 self.last_connected = Some(Timestamp::now());
1101
1102 client.subscribe(filter, None).await?;
1103 self.client = Some(client);
1104 Ok(())
1105 }
1106}
1107```
1108
1109### Event Loop with Batching
1110
1111```rust
1112impl SelfSubscriber {
1113 async fn event_loop(&mut self) {
1114 let client = self.client.as_ref().unwrap();
1115 let mut pending_events: Vec<Event> = Vec::new();
1116 let mut batch_timer: Option<Instant> = None;
1117 let batch_window = Duration::from_secs(5);
1118
1119 loop {
1120 let timeout = batch_timer
1121 .map(|t| batch_window.saturating_sub(t.elapsed()))
1122 .unwrap_or(Duration::from_secs(60));
1123
1124 tokio::select! {
1125 notification = client.notifications().recv() => {
1126 match notification {
1127 Ok(RelayPoolNotification::Event { event, .. }) => {
1128 pending_events.push(*event);
1129
1130 // Start timer on first event - does NOT reset
1131 if batch_timer.is_none() {
1132 batch_timer = Some(Instant::now());
1133 }
1134 }
1135 Ok(RelayPoolNotification::Shutdown) => {
1136 // Connection lost
1137 break;
1138 }
1139 _ => {}
1140 }
1141 }
1142 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1143 // Batch window elapsed
1144 self.process_batch(pending_events.drain(..).collect()).await;
1145 batch_timer = None;
1146 }
1147 }
1148 }
1149 }
1150
1151 async fn process_batch(&self, events: Vec<Event>) {
1152 // 1. Update RepoSyncIndex
1153 for event in events {
1154 match event.kind.as_u16() {
1155 30617 => self.handle_announcement(&event).await,
1156 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1157 _ => {}
1158 }
1159 }
1160
1161 // 2. Derive targets and compute actions
1162 let repo_index = self.repo_sync_index.read().await;
1163 let targets = derive_relay_targets(&repo_index);
1164
1165 let pending = self.pending_sync_index.read().await;
1166 let confirmed = self.relay_sync_index.read().await;
1167
1168 let actions = compute_actions(&targets, &pending, &confirmed);
1169
1170 drop(repo_index);
1171 drop(pending);
1172 drop(confirmed);
1173
1174 // 3. Send actions to SyncManager
1175 for action in actions {
1176 let _ = self.action_tx.send(action).await;
1177 }
1178 }
1179
1180 async fn handle_announcement(&self, event: &Event) {
1181 // Extract repo_ref from event - 30617:pubkey:identifier
1182 let d_tag = event.tags.iter()
1183 .find_map(|tag| {
1184 if tag.kind() == TagKind::D {
1185 tag.content().map(|s| s.to_string())
1186 } else {
1187 None
1188 }
1189 })
1190 .unwrap_or_default();
1191
1192 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1193
1194 // Extract relay URLs from 'r' tags
1195 let relays: HashSet<String> = event.tags.iter()
1196 .filter_map(|tag| {
1197 if tag.kind() == TagKind::Relay {
1198 tag.content().map(|s| s.to_string())
1199 } else {
1200 None
1201 }
1202 })
1203 .collect();
1204
1205 // Update RepoSyncIndex
1206 let mut index = self.repo_sync_index.write().await;
1207 let needs = index.entry(repo_ref).or_default();
1208 needs.relays = relays;
1209 }
1210
1211 async fn handle_root_event(&self, event: &Event) {
1212 // Extract repo_ref from 'a' tag
1213 let repo_ref = event.tags.iter()
1214 .find_map(|tag| {
1215 if tag.kind() == TagKind::A {
1216 tag.content().map(|s| s.to_string())
1217 } else {
1218 None
1219 }
1220 });
1221
1222 if let Some(repo_ref) = repo_ref {
1223 let mut index = self.repo_sync_index.write().await;
1224 let needs = index.entry(repo_ref).or_default();
1225 needs.root_events.insert(event.id);
1226 }
1227 }
1228}
1229``` \ No newline at end of file
diff --git a/docs/explanation/state-structure-redesign-proposal.md b/docs/explanation/state-structure-redesign-proposal.md
new file mode 100644
index 0000000..0a27cf4
--- /dev/null
+++ b/docs/explanation/state-structure-redesign-proposal.md
@@ -0,0 +1,373 @@
1# State Structure Redesign Proposal v2
2
3## The Core Problem
4
5We need to transform:
6- **Repo Announcements** (30617) that list relays
7- **Root Events** (1617/1618/1619/1621) that tag repos
8
9Into:
10- **Per-relay subscriptions**: which repos and root events to sync from each relay
11
12And generate **RelayActions** when this mapping changes.
13
14---
15
16## Proposed Data Model
17
18### 1. RepoIndex (Primary Source of Truth)
19
20```rust
21/// Everything we know about repos we're tracking
22/// Key: repo addressable ref ("30617:pubkey:identifier")
23pub type RepoIndex = Arc<RwLock<HashMap<String, RepoInfo>>>;
24
25#[derive(Debug, Clone, Default)]
26pub struct RepoInfo {
27 /// Relay URLs listed in the repo's announcement
28 pub relays: HashSet<String>,
29 /// Root event IDs that reference this repo
30 pub root_events: HashSet<EventId>,
31}
32```
33
34**Updated by:** Database init, batch processing of new announcements/root events
35
36### 2. RelayIndex (Applied State)
37
38```rust
39/// What we've told each relay to sync
40/// Key: relay URL
41pub type RelayIndex = Arc<RwLock<HashMap<String, SyncTarget>>>;
42
43#[derive(Debug, Clone, Default, PartialEq)]
44pub struct SyncTarget {
45 /// Repos we're syncing for this relay
46 pub repos: HashSet<String>,
47 /// Root events we're tracking
48 pub root_events: HashSet<EventId>,
49}
50```
51
52**Updated by:** SyncManager after RelayActions are applied
53
54---
55
56## The Transformation
57
58```mermaid
59flowchart LR
60 subgraph Input
61 RA[Repo Announcements]
62 RE[Root Events]
63 end
64
65 subgraph RepoIndex
66 R1[repo_a: relays=X,Y events=1,2]
67 R2[repo_b: relays=Y,Z events=3]
68 end
69
70 subgraph Derived Target
71 T1[relay_X: repos=a events=1,2]
72 T2[relay_Y: repos=a,b events=1,2,3]
73 T3[relay_Z: repos=b events=3]
74 end
75
76 subgraph RelayIndex Applied
77 A1[relay_X: repos=a events=1,2]
78 A2[relay_Y: repos=a events=1,2]
79 end
80
81 RA --> R1
82 RA --> R2
83 RE --> R1
84 RE --> R2
85
86 R1 --> T1
87 R1 --> T2
88 R2 --> T2
89 R2 --> T3
90```
91
92The **diff** between Derived Target and RelayIndex produces RelayActions:
93- relay_Y needs AddFilters for repo_b and event 3
94- relay_Z needs SpawnRelay
95
96---
97
98## Algorithm: derive_target_from_repo_index
99
100```rust
101/// Derive what we SHOULD be syncing from the repo data
102fn derive_relay_targets(repo_index: &HashMap<String, RepoInfo>) -> HashMap<String, SyncTarget> {
103 let mut targets: HashMap<String, SyncTarget> = HashMap::new();
104
105 for (repo_ref, info) in repo_index {
106 // For each relay that lists this repo
107 for relay_url in &info.relays {
108 let target = targets.entry(relay_url.clone()).or_default();
109 target.repos.insert(repo_ref.clone());
110 target.root_events.extend(info.root_events.iter().cloned());
111 }
112 }
113
114 targets
115}
116```
117
118---
119
120## Algorithm: process_batch
121
122```rust
123async fn process_batch(&self, pending: &mut PendingUpdates) {
124 // ============================================
125 // STEP 1: Update RepoIndex from batch
126 // ============================================
127
128 let mut repo_index = self.repo_index.write().await;
129
130 // 1a. Process root events - add to repo's root_events set
131 for event in pending.root_events.drain(..) {
132 for repo_ref in extract_repo_refs(&event) {
133 repo_index.entry(repo_ref)
134 .or_default()
135 .root_events
136 .insert(event.id);
137 }
138 }
139
140 // 1b. Process announcements - update repo's relay set
141 for event in pending.announcements.drain(..) {
142 if !lists_our_service(&event) {
143 continue;
144 }
145 let repo_ref = build_repo_ref(&event);
146 let relay_urls: HashSet<String> = extract_relay_urls(&event)
147 .into_iter()
148 .filter(|url| !is_own_relay(url))
149 .collect();
150
151 // Replace relay set (handles updates that change relays)
152 repo_index.entry(repo_ref)
153 .or_default()
154 .relays = relay_urls;
155 }
156
157 // ============================================
158 // STEP 2: Derive target state from RepoIndex
159 // ============================================
160
161 let target = derive_relay_targets(&repo_index);
162 drop(repo_index); // Release write lock
163
164 // ============================================
165 // STEP 3: Diff target vs applied (RelayIndex)
166 // ============================================
167
168 let applied = self.relay_index.read().await;
169 let actions = compute_relay_actions(&target, &applied);
170 drop(applied); // Release read lock
171
172 // ============================================
173 // STEP 4: Send actions & update RelayIndex
174 // ============================================
175
176 for action in actions {
177 match &action {
178 RelayAction::SpawnRelay { relay_url, repos_and_root_events } => {
179 // Update RelayIndex with new relay
180 let mut applied = self.relay_index.write().await;
181 applied.insert(relay_url.clone(), SyncTarget {
182 repos: repos_and_root_events.keys().cloned().collect(),
183 root_events: repos_and_root_events.values()
184 .flat_map(|e| e.iter().cloned())
185 .collect(),
186 });
187 }
188 RelayAction::AddFilters { relay_url, repos_and_new_root_event } => {
189 // Update RelayIndex with additions
190 let mut applied = self.relay_index.write().await;
191 if let Some(target) = applied.get_mut(relay_url) {
192 for (repo, events) in repos_and_new_root_event {
193 target.repos.insert(repo.clone());
194 target.root_events.extend(events.iter().cloned());
195 }
196 }
197 }
198 }
199
200 // Send action to SyncManager
201 let _ = self.action_tx.send(action).await;
202 }
203}
204```
205
206---
207
208## Algorithm: compute_relay_actions
209
210```rust
211fn compute_relay_actions(
212 target: &HashMap<String, SyncTarget>,
213 applied: &HashMap<String, SyncTarget>,
214) -> Vec<RelayAction> {
215 let mut actions = Vec::new();
216
217 for (relay_url, target_state) in target {
218 match applied.get(relay_url) {
219 None => {
220 // New relay - spawn it
221 let mut repos_and_events = HashMap::new();
222 for repo in &target_state.repos {
223 // Get events for this specific repo
224 let events = target_state.root_events.clone(); // simplified
225 repos_and_events.insert(repo.clone(), events);
226 }
227 actions.push(RelayAction::SpawnRelay {
228 relay_url: relay_url.clone(),
229 repos_and_root_events: repos_and_events,
230 });
231 }
232 Some(applied_state) => {
233 // Existing relay - check for new repos/events
234 let new_repos: HashSet<_> = target_state.repos
235 .difference(&applied_state.repos)
236 .cloned()
237 .collect();
238 let new_events: HashSet<_> = target_state.root_events
239 .difference(&applied_state.root_events)
240 .cloned()
241 .collect();
242
243 if !new_repos.is_empty() || !new_events.is_empty() {
244 let mut repos_and_events = HashMap::new();
245 for repo in &new_repos {
246 repos_and_events.insert(repo.clone(), new_events.clone());
247 }
248 // Also handle new events for existing repos
249 if !new_events.is_empty() && new_repos.is_empty() {
250 for repo in &applied_state.repos {
251 repos_and_events.insert(repo.clone(), new_events.clone());
252 }
253 }
254
255 actions.push(RelayAction::AddFilters {
256 relay_url: relay_url.clone(),
257 repos_and_new_root_event: repos_and_events,
258 });
259 }
260 }
261 }
262 }
263
264 // Future: detect relay removal (in applied but not in target)
265
266 actions
267}
268```
269
270---
271
272## Handling Announcement Updates
273
274When an announcement is **updated** and changes its relay list:
275
276```mermaid
277flowchart TD
278 A[repo_a announcement updated] --> B[Old: relays X,Y]
279 B --> C[New: relays Y,Z]
280 C --> D[RepoIndex updated: repo_a.relays = Y,Z]
281 D --> E[derive_relay_targets]
282 E --> F[Target: X=empty, Y=repo_a, Z=repo_a]
283 F --> G[Diff with Applied: X=repo_a, Y=repo_a]
284 G --> H1[X: repo_a removed - future RemoveFilters]
285 G --> H2[Z: new relay - SpawnRelay]
286```
287
288The current RelayAction types only support growth (SpawnRelay, AddFilters). Removal would need a new `RemoveFilters` action type - this is a future enhancement.
289
290---
291
292## Name Mappings
293
294| Current | Proposed | Semantics |
295|---------|----------|-----------|
296| `FollowingRepoRootEvents` | `RepoIndex` | Per-repo: relays + root events |
297| `SyncRelays` | `RelayIndex` | Per-relay: what we're syncing (applied state) |
298| - | `SyncTarget` | Struct for repos + events |
299| - | `RepoInfo` | Struct for relay set + event set |
300
301---
302
303## Data Flow Summary
304
305```mermaid
306flowchart TB
307 subgraph Batch Input
308 RA[30617 Announcements]
309 RE[Root Events 1617-1621]
310 end
311
312 subgraph Step 1: Update Source
313 RI[RepoIndex]
314 end
315
316 subgraph Step 2: Derive Target
317 DT[derive_relay_targets]
318 TGT[Target HashMap]
319 end
320
321 subgraph Step 3: Diff
322 RLI[RelayIndex - Applied]
323 DIFF[compute_relay_actions]
324 end
325
326 subgraph Step 4: Apply
327 ACT[RelayActions]
328 SM[SyncManager]
329 end
330
331 RA --> RI
332 RE --> RI
333 RI --> DT
334 DT --> TGT
335 TGT --> DIFF
336 RLI --> DIFF
337 DIFF --> ACT
338 ACT --> SM
339 ACT --> |update| RLI
340```
341
342---
343
344## Files to Modify
345
346| File | Changes |
347|------|---------|
348| [`src/sync/mod.rs`](src/sync/mod.rs) | Replace type aliases with RepoIndex/RelayIndex + structs |
349| [`src/sync/self_subscriber.rs`](src/sync/self_subscriber.rs) | Rewrite process_batch with new algorithm |
350
351---
352
353## Questions for Approval
354
3551. **Naming**: Are `RepoIndex`/`RelayIndex` and `RepoInfo`/`SyncTarget` clear enough?
356
3572. **When to update RelayIndex**: Should we:
358 - (a) Update immediately when generating action (optimistic) ← proposed above
359 - (b) Update only after SyncManager confirms action succeeded
360
3613. **Bootstrap relay**: Keep special-casing it in RelayIndex (always present)?
362
3634. **Future work**: Add `RemoveFilters` action for relay removal, or defer?
364
365---
366
367## Benefits
368
3691. **Logical flow**: Source → Derived → Diff → Actions
3702. **Single source of truth**: RepoIndex is the authoritative data
3713. **Clear transformation**: `derive_relay_targets()` is a pure function
3724. **Handles updates**: Replacing `repo.relays` naturally handles announcement changes
3735. **Testable**: Each step can be unit tested independently \ No newline at end of file