upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync-v3.md
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:32:01 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:32:01 +0000
commit18ad93f8d0b8ce172c9c227385a21af66a507950 (patch)
tree275ada806570a2105f4e75388a565f61276209c8 /docs/explanation/grasp-02-proactive-sync-v3.md
parent4e5a937a4ef5288e702ba2bae3daf2a78398b690 (diff)
docs: remove old grasp-02 design doc versions
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v3.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v3.md871
1 files changed, 0 insertions, 871 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v3.md b/docs/explanation/grasp-02-proactive-sync-v3.md
deleted file mode 100644
index 30b3102..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v3.md
+++ /dev/null
@@ -1,871 +0,0 @@
1# GRASP-02: Proactive Sync v3 - Event-Driven Design
2
3## Overview
4
5This document presents v3 of the proactive sync design. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **Batch-based pending tracking** - Each batch confirms independently
93. **Single action type** - AddFilters only, auto-spawn connections
104. **Three-way state model** - RepoSyncIndex (want) → PendingSyncIndex (in-flight) → RelaySyncIndex (confirmed)
11
12---
13
14## Data Model
15
16### RepoSyncIndex (Source of Truth)
17
18```rust
19/// What we WANT to sync - derived from events received via self-subscription.
20/// Updated immediately when self-subscriber batch fires.
21/// Key: repo addressable ref ("30617:pubkey:identifier")
22pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
23
24#[derive(Debug, Clone, Default)]
25pub struct RepoSyncNeeds {
26 /// Relay URLs listed in this repo's 30617 announcement
27 pub relays: HashSet<String>,
28 /// Root event IDs (1617/1618/1619/1621) that reference this repo
29 pub root_events: HashSet<EventId>,
30}
31```
32
33### RelaySyncIndex (Confirmed State + Connection)
34
35```rust
36/// What we've CONFIRMED syncing - includes connection state for integrated lifecycle.
37/// Key: relay URL
38pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
39
40/// Connection status for a relay
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum ConnectionStatus {
43 /// Not currently connected
44 Disconnected,
45 /// Connection attempt in progress
46 Connecting,
47 /// Successfully connected and subscribed
48 Connected,
49}
50
51/// Complete state for a single relay - combines sync needs with connection lifecycle
52#[derive(Debug)]
53pub struct RelayState {
54 /// Repos we've confirmed syncing from this relay
55 pub repos: HashSet<String>,
56 /// Root events we've confirmed tracking
57 pub root_events: HashSet<EventId>,
58 /// If true, never disconnect this relay
59 pub is_bootstrap: bool,
60 /// Current connection status
61 pub connection_status: ConnectionStatus,
62 /// When we last successfully connected (for since filter on reconnect)
63 pub last_connected: Option<Timestamp>,
64 /// When we disconnected (for 15-minute state retention rule)
65 pub disconnected_at: Option<Timestamp>,
66 /// The active connection (None if disconnected)
67 pub connection: Option<RelayConnection>,
68}
69
70impl RelayState {
71 /// Check if state should be cleared based on 15-minute rule
72 pub fn should_clear_state(&self) -> bool {
73 match self.disconnected_at {
74 Some(disconnected) => {
75 let now = Timestamp::now();
76 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
77 }
78 None => false, // Still connected or never connected
79 }
80 }
81
82 /// Clear repos and root_events (called when reconnect takes > 15 minutes)
83 pub fn clear_sync_state(&mut self) {
84 self.repos.clear();
85 self.root_events.clear();
86 }
87}
88```
89
90### PendingSyncIndex (In-Flight Batches)
91
92```rust
93/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
94/// Each batch has its own ID and can confirm independently.
95/// Key: relay URL
96pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
97
98#[derive(Debug, Clone)]
99pub struct PendingBatch {
100 /// Unique ID for this batch (for debugging/logging)
101 pub batch_id: u64,
102 /// The items this batch is syncing
103 pub items: PendingItems,
104 /// Subscription IDs that must ALL receive EOSE before confirming
105 pub outstanding_subs: HashSet<SubscriptionId>,
106}
107
108#[derive(Debug, Clone, Default)]
109pub struct PendingItems {
110 pub repos: HashSet<String>,
111 pub root_events: HashSet<EventId>,
112}
113```
114
115---
116
117## State Flow
118
119```mermaid
120flowchart TB
121 subgraph Input
122 SS[SelfSubscriber]
123 OWN[Own Relay]
124 end
125
126 subgraph RepoSyncIndex - Want
127 RSI[HashMap of Repo to Relays+Events]
128 end
129
130 subgraph Derived Target
131 DT[derive_relay_targets fn]
132 TGT[Per-relay: repos + events we should sync]
133 end
134
135 subgraph PendingSyncIndex - In Flight
136 PSI[Vec of PendingBatch per relay]
137 end
138
139 subgraph RelaySyncIndex - State + Connection
140 RLI[RelayState per relay]
141 CONN[connection: Option of RelayConnection]
142 STATUS[connection_status: Connected/Disconnected/Connecting]
143 REPOS[repos + root_events]
144 end
145
146 SS -->|subscribe| OWN
147 OWN -->|events| SS
148 SS -->|batch fires| RSI
149 RSI --> DT
150 DT --> TGT
151 TGT -->|diff: target - pending - confirmed| DIFF[Compute new items]
152 PSI --> DIFF
153 RLI --> DIFF
154 DIFF -->|skip if disconnected| CHECK{Connected?}
155 CHECK -->|yes| AF[AddFilters]
156 CHECK -->|no| QUEUE[Queued in RelayState.repos]
157 AF -->|subscribe| CONN
158 AF -->|create batch| PSI
159 CONN -->|EOSE| PSI
160 PSI -->|batch complete| REPOS
161 CONN -->|disconnect event| DISC[Mark Disconnected + set disconnected_at]
162 DISC -->|reconnect| RECONN[On Reconnect]
163 RECONN -->|check 15min rule| RULE{disconnected > 15min?}
164 RULE -->|yes| CLEAR[Clear repos/root_events]
165 RULE -->|no| RETAIN[Keep retained state]
166 CLEAR --> REGEN[Regenerate AddFilters from RepoSyncIndex]
167 RETAIN --> RESUB[Resubscribe with since filter]
168```
169
170### Connection Lifecycle Integration
171
172The `RelayState` struct now owns both the connection and sync state:
173
174```rust
175// On disconnect (detected via RelayPoolNotification::Shutdown or handle_notifications returning)
176fn handle_disconnect(&mut self, relay_url: &str) {
177 if let Some(state) = self.relay_sync_index.write().await.get_mut(relay_url) {
178 state.connection_status = ConnectionStatus::Disconnected;
179 state.disconnected_at = Some(Timestamp::now());
180 state.connection = None;
181
182 // Clear any pending batches for this relay
183 self.pending_sync_index.write().await.remove(relay_url);
184 }
185}
186
187// On reconnect
188async fn handle_reconnect(&mut self, relay_url: &str) -> Result<(), Error> {
189 let mut index = self.relay_sync_index.write().await;
190 let state = index.get_mut(relay_url).ok_or("Relay not in index")?;
191
192 // Apply 15-minute state retention rule
193 if state.should_clear_state() {
194 tracing::info!("Reconnect after >15min for {}, clearing state", relay_url);
195 state.clear_sync_state();
196 }
197
198 // Create new connection
199 state.connection_status = ConnectionStatus::Connecting;
200 let connection = RelayConnection::new(relay_url.to_string());
201
202 // Connect with since filter if we have last_connected
203 let since = state.last_connected.map(|ts| {
204 Timestamp::from(ts.as_u64().saturating_sub(900)) // -15 min buffer
205 });
206
207 connection.connect_and_subscribe_with_since(since).await?;
208
209 state.connection = Some(connection);
210 state.connection_status = ConnectionStatus::Connected;
211 state.last_connected = Some(Timestamp::now());
212 state.disconnected_at = None;
213
214 drop(index); // Release lock
215
216 // Regenerate AddFilters from current state (either retained or fresh from RepoSyncIndex)
217 self.regenerate_filters_for_relay(relay_url).await;
218
219 Ok(())
220}
221
222/// Regenerate AddFilters for a relay after reconnection
223async fn regenerate_filters_for_relay(&mut self, relay_url: &str) {
224 let repo_index = self.repo_sync_index.read().await;
225 let targets = derive_relay_targets(&repo_index);
226
227 if let Some(target) = targets.get(relay_url) {
228 // Build filters for everything this relay should sync
229 let filters = build_filters(&target.repos, &target.root_events);
230
231 // Create and process AddFilters action
232 let action = AddFilters {
233 relay_url: relay_url.to_string(),
234 repos: target.repos.clone(),
235 root_events: target.root_events.clone(),
236 filters,
237 };
238
239 self.handle_add_filters(action).await;
240 }
241}
242```
243
244---
245
246## Action Type
247
248```rust
249/// Action sent from SelfSubscriber to SyncManager.
250/// SyncManager auto-spawns relay connections if they don't exist.
251pub struct AddFilters {
252 pub relay_url: String,
253 /// Items this action covers (for pending tracking)
254 pub repos: HashSet<String>,
255 pub root_events: HashSet<EventId>,
256 /// Pre-batched filters (each with <= 100 tags)
257 pub filters: Vec<Filter>,
258}
259```
260
261---
262
263## Core Algorithms
264
265### 1. derive_relay_targets
266
267Transform RepoSyncIndex into per-relay sync targets:
268
269```rust
270fn derive_relay_targets(
271 repo_index: &HashMap<String, RepoSyncNeeds>
272) -> HashMap<String, RelaySyncNeeds> {
273 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
274
275 for (repo_ref, needs) in repo_index {
276 for relay_url in &needs.relays {
277 let target = targets.entry(relay_url.clone()).or_default();
278 target.repos.insert(repo_ref.clone());
279 target.root_events.extend(needs.root_events.iter().cloned());
280 }
281 }
282
283 targets
284}
285```
286
287### 2. compute_actions (Three-Way Diff)
288
289```rust
290fn compute_actions(
291 targets: &HashMap<String, RelaySyncNeeds>,
292 pending: &HashMap<String, Vec<PendingBatch>>,
293 confirmed: &HashMap<String, RelayState>,
294) -> Vec<AddFilters> {
295 let mut actions = Vec::new();
296
297 for (relay_url, target) in targets {
298 // Skip disconnected relays - they'll get AddFilters on reconnect
299 if let Some(state) = confirmed.get(relay_url) {
300 if state.connection_status != ConnectionStatus::Connected {
301 continue;
302 }
303 }
304
305 // Collect all pending items for this relay
306 let pending_repos: HashSet<_> = pending.get(relay_url)
307 .map(|batches| batches.iter()
308 .flat_map(|b| b.items.repos.iter().cloned())
309 .collect())
310 .unwrap_or_default();
311 let pending_events: HashSet<_> = pending.get(relay_url)
312 .map(|batches| batches.iter()
313 .flat_map(|b| b.items.root_events.iter().cloned())
314 .collect())
315 .unwrap_or_default();
316
317 // Collect confirmed items for this relay
318 let confirmed_repos = confirmed.get(relay_url)
319 .map(|c| &c.repos)
320 .unwrap_or(&HashSet::new());
321 let confirmed_events = confirmed.get(relay_url)
322 .map(|c| &c.root_events)
323 .unwrap_or(&HashSet::new());
324
325 // New = target - pending - confirmed
326 let new_repos: HashSet<_> = target.repos.iter()
327 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
328 .cloned()
329 .collect();
330 let new_events: HashSet<_> = target.root_events.iter()
331 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
332 .cloned()
333 .collect();
334
335 if !new_repos.is_empty() || !new_events.is_empty() {
336 let filters = build_filters(&new_repos, &new_events);
337 actions.push(AddFilters {
338 relay_url: relay_url.clone(),
339 repos: new_repos,
340 root_events: new_events,
341 filters,
342 });
343 }
344 }
345
346 actions
347}
348```
349
350### 3. handle_add_filters (SyncManager)
351
352```rust
353impl SyncManager {
354 async fn handle_add_filters(&mut self, action: AddFilters) {
355 let AddFilters { relay_url, repos, root_events, filters } = action;
356
357 // Auto-spawn connection if needed
358 if !self.connections.contains_key(&relay_url) {
359 self.spawn_connection(&relay_url).await;
360 }
361
362 let conn = self.connections.get(&relay_url).unwrap();
363
364 // Subscribe and collect subscription IDs
365 // nostr-sdk 0.44: subscribe returns Output<Vec<SubscriptionId>>
366 // since we're only subscribed to one relay per connection
367 let mut sub_ids = HashSet::new();
368 for filter in filters {
369 // cloned filter for each subscription call
370 match conn.client.subscribe(filter, None).await {
371 Ok(output) => {
372 // Output contains subscription IDs for each relay
373 for sub_id in output.val {
374 sub_ids.insert(sub_id);
375 }
376 }
377 Err(e) => {
378 tracing::warn!("Failed to subscribe: {}", e);
379 }
380 }
381 }
382
383 // Create pending batch
384 let batch = PendingBatch {
385 batch_id: self.next_batch_id(),
386 items: PendingItems { repos, root_events },
387 outstanding_subs: sub_ids,
388 };
389
390 // Add to pending index
391 self.pending_sync_index.write().await
392 .entry(relay_url)
393 .or_default()
394 .push(batch);
395 }
396}
397```
398
399### 4. handle_eose (Batch Completion)
400
401```rust
402impl SyncManager {
403 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
404 let mut pending = self.pending_sync_index.write().await;
405
406 if let Some(batches) = pending.get_mut(relay_url) {
407 // Find which batch this subscription belongs to
408 for batch in batches.iter_mut() {
409 if batch.outstanding_subs.remove(&sub_id) {
410 // Check if batch is now complete
411 if batch.outstanding_subs.is_empty() {
412 // Move items to confirmed
413 let items = batch.items.clone();
414 drop(pending); // Release lock before acquiring another
415
416 let mut confirmed = self.relay_sync_index.write().await;
417 let relay_confirmed = confirmed
418 .entry(relay_url.to_string())
419 .or_default();
420 relay_confirmed.repos.extend(items.repos);
421 relay_confirmed.root_events.extend(items.root_events);
422
423 tracing::info!(
424 "Batch {} complete for {} - confirmed {} repos, {} events",
425 batch.batch_id, relay_url,
426 items.repos.len(), items.root_events.len()
427 );
428 }
429 break;
430 }
431 }
432
433 // Clean up completed batches
434 if let Some(batches) = pending.get_mut(relay_url) {
435 batches.retain(|b| !b.outstanding_subs.is_empty());
436 }
437 }
438 }
439}
440```
441
442---
443
444## Self-Subscriber Flow
445
446### State Tracking
447
448```rust
449pub struct SelfSubscriber {
450 own_relay_url: String,
451 relay_domain: String,
452 repo_sync_index: RepoSyncIndex,
453 pending_sync_index: PendingSyncIndex,
454 relay_sync_index: RelaySyncIndex,
455 action_tx: mpsc::Sender<AddFilters>,
456 /// Timestamp of last successful connection - used for since filter on reconnection
457 last_connected: Option<Timestamp>,
458 /// Is this the first connection attempt since startup?
459 is_initial_connect: bool,
460}
461```
462
463### On Startup
464
465```rust
466impl SelfSubscriber {
467 async fn run(mut self) {
468 // Connect to own relay
469 let client = Client::new(Keys::generate());
470 client.add_relay(&self.own_relay_url).await?;
471 client.connect().await;
472
473 // Track connection time
474 self.last_connected = Some(Timestamp::now());
475
476 // Subscribe WITHOUT since filter (get all historical) on first connect
477 let filter = Filter::new().kinds([
478 Kind::Custom(30617), // Repository announcements
479 Kind::GitPatch, // 1617
480 Kind::Custom(1618), // PRs
481 Kind::Custom(1619), // PR updates
482 Kind::GitIssue, // 1621
483 ]);
484
485 client.subscribe(filter, None).await?;
486 self.is_initial_connect = false;
487
488 // Run event loop with batching
489 self.event_loop(&client).await;
490 }
491}
492```
493
494### On Reconnection
495
496```rust
497impl SelfSubscriber {
498 async fn reconnect(&mut self, client: &Client) -> Result<(), Error> {
499 // Reconnect to own relay
500 client.connect().await;
501
502 // On reconnection ONLY, use since filter based on last_connected
503 let since = match self.last_connected {
504 Some(ts) => Timestamp::from(ts.as_u64().saturating_sub(900)), // -15 minutes buffer
505 None => Timestamp::from(0), // Shouldn't happen, but fall back to full sync
506 };
507
508 // Update last_connected AFTER computing since
509 self.last_connected = Some(Timestamp::now());
510
511 let filter = Filter::new()
512 .kinds([
513 Kind::Custom(30617),
514 Kind::GitPatch,
515 Kind::Custom(1618),
516 Kind::Custom(1619),
517 Kind::GitIssue,
518 ])
519 .since(since);
520
521 client.subscribe(filter, None).await?;
522 Ok(())
523 }
524}
525```
526
527### Batching Logic
528
529```rust
530impl SelfSubscriber {
531 async fn event_loop(&self, client: &Client) {
532 let mut pending_events: Vec<Event> = Vec::new();
533 let mut batch_timer: Option<Instant> = None;
534 let batch_window = Duration::from_secs(5);
535
536 loop {
537 let timeout = batch_timer
538 .map(|t| batch_window.saturating_sub(t.elapsed()))
539 .unwrap_or(Duration::from_secs(60));
540
541 tokio::select! {
542 notification = client.notifications().recv() => {
543 if let Ok(RelayPoolNotification::Event { event, .. }) = notification {
544 pending_events.push(*event);
545
546 // Start timer on first event (does NOT reset)
547 if batch_timer.is_none() {
548 batch_timer = Some(Instant::now());
549 }
550 }
551 }
552 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
553 // Batch window elapsed
554 self.process_batch(pending_events.drain(..).collect()).await;
555 batch_timer = None;
556 }
557 }
558 }
559 }
560
561 async fn process_batch(&self, events: Vec<Event>) {
562 // 1. Update RepoSyncIndex
563 for event in events {
564 match event.kind.as_u16() {
565 30617 => self.handle_announcement(&event).await,
566 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
567 _ => {}
568 }
569 }
570
571 // 2. Derive targets and compute actions
572 let repo_index = self.repo_sync_index.read().await;
573 let targets = derive_relay_targets(&repo_index);
574
575 let pending = self.pending_sync_index.read().await;
576 let confirmed = self.relay_sync_index.read().await;
577
578 let actions = compute_actions(&targets, &pending, &confirmed);
579
580 drop(repo_index);
581 drop(pending);
582 drop(confirmed);
583
584 // 3. Send actions to SyncManager
585 for action in actions {
586 let _ = self.action_tx.send(action).await;
587 }
588 }
589}
590```
591
592---
593
594## Bootstrap Relay
595
596```rust
597impl SyncManager {
598 async fn initialize_bootstrap(&mut self) {
599 if let Some(url) = &self.config.bootstrap_relay_url {
600 // Pre-mark as bootstrap (never removed)
601 self.relay_sync_index.write().await.insert(
602 url.clone(),
603 RelaySyncNeeds {
604 repos: HashSet::new(),
605 root_events: HashSet::new(),
606 is_bootstrap: true,
607 }
608 );
609
610 // Send Layer 1 filter
611 let filters = vec![
612 Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)])
613 ];
614
615 self.handle_add_filters(AddFilters {
616 relay_url: url.clone(),
617 repos: HashSet::new(), // Layer 1 doesn't track specific repos
618 root_events: HashSet::new(),
619 filters,
620 }).await;
621 }
622 }
623}
624```
625
626---
627
628## Disconnect Handling
629
630Direct in SyncManager (not via action):
631
632```rust
633impl SyncManager {
634 async fn check_disconnects(&mut self) {
635 let confirmed = self.relay_sync_index.read().await;
636
637 for (relay_url, state) in confirmed.iter() {
638 if state.is_bootstrap {
639 continue; // Never disconnect bootstrap
640 }
641
642 if state.repos.is_empty() && state.root_events.is_empty() {
643 // No repos - disconnect
644 self.disconnect_relay(relay_url).await;
645 }
646 }
647 }
648
649 async fn disconnect_relay(&mut self, relay_url: &str) {
650 self.relay_sync_index.write().await.remove(relay_url);
651 self.pending_sync_index.write().await.remove(relay_url);
652
653 if let Some(conn) = self.connections.remove(relay_url) {
654 conn.disconnect().await;
655 }
656 }
657}
658```
659
660---
661
662## Relay Connection Lifecycle
663
664### State Machine for External Relays
665
666```mermaid
667stateDiagram-v2
668 [*] --> Connecting: spawn_connection
669 Connecting --> Connected: success
670 Connecting --> Backoff: failure
671 Connected --> Disconnected: connection lost
672 Connected --> [*]: intentional disconnect
673 Disconnected --> Backoff: record_failure
674 Backoff --> Connecting: backoff elapsed
675 Backoff --> Dead: 24h continuous failures
676 Dead --> Connecting: daily retry
677```
678
679### Health Integration
680
681Uses `RelayHealthTracker` from [`src/sync/health.rs`](../../src/sync/health.rs):
682
683```rust
684impl SyncManager {
685 /// Spawn a connection with health tracking
686 async fn spawn_connection(&mut self, relay_url: &str) {
687 // Check if we should attempt connection
688 if !self.health_tracker.should_attempt_connection(relay_url) {
689 let remaining = self.health_tracker.get_remaining_backoff(relay_url);
690 tracing::debug!(
691 "Skipping connection to {} - backoff {:?}",
692 relay_url,
693 remaining
694 );
695 return;
696 }
697
698 match self.try_connect(relay_url).await {
699 Ok(conn) => {
700 self.health_tracker.record_success(relay_url);
701 self.connections.insert(relay_url.to_string(), conn);
702 }
703 Err(e) => {
704 self.health_tracker.record_failure(relay_url);
705 tracing::warn!("Connection to {} failed: {}", relay_url, e);
706 }
707 }
708 }
709}
710```
711
712### Reconnection Loop
713
714Each relay connection runs its own reconnection loop:
715
716```rust
717impl RelayConnection {
718 async fn run_with_reconnection(
719 mut self,
720 health_tracker: Arc<RelayHealthTracker>,
721 event_tx: mpsc::Sender<RelayEvent>,
722 ) {
723 loop {
724 // Check backoff before attempting
725 if !health_tracker.should_attempt_connection(&self.url) {
726 if let Some(remaining) = health_tracker.get_remaining_backoff(&self.url) {
727 tokio::time::sleep(remaining).await;
728 continue;
729 }
730 }
731
732 // Attempt connection
733 match self.connect_and_subscribe().await {
734 Ok(()) => {
735 health_tracker.record_success(&self.url);
736
737 // Track when we connected for since filter on reconnect
738 let connected_at = Timestamp::now();
739
740 // Run event loop until disconnection
741 self.run_event_loop(&event_tx).await;
742
743 // Connection lost - will reconnect with since filter
744 health_tracker.record_failure(&self.url);
745
746 // On reconnect, use since = connected_at - 15 minutes
747 self.set_reconnect_since(connected_at);
748 }
749 Err(e) => {
750 health_tracker.record_failure(&self.url);
751 tracing::warn!("Connection to {} failed: {}", self.url, e);
752 }
753 }
754
755 // Get backoff duration and wait
756 let state = health_tracker.get_state(&self.url);
757 if state == HealthState::Dead {
758 // Dead relays retry once per 24 hours
759 tokio::time::sleep(Duration::from_secs(24 * 3600)).await;
760 }
761 // Otherwise, loop will check should_attempt_connection
762 }
763 }
764}
765```
766
767### Backoff Configuration
768
769From existing [`RelayHealthTracker`](../../src/sync/health.rs:91):
770
771| Parameter | Value | Notes |
772|-----------|-------|-------|
773| Base backoff | 5 seconds | First failure |
774| Backoff multiplier | 2x | Exponential increase |
775| Max backoff | 1 hour (configurable) | `sync_max_backoff_secs` |
776| Dead threshold | 24 hours | Continuous failures |
777| Dead retry interval | 24 hours | Once per day |
778
779---
780
781## Consolidation
782
783### Threshold-Based (70 filters)
784
785```rust
786impl SyncManager {
787 async fn maybe_consolidate(&mut self, relay_url: &str) {
788 let filter_count = self.get_filter_count(relay_url).await;
789
790 if filter_count > 70 {
791 self.consolidate(relay_url).await;
792 }
793 }
794
795 async fn consolidate(&mut self, relay_url: &str) {
796 // 1. Wait for all pending batches to complete
797 self.wait_pending_complete(relay_url).await;
798
799 // 2. Close all subscriptions
800 self.close_all_subs(relay_url).await;
801
802 // 3. Rebuild filters from confirmed state
803 let confirmed = self.relay_sync_index.read().await;
804 let state = confirmed.get(relay_url)?;
805 let filters = build_filters(&state.repos, &state.root_events);
806
807 // 4. Resubscribe with since = now - 15 minutes
808 let since = Timestamp::now() - 900;
809 for filter in filters {
810 self.subscribe(relay_url, filter.since(since)).await;
811 }
812 }
813}
814```
815
816### Daily Timer (23-25h Random)
817
818```rust
819impl SyncManager {
820 async fn run_daily_consolidation(&self) {
821 loop {
822 let hours = 23 + rand::random::<f64>() * 2.0;
823 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
824
825 for relay_url in self.connections.keys() {
826 self.consolidate(relay_url).await;
827 }
828 }
829 }
830}
831```
832
833---
834
835## Key Design Decisions
836
837| Decision | Choice | Rationale |
838|----------|--------|-----------|
839| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
840| Since filter | Only on reconnection | Initial subscribe gets full history |
841| Pending tracking | Per-batch with batch ID | Independent confirmation, no blocking |
842| EOSE requirement | All subs in batch must complete | Single repo may need multiple filter subs |
843| Action type | Struct not enum | Only one action type needed |
844| Relay spawning | Auto-spawn on AddFilters | Simplifies action logic |
845| Disconnect | Direct in SyncManager | Not worth an action type |
846| Consolidation | 70 filters + daily timer | Threshold for growth, timer for staleness |
847| Timestamps | In-memory only | Not critical for correctness |
848| Health tracking | Reuse existing RelayHealthTracker | Already implements exponential backoff, dead relay detection |
849| Reconnection backoff | Exponential to 1h max | Prevents hammering failed relays |
850| Dead relay policy | 24h threshold, daily retry | Balance between giving up and resource waste |
851| last_connected tracking | Per-connection in-memory | Enables 15-minute buffer on reconnect |
852| Connection ownership | Inside RelayState | Ties connection lifecycle to sync state, simpler than separate maps |
853| State retention rule | Clear if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
854| Skip disconnected | compute_actions skips disconnected | Prevents queuing AddFilters for offline relays |
855| Reconnect triggers | handle_notifications returns or Shutdown | nostr-sdk signals disconnect via event loop exit |
856| On-reconnect flow | Regenerate AddFilters from RepoSyncIndex | Fresh subscriptions for what we actually need |
857
858---
859
860## Module Structure
861
862```
863src/sync/
864├── mod.rs # SyncManager, main loop
865├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
866├── actions.rs # AddFilters struct, compute_actions
867├── self_subscriber.rs # SelfSubscriber, batching logic
868├── relay_connection.rs # Per-relay WebSocket connection
869├── consolidation.rs # Consolidation logic, daily timer
870├── health.rs # Health tracking (reuse from v2)
871└── metrics.rs # Prometheus metrics (reuse from v2) \ No newline at end of file