upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:32:01 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:32:01 +0000
commit18ad93f8d0b8ce172c9c227385a21af66a507950 (patch)
tree275ada806570a2105f4e75388a565f61276209c8 /docs
parent4e5a937a4ef5288e702ba2bae3daf2a78398b690 (diff)
docs: remove old grasp-02 design doc versions
Diffstat (limited to 'docs')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v2.md785
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v3.md871
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v4.md1330
-rw-r--r--docs/explanation/grasp-02-proactive-sync.md1811
4 files changed, 1106 insertions, 3691 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md
deleted file mode 100644
index 311e93c..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v2.md
+++ /dev/null
@@ -1,785 +0,0 @@
1# GRASP-02: Proactive Sync v2 - Simplified Design
2
3## Overview
4
5This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription.
6
7## Goals (Same as v1)
8
91. **Data Availability**: Ensure we have all relevant events for repositories we host
102. **Resilience**: Handle relay failures gracefully with backoff and health tracking
113. **Efficiency**: Minimize connections and bandwidth through filter consolidation
124. **Consistency**: Use unified filters for both live sync and catchup
13
14## Scale Targets & Upper Bounds
15
16This design targets the following scale:
17
18| Metric | Target | Notes |
19| ----------------------------- | -------- | ----------------------------------------- |
20| **Repositories** | 1,000 | Repos we host/track |
21| **Root events per repo** | 50 (avg) | PRs, Issues, Patches per repo |
22| **Total relays in ecosystem** | 100 | Unique relays across all repos |
23| **Relays per repo** | 5 (avg) | Relays listed in each repo's announcement |
24| **Total root events** | ~50,000 | 1,000 repos × 50 events |
25| **Sync connections** | ~50-100 | Based on relay overlap |
26
27**Memory Estimate (in-memory HashMaps):**
28
29- `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB
30- `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB
31- **Total in-memory state**: ~10 MB (well within acceptable limits)
32
33**Upper Bounds (redesign triggers):**
34
35- **10,000+ repos**: Consider database-backed state instead of in-memory HashMaps
36- **500+ sync relays**: Consider connection pooling or relay prioritization
37- **500+ root events per repo**: Consider per-repo pagination in Layer 3 filters
38- **Sustained >100 events/second**: Consider write batching to database
39
40Beyond these limits, the in-memory HashMap model may need to evolve to a database-backed approach with lazy loading.
41
42## Core Data Structures
43
44The entire sync filter state is captured in two HashMaps, initialized from database queries at startup:
45
46```rust
47/// Repository root events we're following
48/// Key: repo addressable reference (e.g., "30617:pubkey:identifier")
49/// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo
50///
51/// Note: May include a few extra repo refs that aren't in sync_relays.
52/// This is acceptable - we won't query other relays for them.
53type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
54
55/// Relays we sync from, including their repos and events
56/// Key: relay URL
57/// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service
58///
59/// Note: Bootstrap relay (if configured) is always present and excluded from removal logic.
60type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
61```
62
63## Architecture Overview
64
65```mermaid
66flowchart TB
67 subgraph Startup
68 DB[(Database)]
69 Q1[Query kinds 1617/1618/1619/1621]
70 Q2[Query kind 30617]
71 DB --> Q1 --> FRRE[following_repo_root_events]
72 DB --> Q2 --> SR[sync_relays]
73 BR[Bootstrap Relay] --> SR
74 end
75
76 subgraph SyncManager
77 SS[Self-Subscriber]
78 FRRE --> SS
79 SR --> SS
80 end
81
82 subgraph SyncRelays
83 R1[Relay Connection 1]
84 R2[Relay Connection 2]
85 RN[Relay Connection N]
86 end
87
88 SS -->|spawn/update| R1
89 SS -->|spawn/update| R2
90 SS -->|spawn/update| RN
91
92 R1 -->|events| AP[Acceptance Policy]
93 R2 -->|events| AP
94 RN -->|events| AP
95 AP -->|store| DB
96```
97
98## Module Structure
99
100The sync module is organized following the pattern used by `src/http/mod.rs` and `src/metrics/mod.rs` where the primary struct lives in `mod.rs`:
101
102```
103src/sync/
104├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays)
105├── self_subscriber.rs # SelfSubscriber struct and batching logic
106├── relay_connection.rs # Per-relay WebSocket connection management
107├── health.rs # RelayHealthTracker for backoff and dead relay detection
108└── metrics.rs # SyncMetrics for Prometheus integration
109```
110
111**Rationale:** The state type aliases (`FollowingRepoRootEvents`, `SyncRelays`) are simple `Arc<RwLock<HashMap<...>>>` wrappers owned by SyncManager. Rather than creating a separate `state.rs` for two type aliases, they are colocated with SyncManager in `mod.rs` to reduce file count while maintaining clarity.
112
113## Design Decision: No Jitter
114
115We considered adding jitter to prevent thundering herd scenarios when:
116
117- Multiple relay connections initialize simultaneously
118- Batched updates affect multiple relays
119- Filter consolidation triggers across connections
120
121**Decision: No jitter implemented.**
122
123**Rationale:**
124
125- Our GRASP server should handle the load of simultaneous operations
126- Jitter would lead to more orphan filters (filters added one at a time rather than atomically)
127- Jitter creates inefficiency - partial subscriptions miss events during the stagger window
128- The batching window (5s) already provides natural smoothing without the downsides
129
130## Health Tracking & Backoff
131
132```rust
133/// Health state machine for relay connections
134enum HealthState {
135 Healthy, // Connected and working
136 Backoff(u32), // Failed, attempt count for exponential backoff
137 Dead, // 24h+ continuous failures
138}
139
140impl RelayHealthTracker {
141 /// Backoff durations:
142 /// - Attempt 1: 5s
143 /// - Attempt 2: 10s
144 /// - Attempt 3: 20s
145 /// - Attempt 4: 40s
146 /// - ... exponential up to 1h max
147 /// - After reaching 1h, continue hourly until 24h total failure time
148 /// - After 24h: marked Dead, retry once per 24h
149 fn get_backoff(&self, relay_url: &str) -> Duration;
150}
151```
152
153| State | Retry Behavior |
154| ----------- | -------------------------------------------- |
155| **Healthy** | Immediate reconnect on disconnect |
156| **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) |
157| **Hourly** | Once per hour after hitting 1h cap |
158| **Dead** | After 24h total failures, retry once per 24h |
159
160## Startup Initialization
161
162At startup, two database queries initialize the sync state:
163
164```rust
165impl SyncManager {
166 async fn initialize_from_database(&mut self) -> Result<()> {
167 // Initialize bootstrap relay if configured (never removed)
168 if let Some(bootstrap_url) = &self.config.bootstrap_relay_url {
169 self.sync_relays.write().await.insert(
170 bootstrap_url.clone(),
171 HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only)
172 );
173 }
174
175 // Query 1: Build following_repo_root_events
176 // Find all 1617/1618/1619/1621 events and extract their repo references
177 let root_events = self.database
178 .query(Filter::new().kinds([
179 Kind::GitPatch, // 1617
180 Kind::Custom(1618), // PRs
181 Kind::Custom(1619), // PR updates
182 Kind::Custom(1621), // Issues
183 ]))
184 .await?;
185
186 for event in root_events {
187 // An event may have multiple 'a' tags pointing to different repos
188 let repo_refs = self.extract_all_repo_refs(&event);
189 for repo_ref in repo_refs {
190 self.following_repo_root_events
191 .write().await
192 .entry(repo_ref)
193 .or_default()
194 .insert(event.id);
195 }
196 }
197
198 // Query 2: Build sync_relays from kind 30617 announcements
199 let announcements = self.database
200 .query(Filter::new().kind(Kind::Custom(30617)))
201 .await?;
202
203 for event in announcements {
204 let repo_ref = self.build_repo_ref(&event);
205 let relay_urls = self.extract_relay_urls(&event);
206
207 // Only track repos that list BOTH a remote relay AND our service
208 if self.lists_our_service(&event) {
209 for relay_url in relay_urls {
210 if !self.is_own_relay(&relay_url) {
211 // Get events for this repo from following_repo_root_events
212 let events = self.following_repo_root_events
213 .read().await
214 .get(&repo_ref)
215 .cloned()
216 .unwrap_or_default();
217
218 self.sync_relays
219 .write().await
220 .entry(relay_url)
221 .or_default()
222 .insert(repo_ref.clone(), events);
223 }
224 }
225 }
226 }
227
228 Ok(())
229 }
230
231 /// Extract ALL repo refs from an event (it may tag multiple repos)
232 fn extract_all_repo_refs(&self, event: &Event) -> Vec<String> {
233 event.tags.iter()
234 .filter_map(|tag| {
235 let tag_vec = tag.clone().to_vec();
236 if tag_vec.len() >= 2 && tag_vec[0] == "a" {
237 // Validate it's a 30617 reference
238 if tag_vec[1].starts_with("30617:") {
239 Some(tag_vec[1].clone())
240 } else {
241 None
242 }
243 } else {
244 None
245 }
246 })
247 .collect()
248 }
249}
250```
251
252## Self-Subscriber: Event-Driven Updates
253
254A single self-subscriber watches for new events from **our own relay** and updates the HashMaps.
255
256**Important:** The self-subscriber does NOT subscribe to kind 30618 as this would never lead to refreshing the sync filters. Those events are synced from remote relays only (via Layer 1 filter on sync relay connections).
257
258### Batching Strategy
259
260The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires.
261
262**Important:** Once the batch timer starts, it does NOT reset when additional events arrive. The batch will fire exactly 5 seconds after the first event, regardless of how many subsequent events are queued. This ensures predictable latency and prevents indefinite batching during high-activity periods.
263
264```rust
265impl SelfSubscriber {
266 async fn run(&self) {
267 // Subscribe to our own relay for relevant kinds
268 // Note: 30618 NOT included - synced from remote relays only
269 let filter = Filter::new()
270 .kinds([
271 Kind::Custom(30617), // Repository announcements
272 Kind::GitPatch, // 1617 Patches
273 Kind::Custom(1618), // PRs
274 Kind::Custom(1619), // PR updates
275 Kind::Custom(1621), // Issues
276 ]);
277
278 let mut pending_updates: Vec<PendingUpdate> = Vec::new();
279 let mut batch_deadline: Option<Instant> = None;
280
281 loop {
282 let timeout = batch_deadline
283 .map(|d| d.saturating_duration_since(Instant::now()))
284 .unwrap_or(Duration::MAX);
285
286 tokio::select! {
287 Some(event) = self.event_receiver.recv() => {
288 pending_updates.push(self.classify_update(&event));
289
290 // Start batch timer on first event
291 if batch_deadline.is_none() {
292 batch_deadline = Some(Instant::now() + Duration::from_secs(5));
293 }
294 }
295 _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => {
296 // Batch window elapsed - apply all pending updates
297 self.apply_batched_updates(pending_updates.drain(..).collect()).await;
298 batch_deadline = None;
299 }
300 }
301 }
302 }
303
304 fn classify_update(&self, event: &Event) -> PendingUpdate {
305 match event.kind.as_u16() {
306 30617 => PendingUpdate::NewAnnouncement(event.clone()),
307 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()),
308 _ => PendingUpdate::None,
309 }
310 }
311}
312```
313
314### Applying Batched Updates
315
316When the batch window closes, we process all pending updates together:
317
318```rust
319/// Batched updates grouped by relay
320struct RelayUpdateBatch {
321 /// New repo refs to subscribe to (Layer 2)
322 new_repo_refs: HashSet<String>,
323 /// New event IDs to subscribe to (Layer 3)
324 new_event_ids: HashSet<EventId>,
325 /// Whether this is a newly discovered relay
326 is_new_relay: bool,
327}
328
329impl SelfSubscriber {
330 async fn apply_batched_updates(&mut self, updates: Vec<PendingUpdate>) {
331 // Step 1: Process all updates and update HashMaps
332 // Build batched actions per relay
333 let mut relay_batches: HashMap<String, RelayUpdateBatch> = HashMap::new();
334
335 for update in updates {
336 match update {
337 PendingUpdate::NewAnnouncement(event) => {
338 self.process_announcement(&event, &mut relay_batches).await;
339 }
340 PendingUpdate::NewRootEvent(event) => {
341 self.process_root_event(&event, &mut relay_batches).await;
342 }
343 PendingUpdate::None => {}
344 }
345 }
346
347 // Step 2: Apply batched updates to each relay
348 for (relay_url, batch) in relay_batches {
349 self.apply_batch_to_relay(&relay_url, batch).await;
350 }
351
352 // Step 3: Check for relay removal (repos removed from announcements)
353 self.check_relay_removal().await;
354 }
355
356 async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) {
357 if batch.is_new_relay {
358 // Spawn new relay connection with full filters
359 self.spawn_sync_relay(relay_url.to_string()).await;
360 return;
361 }
362
363 // Build incremental filters for new content (NO since - get historical)
364 let incremental_filters = self.build_incremental_filters(&batch);
365
366 if incremental_filters.is_empty() {
367 return;
368 }
369
370 // Check if we need to consolidate
371 let current_filter_count = self.get_filter_count_for_relay(relay_url).await;
372 let new_filter_count = current_filter_count + incremental_filters.len();
373
374 // Note: 70 is a conservative threshold that may need tuning based on
375 // production observations. It was chosen to trigger consolidation earlier
376 // than v1's 150, but optimal value depends on relay behavior.
377 if new_filter_count > 70 {
378 // Consolidate: add incremental filters first (no since), wait for EOSE,
379 // then close all and resubscribe with consolidated filters (with since)
380 self.consolidate_relay_subscription(relay_url, incremental_filters).await;
381 } else {
382 // Just add incremental filters (no since - to get historical events)
383 self.send_filters_to_relay(relay_url, incremental_filters).await;
384 }
385 }
386
387 fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec<Filter> {
388 let mut filters = Vec::new();
389
390 // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags)
391 if !batch.new_repo_refs.is_empty() {
392 let refs: Vec<String> = batch.new_repo_refs.iter().cloned().collect();
393 for chunk in refs.chunks(100) {
394 // All kinds with lowercase 'a' tag
395 filters.push(
396 Filter::new()
397 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec())
398 );
399 // All kinds with uppercase 'A' tag
400 filters.push(
401 Filter::new()
402 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec())
403 );
404 // All kinds with 'q' tag (quote)
405 filters.push(
406 Filter::new()
407 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
408 );
409 }
410 }
411
412 // Layer 3: New event IDs
413 if !batch.new_event_ids.is_empty() {
414 let ids: Vec<String> = batch.new_event_ids.iter()
415 .map(|id| id.to_hex())
416 .collect();
417 for chunk in ids.chunks(100) {
418 filters.push(
419 Filter::new()
420 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec())
421 );
422 filters.push(
423 Filter::new()
424 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec())
425 );
426 filters.push(
427 Filter::new()
428 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
429 );
430 }
431 }
432
433 filters
434 }
435}
436```
437
438### Consolidation Strategy
439
440When consolidating, we need a two-phase approach:
441
4421. First, subscribe with incremental filters (no `since`) to get any historical events we missed
4432. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`)
444
445```rust
446async fn consolidate_relay_subscription(
447 &mut self,
448 relay_url: &str,
449 incremental_filters: Vec<Filter>,
450) {
451 // Phase 1: Add incremental filters WITHOUT since to catch up on new content
452 // These filters are for new repo_refs / event_ids we just discovered
453 let phase1_sub_id = self.send_filters_to_relay_and_wait_eose(
454 relay_url,
455 incremental_filters
456 ).await;
457
458 // Phase 2: After EOSE, consolidate everything
459 // Close ALL existing subscriptions for this relay
460 self.close_all_subscriptions(relay_url).await;
461
462 // Build fresh consolidated filters using current HashMap state
463 let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await;
464
465 // Resubscribe with since = now - 15 minutes
466 let since = Timestamp::now() - 900;
467 let filters_with_since: Vec<Filter> = consolidated_filters
468 .into_iter()
469 .map(|f| f.since(since))
470 .collect();
471
472 self.send_filters_to_relay(relay_url, filters_with_since).await;
473}
474```
475
476## Daily Full Catchup
477
478To capture events that may have taken longer than 15 minutes to propagate through the nostr network, we perform a daily full catchup:
479
480```rust
481impl SyncManager {
482 /// Runs approximately every 24 hours per relay connection
483 async fn daily_catchup(&mut self, relay_url: &str) {
484 // Close all current subscriptions for this relay
485 self.close_all_subscriptions(relay_url).await;
486
487 // Rebuild fresh filters from current HashMap state
488 let filters = self.build_three_layer_filters_for_relay(relay_url).await;
489
490 // Subscribe WITHOUT since filter to get full historical sync
491 for filter in filters {
492 self.subscribe_to_relay(relay_url, filter).await;
493 }
494
495 // After EOSE, switch back to live mode with since filter
496 self.wait_for_eose(relay_url).await;
497
498 // Re-add since filter for ongoing live sync
499 let since = Timestamp::now() - 900; // 15 minutes ago
500 self.resubscribe_with_since(relay_url, since).await;
501 }
502}
503```
504
505**Rationale:** The 15-minute reconnection window is standard for nostr event propagation, but some events may take longer. Rather than increasing the window (which would cause more duplicate processing), we do a daily full catchup to ensure nothing is missed. This adds minimal complexity while providing comprehensive coverage.
506
507## Sync Relay Connections
508
509Each sync relay connection uses the three-layer filter strategy:
510
511```rust
512impl SyncRelayConnection {
513 async fn start(&mut self) {
514 loop {
515 match self.connect_and_subscribe().await {
516 Ok(()) => {
517 // Record successful connection
518 self.last_successful_connection = Instant::now();
519 self.health_tracker.record_success(&self.url);
520
521 // Run event loop until disconnect
522 self.run_event_loop().await;
523 }
524 Err(e) => {
525 self.health_tracker.record_failure(&self.url);
526 }
527 }
528
529 // Reconnect with backoff and since filter
530 let backoff = self.health_tracker.get_backoff(&self.url);
531 tokio::time::sleep(backoff).await;
532
533 // On reconnect, use since = last_successful - 15 minutes
534 self.reconnect_since = Some(
535 Timestamp::from(self.last_successful_connection - Duration::from_secs(900))
536 );
537 }
538 }
539
540 async fn connect_and_subscribe(&mut self) -> Result<()> {
541 self.client.connect().await?;
542
543 let filters = self.build_three_layer_filters().await;
544
545 // Apply since filter if reconnecting
546 let filters = if let Some(since) = self.reconnect_since {
547 filters.into_iter().map(|f| f.since(since)).collect()
548 } else {
549 filters
550 };
551
552 for filter in filters {
553 self.client.subscribe(filter, None).await?;
554 }
555
556 Ok(())
557 }
558}
559```
560
561## Three-Layer Filter Strategy
562
563```rust
564impl SyncRelayConnection {
565 async fn build_three_layer_filters(&self) -> Vec<Filter> {
566 let mut filters = Vec::new();
567
568 // Get repos for this relay
569 let repos = self.sync_relays.read().await
570 .get(&self.url)
571 .cloned()
572 .unwrap_or_default();
573
574 // Layer 1: Announcements (kinds 30617 + 30618)
575 // Note: 30618 is ONLY synced from remote relays, not self-subscribed
576 // Always included even if relay has no repos (bootstrap relay case)
577 filters.push(
578 Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)])
579 );
580
581 // Layer 2: Events tagging repos with 'a' tags (ALL kinds)
582 // Batched per 100 repo refs
583 let repo_refs: Vec<String> = repos.keys().cloned().collect();
584 for chunk in repo_refs.chunks(100) {
585 filters.push(
586 Filter::new()
587 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec())
588 );
589 filters.push(
590 Filter::new()
591 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec())
592 );
593 filters.push(
594 Filter::new()
595 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
596 );
597 }
598
599 // Layer 3: Events tagging root events (batch per 100 event IDs)
600 let all_event_ids: HashSet<EventId> = repos.values()
601 .flat_map(|ids| ids.iter().cloned())
602 .collect();
603
604 let event_id_strs: Vec<String> = all_event_ids
605 .iter()
606 .map(|id| id.to_hex())
607 .collect();
608
609 for chunk in event_id_strs.chunks(100) {
610 filters.push(
611 Filter::new()
612 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec())
613 );
614 filters.push(
615 Filter::new()
616 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec())
617 );
618 filters.push(
619 Filter::new()
620 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
621 );
622 }
623
624 filters
625 }
626}
627```
628
629## Relay Removal
630
631```rust
632async fn check_relay_removal(&mut self) {
633 let relays_to_check: Vec<String> = self.sync_relays.read().await
634 .keys()
635 .cloned()
636 .collect();
637
638 for relay_url in relays_to_check {
639 // Never remove bootstrap relay
640 if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() {
641 continue;
642 }
643
644 // Check if relay has any repos left
645 let should_remove = {
646 let sync_relays = self.sync_relays.read().await;
647 sync_relays.get(&relay_url)
648 .map(|repos| repos.is_empty())
649 .unwrap_or(true)
650 };
651
652 if should_remove {
653 // Remove from HashMap
654 self.sync_relays.write().await.remove(&relay_url);
655
656 // Close connection
657 self.close_relay_connection(&relay_url).await;
658 }
659 }
660}
661```
662
663## Prometheus Metrics (Same as v1)
664
665| Metric | Type | Labels | Description |
666| ------------------------------------- | ------- | ------------- | ----------------------- |
667| `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 |
668| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome |
669| `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 |
670| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures |
671| `ngit_sync_events_total` | Counter | source | Events by source type |
672| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled |
673| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered |
674| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected |
675| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count |
676
677## Module Structure (Simplified)
678
679```
680src/sync/
681├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays)
682├── self_subscriber.rs # SelfSubscriber + batching logic
683├── relay_connection.rs # Per-relay WebSocket + filters
684├── health.rs # RelayHealthTracker (reuse from v1)
685└── metrics.rs # SyncMetrics (reuse from v1)
686```
687
688> **Note:** SyncManager and state type aliases are colocated in `mod.rs` following the pattern of `src/http/mod.rs` (HttpService) and `src/metrics/mod.rs` (Metrics). See the earlier "Module Structure" section for rationale.
689
690## Comparison: v1 vs v2
691
692| Aspect | v1 (Current) | v2 (Simplified) |
693| ------------------- | ------------------------------------------------------------------ | --------------------------------------------- |
694| **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB |
695| **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe |
696| **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) |
697| **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters |
698| **Batching** | Per 100 tags | Per 100 tags |
699| **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min |
700| **Jitter** | Startup jitter | None (see design decision) |
701| **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed |
702| **1621 (Issues)** | Not included | Included with 1617/1618/1619 |
703| **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags |
704| **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily |
705
706## Key Design Decisions
707
7081. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database
7092. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them
7103. **Batched Filter Updates**: 5-second window that starts on first event (timer does NOT reset on subsequent events)
7114. **Uniform Reconnection**: Always use `since = last_successful - 15min`
7125. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns
7136. **Bootstrap Relay Protected**: Never removed from sync_relays - ensures at least one sync connection exists even when no repositories currently list our service (cold start / recovery scenario)
7147. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed
7158. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation (conservative value that may need tuning based on production observation)
7169. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters
71710. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds
71811. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since
71912. **Multiple Repo Refs**: Handle events that tag multiple repos correctly
72013. **Daily Full Catchup**: Periodic sync restart without `since` filter (~24h) to catch slow-propagating events
72114. **Dual DB Queries at Startup**: Separate queries for root events and announcements. Could be combined into a single query, but some relays cache by kind which may make separate queries more efficient. Trade-off deferred for future optimization.
722
723---
724
725## Detailed Flow Diagram
726
727```mermaid
728sequenceDiagram
729 participant DB as Database
730 participant SM as SyncManager
731 participant SS as Self-Subscriber
732 participant RC as RelayConnection
733
734 Note over SM: Startup
735 SM->>SM: Add bootstrap relay to sync_relays
736 SM->>DB: Query kinds 1617/1618/1619/1621
737 DB-->>SM: Root events
738 SM->>SM: Build following_repo_root_events
739 SM->>SM: Handle multi-repo events
740
741 SM->>DB: Query kind 30617
742 DB-->>SM: Announcements
743 SM->>SM: Build sync_relays
744
745 SM->>RC: Spawn connections for each relay
746 RC->>RC: Build 3-layer filters from sync_relays
747 RC->>RC: Connect and subscribe
748
749 Note over SS: Event-Driven Updates
750 SS->>SS: Subscribe to 30617/1617/1618/1619/1621
751 SS->>SS: Receive event - start 5s batch timer
752 SS->>SS: Collect more events in batch window
753 SS->>SS: Batch window closes
754 SS->>SM: Apply batched updates to HashMaps
755
756 alt New Relay Discovered
757 SM->>RC: Spawn new connection
758 else New Content for Existing Relay
759 alt Under 70 filter limit
760 SM->>RC: Add incremental filter - no since
761 else Over 70 filter limit
762 SM->>RC: Add incremental filters - no since
763 RC-->>SM: EOSE received
764 SM->>RC: Close all subscriptions
765 SM->>RC: Resubscribe consolidated - with since
766 end
767 else Relay Has No More Repos
768 alt Is Bootstrap Relay
769 SM->>SM: Keep connection - Layer 1 only
770 else Not Bootstrap Relay
771 SM->>RC: Close connection
772 end
773 end
774
775 Note over RC: Connection Lifecycle
776 RC->>RC: Process incoming events
777 RC->>DB: Store via acceptance policy
778
779 RC->>RC: Connection drops
780 RC->>RC: Wait backoff - 5s to 1h exponential
781 RC->>RC: Reconnect with since = last_success - 15min
782
783 Note over RC: If failures continue 24h
784 RC->>RC: Mark dead - retry once per 24h
785```
diff --git a/docs/explanation/grasp-02-proactive-sync-v3.md b/docs/explanation/grasp-02-proactive-sync-v3.md
deleted file mode 100644
index 30b3102..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v3.md
+++ /dev/null
@@ -1,871 +0,0 @@
1# GRASP-02: Proactive Sync v3 - Event-Driven Design
2
3## Overview
4
5This document presents v3 of the proactive sync design. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **Batch-based pending tracking** - Each batch confirms independently
93. **Single action type** - AddFilters only, auto-spawn connections
104. **Three-way state model** - RepoSyncIndex (want) → PendingSyncIndex (in-flight) → RelaySyncIndex (confirmed)
11
12---
13
14## Data Model
15
16### RepoSyncIndex (Source of Truth)
17
18```rust
19/// What we WANT to sync - derived from events received via self-subscription.
20/// Updated immediately when self-subscriber batch fires.
21/// Key: repo addressable ref ("30617:pubkey:identifier")
22pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
23
24#[derive(Debug, Clone, Default)]
25pub struct RepoSyncNeeds {
26 /// Relay URLs listed in this repo's 30617 announcement
27 pub relays: HashSet<String>,
28 /// Root event IDs (1617/1618/1619/1621) that reference this repo
29 pub root_events: HashSet<EventId>,
30}
31```
32
33### RelaySyncIndex (Confirmed State + Connection)
34
35```rust
36/// What we've CONFIRMED syncing - includes connection state for integrated lifecycle.
37/// Key: relay URL
38pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
39
40/// Connection status for a relay
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum ConnectionStatus {
43 /// Not currently connected
44 Disconnected,
45 /// Connection attempt in progress
46 Connecting,
47 /// Successfully connected and subscribed
48 Connected,
49}
50
51/// Complete state for a single relay - combines sync needs with connection lifecycle
52#[derive(Debug)]
53pub struct RelayState {
54 /// Repos we've confirmed syncing from this relay
55 pub repos: HashSet<String>,
56 /// Root events we've confirmed tracking
57 pub root_events: HashSet<EventId>,
58 /// If true, never disconnect this relay
59 pub is_bootstrap: bool,
60 /// Current connection status
61 pub connection_status: ConnectionStatus,
62 /// When we last successfully connected (for since filter on reconnect)
63 pub last_connected: Option<Timestamp>,
64 /// When we disconnected (for 15-minute state retention rule)
65 pub disconnected_at: Option<Timestamp>,
66 /// The active connection (None if disconnected)
67 pub connection: Option<RelayConnection>,
68}
69
70impl RelayState {
71 /// Check if state should be cleared based on 15-minute rule
72 pub fn should_clear_state(&self) -> bool {
73 match self.disconnected_at {
74 Some(disconnected) => {
75 let now = Timestamp::now();
76 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
77 }
78 None => false, // Still connected or never connected
79 }
80 }
81
82 /// Clear repos and root_events (called when reconnect takes > 15 minutes)
83 pub fn clear_sync_state(&mut self) {
84 self.repos.clear();
85 self.root_events.clear();
86 }
87}
88```
89
90### PendingSyncIndex (In-Flight Batches)
91
92```rust
93/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
94/// Each batch has its own ID and can confirm independently.
95/// Key: relay URL
96pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
97
98#[derive(Debug, Clone)]
99pub struct PendingBatch {
100 /// Unique ID for this batch (for debugging/logging)
101 pub batch_id: u64,
102 /// The items this batch is syncing
103 pub items: PendingItems,
104 /// Subscription IDs that must ALL receive EOSE before confirming
105 pub outstanding_subs: HashSet<SubscriptionId>,
106}
107
108#[derive(Debug, Clone, Default)]
109pub struct PendingItems {
110 pub repos: HashSet<String>,
111 pub root_events: HashSet<EventId>,
112}
113```
114
115---
116
117## State Flow
118
119```mermaid
120flowchart TB
121 subgraph Input
122 SS[SelfSubscriber]
123 OWN[Own Relay]
124 end
125
126 subgraph RepoSyncIndex - Want
127 RSI[HashMap of Repo to Relays+Events]
128 end
129
130 subgraph Derived Target
131 DT[derive_relay_targets fn]
132 TGT[Per-relay: repos + events we should sync]
133 end
134
135 subgraph PendingSyncIndex - In Flight
136 PSI[Vec of PendingBatch per relay]
137 end
138
139 subgraph RelaySyncIndex - State + Connection
140 RLI[RelayState per relay]
141 CONN[connection: Option of RelayConnection]
142 STATUS[connection_status: Connected/Disconnected/Connecting]
143 REPOS[repos + root_events]
144 end
145
146 SS -->|subscribe| OWN
147 OWN -->|events| SS
148 SS -->|batch fires| RSI
149 RSI --> DT
150 DT --> TGT
151 TGT -->|diff: target - pending - confirmed| DIFF[Compute new items]
152 PSI --> DIFF
153 RLI --> DIFF
154 DIFF -->|skip if disconnected| CHECK{Connected?}
155 CHECK -->|yes| AF[AddFilters]
156 CHECK -->|no| QUEUE[Queued in RelayState.repos]
157 AF -->|subscribe| CONN
158 AF -->|create batch| PSI
159 CONN -->|EOSE| PSI
160 PSI -->|batch complete| REPOS
161 CONN -->|disconnect event| DISC[Mark Disconnected + set disconnected_at]
162 DISC -->|reconnect| RECONN[On Reconnect]
163 RECONN -->|check 15min rule| RULE{disconnected > 15min?}
164 RULE -->|yes| CLEAR[Clear repos/root_events]
165 RULE -->|no| RETAIN[Keep retained state]
166 CLEAR --> REGEN[Regenerate AddFilters from RepoSyncIndex]
167 RETAIN --> RESUB[Resubscribe with since filter]
168```
169
170### Connection Lifecycle Integration
171
172The `RelayState` struct now owns both the connection and sync state:
173
174```rust
175// On disconnect (detected via RelayPoolNotification::Shutdown or handle_notifications returning)
176fn handle_disconnect(&mut self, relay_url: &str) {
177 if let Some(state) = self.relay_sync_index.write().await.get_mut(relay_url) {
178 state.connection_status = ConnectionStatus::Disconnected;
179 state.disconnected_at = Some(Timestamp::now());
180 state.connection = None;
181
182 // Clear any pending batches for this relay
183 self.pending_sync_index.write().await.remove(relay_url);
184 }
185}
186
187// On reconnect
188async fn handle_reconnect(&mut self, relay_url: &str) -> Result<(), Error> {
189 let mut index = self.relay_sync_index.write().await;
190 let state = index.get_mut(relay_url).ok_or("Relay not in index")?;
191
192 // Apply 15-minute state retention rule
193 if state.should_clear_state() {
194 tracing::info!("Reconnect after >15min for {}, clearing state", relay_url);
195 state.clear_sync_state();
196 }
197
198 // Create new connection
199 state.connection_status = ConnectionStatus::Connecting;
200 let connection = RelayConnection::new(relay_url.to_string());
201
202 // Connect with since filter if we have last_connected
203 let since = state.last_connected.map(|ts| {
204 Timestamp::from(ts.as_u64().saturating_sub(900)) // -15 min buffer
205 });
206
207 connection.connect_and_subscribe_with_since(since).await?;
208
209 state.connection = Some(connection);
210 state.connection_status = ConnectionStatus::Connected;
211 state.last_connected = Some(Timestamp::now());
212 state.disconnected_at = None;
213
214 drop(index); // Release lock
215
216 // Regenerate AddFilters from current state (either retained or fresh from RepoSyncIndex)
217 self.regenerate_filters_for_relay(relay_url).await;
218
219 Ok(())
220}
221
222/// Regenerate AddFilters for a relay after reconnection
223async fn regenerate_filters_for_relay(&mut self, relay_url: &str) {
224 let repo_index = self.repo_sync_index.read().await;
225 let targets = derive_relay_targets(&repo_index);
226
227 if let Some(target) = targets.get(relay_url) {
228 // Build filters for everything this relay should sync
229 let filters = build_filters(&target.repos, &target.root_events);
230
231 // Create and process AddFilters action
232 let action = AddFilters {
233 relay_url: relay_url.to_string(),
234 repos: target.repos.clone(),
235 root_events: target.root_events.clone(),
236 filters,
237 };
238
239 self.handle_add_filters(action).await;
240 }
241}
242```
243
244---
245
246## Action Type
247
248```rust
249/// Action sent from SelfSubscriber to SyncManager.
250/// SyncManager auto-spawns relay connections if they don't exist.
251pub struct AddFilters {
252 pub relay_url: String,
253 /// Items this action covers (for pending tracking)
254 pub repos: HashSet<String>,
255 pub root_events: HashSet<EventId>,
256 /// Pre-batched filters (each with <= 100 tags)
257 pub filters: Vec<Filter>,
258}
259```
260
261---
262
263## Core Algorithms
264
265### 1. derive_relay_targets
266
267Transform RepoSyncIndex into per-relay sync targets:
268
269```rust
270fn derive_relay_targets(
271 repo_index: &HashMap<String, RepoSyncNeeds>
272) -> HashMap<String, RelaySyncNeeds> {
273 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
274
275 for (repo_ref, needs) in repo_index {
276 for relay_url in &needs.relays {
277 let target = targets.entry(relay_url.clone()).or_default();
278 target.repos.insert(repo_ref.clone());
279 target.root_events.extend(needs.root_events.iter().cloned());
280 }
281 }
282
283 targets
284}
285```
286
287### 2. compute_actions (Three-Way Diff)
288
289```rust
290fn compute_actions(
291 targets: &HashMap<String, RelaySyncNeeds>,
292 pending: &HashMap<String, Vec<PendingBatch>>,
293 confirmed: &HashMap<String, RelayState>,
294) -> Vec<AddFilters> {
295 let mut actions = Vec::new();
296
297 for (relay_url, target) in targets {
298 // Skip disconnected relays - they'll get AddFilters on reconnect
299 if let Some(state) = confirmed.get(relay_url) {
300 if state.connection_status != ConnectionStatus::Connected {
301 continue;
302 }
303 }
304
305 // Collect all pending items for this relay
306 let pending_repos: HashSet<_> = pending.get(relay_url)
307 .map(|batches| batches.iter()
308 .flat_map(|b| b.items.repos.iter().cloned())
309 .collect())
310 .unwrap_or_default();
311 let pending_events: HashSet<_> = pending.get(relay_url)
312 .map(|batches| batches.iter()
313 .flat_map(|b| b.items.root_events.iter().cloned())
314 .collect())
315 .unwrap_or_default();
316
317 // Collect confirmed items for this relay
318 let confirmed_repos = confirmed.get(relay_url)
319 .map(|c| &c.repos)
320 .unwrap_or(&HashSet::new());
321 let confirmed_events = confirmed.get(relay_url)
322 .map(|c| &c.root_events)
323 .unwrap_or(&HashSet::new());
324
325 // New = target - pending - confirmed
326 let new_repos: HashSet<_> = target.repos.iter()
327 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
328 .cloned()
329 .collect();
330 let new_events: HashSet<_> = target.root_events.iter()
331 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
332 .cloned()
333 .collect();
334
335 if !new_repos.is_empty() || !new_events.is_empty() {
336 let filters = build_filters(&new_repos, &new_events);
337 actions.push(AddFilters {
338 relay_url: relay_url.clone(),
339 repos: new_repos,
340 root_events: new_events,
341 filters,
342 });
343 }
344 }
345
346 actions
347}
348```
349
350### 3. handle_add_filters (SyncManager)
351
352```rust
353impl SyncManager {
354 async fn handle_add_filters(&mut self, action: AddFilters) {
355 let AddFilters { relay_url, repos, root_events, filters } = action;
356
357 // Auto-spawn connection if needed
358 if !self.connections.contains_key(&relay_url) {
359 self.spawn_connection(&relay_url).await;
360 }
361
362 let conn = self.connections.get(&relay_url).unwrap();
363
364 // Subscribe and collect subscription IDs
365 // nostr-sdk 0.44: subscribe returns Output<Vec<SubscriptionId>>
366 // since we're only subscribed to one relay per connection
367 let mut sub_ids = HashSet::new();
368 for filter in filters {
369 // cloned filter for each subscription call
370 match conn.client.subscribe(filter, None).await {
371 Ok(output) => {
372 // Output contains subscription IDs for each relay
373 for sub_id in output.val {
374 sub_ids.insert(sub_id);
375 }
376 }
377 Err(e) => {
378 tracing::warn!("Failed to subscribe: {}", e);
379 }
380 }
381 }
382
383 // Create pending batch
384 let batch = PendingBatch {
385 batch_id: self.next_batch_id(),
386 items: PendingItems { repos, root_events },
387 outstanding_subs: sub_ids,
388 };
389
390 // Add to pending index
391 self.pending_sync_index.write().await
392 .entry(relay_url)
393 .or_default()
394 .push(batch);
395 }
396}
397```
398
399### 4. handle_eose (Batch Completion)
400
401```rust
402impl SyncManager {
403 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
404 let mut pending = self.pending_sync_index.write().await;
405
406 if let Some(batches) = pending.get_mut(relay_url) {
407 // Find which batch this subscription belongs to
408 for batch in batches.iter_mut() {
409 if batch.outstanding_subs.remove(&sub_id) {
410 // Check if batch is now complete
411 if batch.outstanding_subs.is_empty() {
412 // Move items to confirmed
413 let items = batch.items.clone();
414 drop(pending); // Release lock before acquiring another
415
416 let mut confirmed = self.relay_sync_index.write().await;
417 let relay_confirmed = confirmed
418 .entry(relay_url.to_string())
419 .or_default();
420 relay_confirmed.repos.extend(items.repos);
421 relay_confirmed.root_events.extend(items.root_events);
422
423 tracing::info!(
424 "Batch {} complete for {} - confirmed {} repos, {} events",
425 batch.batch_id, relay_url,
426 items.repos.len(), items.root_events.len()
427 );
428 }
429 break;
430 }
431 }
432
433 // Clean up completed batches
434 if let Some(batches) = pending.get_mut(relay_url) {
435 batches.retain(|b| !b.outstanding_subs.is_empty());
436 }
437 }
438 }
439}
440```
441
442---
443
444## Self-Subscriber Flow
445
446### State Tracking
447
448```rust
449pub struct SelfSubscriber {
450 own_relay_url: String,
451 relay_domain: String,
452 repo_sync_index: RepoSyncIndex,
453 pending_sync_index: PendingSyncIndex,
454 relay_sync_index: RelaySyncIndex,
455 action_tx: mpsc::Sender<AddFilters>,
456 /// Timestamp of last successful connection - used for since filter on reconnection
457 last_connected: Option<Timestamp>,
458 /// Is this the first connection attempt since startup?
459 is_initial_connect: bool,
460}
461```
462
463### On Startup
464
465```rust
466impl SelfSubscriber {
467 async fn run(mut self) {
468 // Connect to own relay
469 let client = Client::new(Keys::generate());
470 client.add_relay(&self.own_relay_url).await?;
471 client.connect().await;
472
473 // Track connection time
474 self.last_connected = Some(Timestamp::now());
475
476 // Subscribe WITHOUT since filter (get all historical) on first connect
477 let filter = Filter::new().kinds([
478 Kind::Custom(30617), // Repository announcements
479 Kind::GitPatch, // 1617
480 Kind::Custom(1618), // PRs
481 Kind::Custom(1619), // PR updates
482 Kind::GitIssue, // 1621
483 ]);
484
485 client.subscribe(filter, None).await?;
486 self.is_initial_connect = false;
487
488 // Run event loop with batching
489 self.event_loop(&client).await;
490 }
491}
492```
493
494### On Reconnection
495
496```rust
497impl SelfSubscriber {
498 async fn reconnect(&mut self, client: &Client) -> Result<(), Error> {
499 // Reconnect to own relay
500 client.connect().await;
501
502 // On reconnection ONLY, use since filter based on last_connected
503 let since = match self.last_connected {
504 Some(ts) => Timestamp::from(ts.as_u64().saturating_sub(900)), // -15 minutes buffer
505 None => Timestamp::from(0), // Shouldn't happen, but fall back to full sync
506 };
507
508 // Update last_connected AFTER computing since
509 self.last_connected = Some(Timestamp::now());
510
511 let filter = Filter::new()
512 .kinds([
513 Kind::Custom(30617),
514 Kind::GitPatch,
515 Kind::Custom(1618),
516 Kind::Custom(1619),
517 Kind::GitIssue,
518 ])
519 .since(since);
520
521 client.subscribe(filter, None).await?;
522 Ok(())
523 }
524}
525```
526
527### Batching Logic
528
529```rust
530impl SelfSubscriber {
531 async fn event_loop(&self, client: &Client) {
532 let mut pending_events: Vec<Event> = Vec::new();
533 let mut batch_timer: Option<Instant> = None;
534 let batch_window = Duration::from_secs(5);
535
536 loop {
537 let timeout = batch_timer
538 .map(|t| batch_window.saturating_sub(t.elapsed()))
539 .unwrap_or(Duration::from_secs(60));
540
541 tokio::select! {
542 notification = client.notifications().recv() => {
543 if let Ok(RelayPoolNotification::Event { event, .. }) = notification {
544 pending_events.push(*event);
545
546 // Start timer on first event (does NOT reset)
547 if batch_timer.is_none() {
548 batch_timer = Some(Instant::now());
549 }
550 }
551 }
552 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
553 // Batch window elapsed
554 self.process_batch(pending_events.drain(..).collect()).await;
555 batch_timer = None;
556 }
557 }
558 }
559 }
560
561 async fn process_batch(&self, events: Vec<Event>) {
562 // 1. Update RepoSyncIndex
563 for event in events {
564 match event.kind.as_u16() {
565 30617 => self.handle_announcement(&event).await,
566 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
567 _ => {}
568 }
569 }
570
571 // 2. Derive targets and compute actions
572 let repo_index = self.repo_sync_index.read().await;
573 let targets = derive_relay_targets(&repo_index);
574
575 let pending = self.pending_sync_index.read().await;
576 let confirmed = self.relay_sync_index.read().await;
577
578 let actions = compute_actions(&targets, &pending, &confirmed);
579
580 drop(repo_index);
581 drop(pending);
582 drop(confirmed);
583
584 // 3. Send actions to SyncManager
585 for action in actions {
586 let _ = self.action_tx.send(action).await;
587 }
588 }
589}
590```
591
592---
593
594## Bootstrap Relay
595
596```rust
597impl SyncManager {
598 async fn initialize_bootstrap(&mut self) {
599 if let Some(url) = &self.config.bootstrap_relay_url {
600 // Pre-mark as bootstrap (never removed)
601 self.relay_sync_index.write().await.insert(
602 url.clone(),
603 RelaySyncNeeds {
604 repos: HashSet::new(),
605 root_events: HashSet::new(),
606 is_bootstrap: true,
607 }
608 );
609
610 // Send Layer 1 filter
611 let filters = vec![
612 Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)])
613 ];
614
615 self.handle_add_filters(AddFilters {
616 relay_url: url.clone(),
617 repos: HashSet::new(), // Layer 1 doesn't track specific repos
618 root_events: HashSet::new(),
619 filters,
620 }).await;
621 }
622 }
623}
624```
625
626---
627
628## Disconnect Handling
629
630Direct in SyncManager (not via action):
631
632```rust
633impl SyncManager {
634 async fn check_disconnects(&mut self) {
635 let confirmed = self.relay_sync_index.read().await;
636
637 for (relay_url, state) in confirmed.iter() {
638 if state.is_bootstrap {
639 continue; // Never disconnect bootstrap
640 }
641
642 if state.repos.is_empty() && state.root_events.is_empty() {
643 // No repos - disconnect
644 self.disconnect_relay(relay_url).await;
645 }
646 }
647 }
648
649 async fn disconnect_relay(&mut self, relay_url: &str) {
650 self.relay_sync_index.write().await.remove(relay_url);
651 self.pending_sync_index.write().await.remove(relay_url);
652
653 if let Some(conn) = self.connections.remove(relay_url) {
654 conn.disconnect().await;
655 }
656 }
657}
658```
659
660---
661
662## Relay Connection Lifecycle
663
664### State Machine for External Relays
665
666```mermaid
667stateDiagram-v2
668 [*] --> Connecting: spawn_connection
669 Connecting --> Connected: success
670 Connecting --> Backoff: failure
671 Connected --> Disconnected: connection lost
672 Connected --> [*]: intentional disconnect
673 Disconnected --> Backoff: record_failure
674 Backoff --> Connecting: backoff elapsed
675 Backoff --> Dead: 24h continuous failures
676 Dead --> Connecting: daily retry
677```
678
679### Health Integration
680
681Uses `RelayHealthTracker` from [`src/sync/health.rs`](../../src/sync/health.rs):
682
683```rust
684impl SyncManager {
685 /// Spawn a connection with health tracking
686 async fn spawn_connection(&mut self, relay_url: &str) {
687 // Check if we should attempt connection
688 if !self.health_tracker.should_attempt_connection(relay_url) {
689 let remaining = self.health_tracker.get_remaining_backoff(relay_url);
690 tracing::debug!(
691 "Skipping connection to {} - backoff {:?}",
692 relay_url,
693 remaining
694 );
695 return;
696 }
697
698 match self.try_connect(relay_url).await {
699 Ok(conn) => {
700 self.health_tracker.record_success(relay_url);
701 self.connections.insert(relay_url.to_string(), conn);
702 }
703 Err(e) => {
704 self.health_tracker.record_failure(relay_url);
705 tracing::warn!("Connection to {} failed: {}", relay_url, e);
706 }
707 }
708 }
709}
710```
711
712### Reconnection Loop
713
714Each relay connection runs its own reconnection loop:
715
716```rust
717impl RelayConnection {
718 async fn run_with_reconnection(
719 mut self,
720 health_tracker: Arc<RelayHealthTracker>,
721 event_tx: mpsc::Sender<RelayEvent>,
722 ) {
723 loop {
724 // Check backoff before attempting
725 if !health_tracker.should_attempt_connection(&self.url) {
726 if let Some(remaining) = health_tracker.get_remaining_backoff(&self.url) {
727 tokio::time::sleep(remaining).await;
728 continue;
729 }
730 }
731
732 // Attempt connection
733 match self.connect_and_subscribe().await {
734 Ok(()) => {
735 health_tracker.record_success(&self.url);
736
737 // Track when we connected for since filter on reconnect
738 let connected_at = Timestamp::now();
739
740 // Run event loop until disconnection
741 self.run_event_loop(&event_tx).await;
742
743 // Connection lost - will reconnect with since filter
744 health_tracker.record_failure(&self.url);
745
746 // On reconnect, use since = connected_at - 15 minutes
747 self.set_reconnect_since(connected_at);
748 }
749 Err(e) => {
750 health_tracker.record_failure(&self.url);
751 tracing::warn!("Connection to {} failed: {}", self.url, e);
752 }
753 }
754
755 // Get backoff duration and wait
756 let state = health_tracker.get_state(&self.url);
757 if state == HealthState::Dead {
758 // Dead relays retry once per 24 hours
759 tokio::time::sleep(Duration::from_secs(24 * 3600)).await;
760 }
761 // Otherwise, loop will check should_attempt_connection
762 }
763 }
764}
765```
766
767### Backoff Configuration
768
769From existing [`RelayHealthTracker`](../../src/sync/health.rs:91):
770
771| Parameter | Value | Notes |
772|-----------|-------|-------|
773| Base backoff | 5 seconds | First failure |
774| Backoff multiplier | 2x | Exponential increase |
775| Max backoff | 1 hour (configurable) | `sync_max_backoff_secs` |
776| Dead threshold | 24 hours | Continuous failures |
777| Dead retry interval | 24 hours | Once per day |
778
779---
780
781## Consolidation
782
783### Threshold-Based (70 filters)
784
785```rust
786impl SyncManager {
787 async fn maybe_consolidate(&mut self, relay_url: &str) {
788 let filter_count = self.get_filter_count(relay_url).await;
789
790 if filter_count > 70 {
791 self.consolidate(relay_url).await;
792 }
793 }
794
795 async fn consolidate(&mut self, relay_url: &str) {
796 // 1. Wait for all pending batches to complete
797 self.wait_pending_complete(relay_url).await;
798
799 // 2. Close all subscriptions
800 self.close_all_subs(relay_url).await;
801
802 // 3. Rebuild filters from confirmed state
803 let confirmed = self.relay_sync_index.read().await;
804 let state = confirmed.get(relay_url)?;
805 let filters = build_filters(&state.repos, &state.root_events);
806
807 // 4. Resubscribe with since = now - 15 minutes
808 let since = Timestamp::now() - 900;
809 for filter in filters {
810 self.subscribe(relay_url, filter.since(since)).await;
811 }
812 }
813}
814```
815
816### Daily Timer (23-25h Random)
817
818```rust
819impl SyncManager {
820 async fn run_daily_consolidation(&self) {
821 loop {
822 let hours = 23 + rand::random::<f64>() * 2.0;
823 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
824
825 for relay_url in self.connections.keys() {
826 self.consolidate(relay_url).await;
827 }
828 }
829 }
830}
831```
832
833---
834
835## Key Design Decisions
836
837| Decision | Choice | Rationale |
838|----------|--------|-----------|
839| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
840| Since filter | Only on reconnection | Initial subscribe gets full history |
841| Pending tracking | Per-batch with batch ID | Independent confirmation, no blocking |
842| EOSE requirement | All subs in batch must complete | Single repo may need multiple filter subs |
843| Action type | Struct not enum | Only one action type needed |
844| Relay spawning | Auto-spawn on AddFilters | Simplifies action logic |
845| Disconnect | Direct in SyncManager | Not worth an action type |
846| Consolidation | 70 filters + daily timer | Threshold for growth, timer for staleness |
847| Timestamps | In-memory only | Not critical for correctness |
848| Health tracking | Reuse existing RelayHealthTracker | Already implements exponential backoff, dead relay detection |
849| Reconnection backoff | Exponential to 1h max | Prevents hammering failed relays |
850| Dead relay policy | 24h threshold, daily retry | Balance between giving up and resource waste |
851| last_connected tracking | Per-connection in-memory | Enables 15-minute buffer on reconnect |
852| Connection ownership | Inside RelayState | Ties connection lifecycle to sync state, simpler than separate maps |
853| State retention rule | Clear if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
854| Skip disconnected | compute_actions skips disconnected | Prevents queuing AddFilters for offline relays |
855| Reconnect triggers | handle_notifications returns or Shutdown | nostr-sdk signals disconnect via event loop exit |
856| On-reconnect flow | Regenerate AddFilters from RepoSyncIndex | Fresh subscriptions for what we actually need |
857
858---
859
860## Module Structure
861
862```
863src/sync/
864├── mod.rs # SyncManager, main loop
865├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
866├── actions.rs # AddFilters struct, compute_actions
867├── self_subscriber.rs # SelfSubscriber, batching logic
868├── relay_connection.rs # Per-relay WebSocket connection
869├── consolidation.rs # Consolidation logic, daily timer
870├── health.rs # Health tracking (reuse from v2)
871└── metrics.rs # Prometheus metrics (reuse from v2) \ No newline at end of file
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md
deleted file mode 100644
index dd508b3..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v4.md
+++ /dev/null
@@ -1,1330 +0,0 @@
1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design
2
3## Overview
4
5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles:
6
71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions)
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary
12
13---
14
15## Data Model
16
17### RepoSyncIndex (Source of Truth)
18
19```rust
20/// What we WANT to sync - derived from events received via self-subscription.
21/// Updated immediately when self-subscriber batch fires.
22/// Key: repo addressable ref - 30617:pubkey:identifier
23pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
24
25#[derive(Debug, Clone, Default)]
26pub struct RepoSyncNeeds {
27 /// Relay URLs listed in this repo's 30617 announcement
28 pub relays: HashSet<String>,
29 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
30 pub root_events: HashSet<EventId>,
31}
32```
33
34### RelaySyncIndex (Confirmed State + Connection)
35
36```rust
37/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
38/// Key: relay URL
39pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
40
41/// Connection status for a relay
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ConnectionStatus {
44 /// Not currently connected
45 Disconnected,
46 /// Connection attempt in progress
47 Connecting,
48 /// Successfully connected and subscribed
49 Connected,
50}
51
52/// Complete state for a single relay - combines sync needs with connection lifecycle
53#[derive(Debug)]
54pub struct RelayState {
55 /// Repos we have confirmed syncing from this relay
56 pub repos: HashSet<String>,
57 /// Root events we have confirmed tracking
58 pub root_events: HashSet<EventId>,
59 /// If true, never disconnect this relay
60 pub is_bootstrap: bool,
61 /// Current connection status
62 pub connection_status: ConnectionStatus,
63 /// When we last successfully connected - used for since filter on reconnect
64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>,
67}
68
69impl RelayState {
70 /// Check if state should be cleared based on 15-minute rule
71 pub fn should_clear_state(&self) -> bool {
72 match self.disconnected_at {
73 Some(disconnected) => {
74 let now = Timestamp::now();
75 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
76 }
77 None => false, // Still connected or never connected
78 }
79 }
80
81 /// Clear repos and root_events - called when reconnect takes > 15 minutes
82 pub fn clear_sync_state(&mut self) {
83 self.repos.clear();
84 self.root_events.clear();
85 }
86}
87```
88
89### PendingSyncIndex (In-Flight Batches)
90
91```rust
92/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
93/// Each batch has its own ID and can confirm independently.
94/// Key: relay URL
95pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
96
97#[derive(Debug, Clone)]
98pub struct PendingBatch {
99 /// Unique ID for this batch - for debugging/logging
100 pub batch_id: u64,
101 /// The items this batch is syncing
102 pub items: PendingItems,
103 /// Subscription IDs that must ALL receive EOSE before confirming
104 pub outstanding_subs: HashSet<SubscriptionId>,
105}
106
107#[derive(Debug, Clone, Default)]
108pub struct PendingItems {
109 pub repos: HashSet<String>,
110 pub root_events: HashSet<EventId>,
111}
112```
113
114---
115
116## Connection Lifecycle State Machine
117
118```mermaid
119stateDiagram-v2
120 [*] --> Disconnected: discover relay via RepoSyncIndex
121 Disconnected --> Connecting: AddFilters triggers spawn_connection
122 Connecting --> Connected: success
123 Connecting --> Disconnected: failure + record in health tracker
124 Connected --> Disconnected: connection lost
125 Connected --> [*]: intentional disconnect via check_disconnects
126
127 note right of Disconnected: disconnected_at set for 15min rule
128 note right of Connected: last_connected tracked for since filter
129```
130
131---
132
133## Flow Scenarios
134
135### Scenario 1: Initial Connect via handle_connect_or_reconnect
136
137```mermaid
138flowchart TB
139 START[Startup] --> SS[Self-subscribe to own relay]
140 SS --> |no since filter| EVENTS[Receive historical events]
141 EVENTS --> RSI[Update RepoSyncIndex]
142 RSI --> DT[derive_relay_targets]
143 DT --> CA[compute_actions with targets and empty confirmed]
144 CA --> AF[AddFilters for each relay]
145 AF --> SPAWN{Relay connected?}
146 SPAWN --> |no| CONN[spawn_connection]
147 CONN --> HC[handle_connect_or_reconnect]
148 SPAWN --> |yes| SUB
149
150 subgraph handle_connect_or_reconnect - Fresh Sync
151 HC --> CHECK_FRESH{is_fresh_sync?}
152 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
153 L1 --> RCA[recompute_actions_for_relay]
154 end
155
156 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
157 SUB --> PB[Create PendingBatch]
158 PB --> EOSE[Wait for EOSE]
159 EOSE --> CONFIRM[Move items to confirmed repos/root_events]
160```
161
162**Key points:**
163
164- No `since` filter on initial connect - get full history
165- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
167- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
168
169### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes
170
171```mermaid
172flowchart TB
173 DISC[Connection lost] --> MARK[Set disconnected_at = now]
174 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay]
175 CLEAR_PEND --> WAIT[Wait for reconnection]
176 WAIT --> RECONN[Connection restored]
177 RECONN --> HC[handle_connect_or_reconnect]
178
179 subgraph handle_connect_or_reconnect - Quick Reconnect
180 HC --> CHECK{is_fresh_sync?}
181 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
182 SINCE --> L1[build_announcement_filter - with since]
183 L1 --> L23[rebuild_layer2_and_layer3 - with since]
184 L23 --> RCA[recompute_actions_for_relay]
185 end
186
187 RCA --> AF[AddFilters for new items only]
188 AF --> SUB[Subscribe]
189 SUB --> PB[Create PendingBatch]
190 PB --> EOSE[Wait for EOSE]
191 EOSE --> EXTEND[Extend confirmed state]
192```
193
194**Key points:**
195
196- PendingSyncIndex cleared on disconnect (not reconnect)
197- `handle_connect_or_reconnect`:
198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
199 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection
202
203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes
204
205```mermaid
206flowchart TB
207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
208
209 subgraph handle_connect_or_reconnect - Stale Reconnect
210 HC --> CHECK{is_fresh_sync?}
211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
212 CLEAR --> L1[build_announcement_filter - no since]
213 L1 --> RCA[recompute_actions_for_relay]
214 end
215
216 RCA --> CA[compute_actions with empty confirmed]
217 CA --> AF[AddFilters for everything]
218 AF --> SUB[Subscribe - no since filter]
219 SUB --> PB[Create PendingBatch]
220 PB --> EOSE[Wait for EOSE]
221 EOSE --> CONFIRM[Populate confirmed state fresh]
222```
223
224**Key points:**
225
226- `should_clear_state()` returns true → triggers fresh sync
227- Same path as initial connect after clearing state
228- Layer 1: `build_announcement_filter(None)` - full history
229- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
230
231### Scenario 4: Consolidation - Triggered on Filter Add
232
233```mermaid
234flowchart TB
235 AF[handle_add_filters called] --> COUNT{current + new > 70?}
236 COUNT --> |yes| CONSOLIDATE[consolidate]
237 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
238 WAIT_PEND --> CLOSE[unsubscribe_all]
239 CLOSE --> SINCE[since = now - 15min]
240 SINCE --> L1[build_announcement_filter - with since]
241 L1 --> L23[rebuild_layer2_and_layer3 - with since]
242 COUNT --> |no| SUB[Subscribe new filters]
243 SUB --> PB[Create PendingBatch]
244```
245
246**Key points:**
247
248- Consolidation checked in `handle_add_filters` BEFORE adding new filters
249- After closing all subscriptions, re-subscribe:
250 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
251 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
252- `since = now - 15min` prevents re-fetching old events
253- Keeps confirmed state, just reduces filter count
254
255### Scenario 5: Daily Timer - 23 to 25h Random
256
257```mermaid
258flowchart TB
259 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all]
260 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay]
261 CLEAR_PEND --> CLEAR_STATE[clear_sync_state]
262 CLEAR_STATE --> L1[build_announcement_filter - no since]
263 L1 --> RCA[recompute_actions_for_relay]
264 RCA --> CA[compute_actions with empty confirmed]
265 CA --> AF[AddFilters for everything]
266 AF --> SUB[Subscribe - no since filter]
267 SUB --> PB[Create PendingBatch]
268 PB --> EOSE[Wait for EOSE]
269 EOSE --> CONFIRM[Repopulate confirmed state]
270```
271
272**Key points:**
273
274- Daily timer is a full fresh sync, NOT consolidation
275- Clears both PendingSyncIndex and confirmed state
276- Layer 1: `build_announcement_filter(None)` - full history
277- Layer 2+3: via compute_actions with empty confirmed - full history
278- Detects any state drift accumulated over 24 hours
279
280---
281
282## Core Algorithms
283
284### 1. derive_relay_targets
285
286Transform RepoSyncIndex into per-relay sync targets:
287
288```rust
289/// Inverts RepoSyncIndex to get per-relay view
290fn derive_relay_targets(
291 repo_index: &HashMap<String, RepoSyncNeeds>
292) -> HashMap<String, RelaySyncNeeds> {
293 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
294
295 for (repo_ref, needs) in repo_index {
296 for relay_url in &needs.relays {
297 let target = targets.entry(relay_url.clone()).or_default();
298 target.repos.insert(repo_ref.clone());
299 target.root_events.extend(needs.root_events.iter().cloned());
300 }
301 }
302
303 targets
304}
305```
306
307### 2. compute_actions (Three-Way Diff)
308
309**This is the ONLY decision point for what NEW subscriptions to create.**
310
311```rust
312/// Computes AddFilters for items that are:
313/// - In targets (what we want)
314/// - NOT in pending (already in-flight)
315/// - NOT in confirmed (already confirmed)
316fn compute_actions(
317 targets: &HashMap<String, RelaySyncNeeds>,
318 pending: &HashMap<String, Vec<PendingBatch>>,
319 confirmed: &HashMap<String, RelayState>,
320) -> Vec<AddFilters> {
321 let mut actions = Vec::new();
322
323 for (relay_url, target) in targets {
324 // Skip disconnected relays - they will get AddFilters on reconnect
325 if let Some(state) = confirmed.get(relay_url) {
326 if state.connection_status != ConnectionStatus::Connected {
327 continue;
328 }
329 }
330
331 // Collect all pending items for this relay
332 let pending_repos: HashSet<_> = pending.get(relay_url)
333 .map(|batches| batches.iter()
334 .flat_map(|b| b.items.repos.iter().cloned())
335 .collect())
336 .unwrap_or_default();
337 let pending_events: HashSet<_> = pending.get(relay_url)
338 .map(|batches| batches.iter()
339 .flat_map(|b| b.items.root_events.iter().cloned())
340 .collect())
341 .unwrap_or_default();
342
343 // Collect confirmed items for this relay
344 let confirmed_repos = confirmed.get(relay_url)
345 .map(|c| &c.repos)
346 .unwrap_or(&HashSet::new());
347 let confirmed_events = confirmed.get(relay_url)
348 .map(|c| &c.root_events)
349 .unwrap_or(&HashSet::new());
350
351 // New = target - pending - confirmed
352 let new_repos: HashSet<_> = target.repos.iter()
353 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
354 .cloned()
355 .collect();
356 let new_events: HashSet<_> = target.root_events.iter()
357 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
358 .cloned()
359 .collect();
360
361 if !new_repos.is_empty() || !new_events.is_empty() {
362 let filters = build_filters(&new_repos, &new_events);
363 actions.push(AddFilters {
364 relay_url: relay_url.clone(),
365 repos: new_repos,
366 root_events: new_events,
367 filters,
368 });
369 }
370 }
371
372 actions
373}
374```
375
376### 3. Filter Building Functions (Three-Layer Strategy)
377
378The filter strategy uses three layers:
379
380- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
381- **Layer 2**: Events tagging our repos
382- **Layer 3**: Events tagging our root events
383
384**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
385
386```rust
387/// Layer 1: Announcements filter (kinds 30617 + 30618)
388/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
389/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
390fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
391 let filter = Filter::new().kinds([
392 Kind::Custom(30617), // Repository announcements
393 Kind::Custom(30618), // Maintainer lists
394 ]);
395
396 match since {
397 Some(ts) => filter.since(ts),
398 None => filter,
399 }
400}
401
402/// Layer 2: Events tagging one of our repos
403/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
404/// Batched per 100 repo refs.
405fn tagged_one_of_our_repo_event_filters(
406 repos: &HashSet<String>,
407 since: Option<Timestamp>,
408) -> Vec<Filter> {
409 let mut filters = Vec::new();
410 let repo_refs: Vec<_> = repos.iter().collect();
411
412 for chunk in repo_refs.chunks(100) {
413 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
414
415 // Lowercase 'a' tag - standard addressable reference
416 let mut f1 = Filter::new()
417 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
418 // Uppercase 'A' tag - some clients use this
419 let mut f2 = Filter::new()
420 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
421 // Quote 'q' tag - NIP-10 quote references to addressable events
422 let mut f3 = Filter::new()
423 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
424
425 if let Some(ts) = since {
426 f1 = f1.since(ts);
427 f2 = f2.since(ts);
428 f3 = f3.since(ts);
429 }
430
431 filters.push(f1);
432 filters.push(f2);
433 filters.push(f3);
434 }
435
436 filters
437}
438
439/// Layer 3: Events tagging one of our root events
440/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
441/// Batched per 100 event IDs.
442fn tagged_one_of_our_root_event_filters(
443 root_events: &HashSet<EventId>,
444 since: Option<Timestamp>,
445) -> Vec<Filter> {
446 let mut filters = Vec::new();
447 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
448
449 for chunk in event_ids.chunks(100) {
450 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
451
452 // Lowercase 'e' tag - standard event reference
453 let mut f1 = Filter::new()
454 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
455 // Uppercase 'E' tag - some clients use this
456 let mut f2 = Filter::new()
457 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
458 // Quote 'q' tag - NIP-10 quote references to events
459 let mut f3 = Filter::new()
460 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
461
462 if let Some(ts) = since {
463 f1 = f1.since(ts);
464 f2 = f2.since(ts);
465 f3 = f3.since(ts);
466 }
467
468 filters.push(f1);
469 filters.push(f2);
470 filters.push(f3);
471 }
472
473 filters
474}
475
476/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
477/// Used by:
478/// - compute_actions for incremental subscriptions
479/// - consolidation rebuilds (Layer 1 remains active)
480fn build_layer2_and_layer3_filters(
481 repos: &HashSet<String>,
482 root_events: &HashSet<EventId>,
483 since: Option<Timestamp>,
484) -> Vec<Filter> {
485 let mut filters = Vec::new();
486 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
487 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
488 filters
489}
490```
491
492**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
493
494### 4. handle_add_filters (SyncManager)
495
496```rust
497impl SyncManager {
498 async fn handle_add_filters(&mut self, action: AddFilters) {
499 let AddFilters { relay_url, repos, root_events, filters } = action;
500
501 // Auto-spawn connection if needed
502 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
503 match state {
504 None => {
505 // New relay discovered - create entry and spawn connection
506 self.relay_sync_index.write().await.insert(
507 relay_url.clone(),
508 RelayState {
509 repos: HashSet::new(),
510 root_events: HashSet::new(),
511 is_bootstrap: false,
512 connection_status: ConnectionStatus::Connecting,
513 last_connected: None,
514 disconnected_at: None,
515 connection: None,
516 }
517 );
518 self.spawn_connection(&relay_url).await;
519 return; // Subscriptions will happen on connection success
520 }
521 Some(state) if state.connection_status != ConnectionStatus::Connected => {
522 // Not connected - subscriptions will happen on connection success
523 return;
524 }
525 Some(_) => {
526 // Already connected - proceed with subscription
527 }
528 }
529
530 // Subscribe and collect subscription IDs
531 let conn = self.connections.get(&relay_url).unwrap();
532 let mut sub_ids = HashSet::new();
533
534 for filter in filters {
535 match conn.client.subscribe(filter, None).await {
536 Ok(output) => {
537 for sub_id in output.val {
538 sub_ids.insert(sub_id);
539 }
540 }
541 Err(e) => {
542 tracing::warn!("Failed to subscribe: {}", e);
543 }
544 }
545 }
546
547 // Create pending batch
548 let batch = PendingBatch {
549 batch_id: self.next_batch_id(),
550 items: PendingItems { repos, root_events },
551 outstanding_subs: sub_ids,
552 };
553
554 // Add to pending index
555 self.pending_sync_index.write().await
556 .entry(relay_url)
557 .or_default()
558 .push(batch);
559 }
560}
561```
562
563### 5. handle_disconnect
564
565```rust
566impl SyncManager {
567 /// Called when connection to a relay is lost
568 async fn handle_disconnect(&mut self, relay_url: &str) {
569 let mut index = self.relay_sync_index.write().await;
570
571 if let Some(state) = index.get_mut(relay_url) {
572 state.connection_status = ConnectionStatus::Disconnected;
573 state.disconnected_at = Some(Timestamp::now());
574 state.connection = None;
575 }
576
577 // Clear pending batches - these items were not confirmed
578 self.pending_sync_index.write().await.remove(relay_url);
579
580 // Remove from active connections map
581 self.connections.remove(relay_url);
582
583 // Health tracker records failure for backoff
584 self.health_tracker.record_failure(relay_url);
585 }
586}
587```
588
589### 6. handle_connect_or_reconnect (Unified)
590
591This method handles BOTH initial connection AND reconnection with unified logic:
592
593```rust
594impl SyncManager {
595 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
596 ///
597 /// Decision tree:
598 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
599 /// - Quick reconnect (<15min): since = last_connected - 15min
600 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
601 let mut index = self.relay_sync_index.write().await;
602 let state = match index.get_mut(relay_url) {
603 Some(s) => s,
604 None => return, // Relay was removed while disconnected
605 };
606
607 // Determine if this is a fresh sync or quick reconnect
608 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
609 let last_connected = state.last_connected;
610
611 if is_fresh_sync && state.last_connected.is_some() {
612 // Stale reconnect (>15min) - clear state
613 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
614 state.clear_sync_state();
615 }
616
617 // Update connection state
618 state.connection_status = ConnectionStatus::Connected;
619 state.last_connected = Some(Timestamp::now());
620 state.disconnected_at = None;
621
622 // Record success in health tracker
623 self.health_tracker.record_success(relay_url);
624
625 drop(index); // Release lock
626
627 let conn = match self.connections.get(relay_url) {
628 Some(c) => c,
629 None => return,
630 };
631
632 if is_fresh_sync {
633 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
634
635 // Step 1: Subscribe Layer 1 (announcements) without since
636 let layer1 = build_announcement_filter(None);
637 let _ = conn.client.subscribe(layer1, None).await;
638
639 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
640 self.recompute_actions_for_relay(relay_url).await;
641 } else {
642 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
643 let since = last_connected
644 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
645 .unwrap_or(Timestamp::from(0));
646
647 // Step 1: Subscribe Layer 1 (announcements) with since
648 let layer1 = build_announcement_filter(Some(since));
649 let _ = conn.client.subscribe(layer1, None).await;
650
651 // Step 2: Rebuild Layer 2+3 for confirmed items with since
652 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
653
654 // Step 3: Check for NEW items via compute_actions
655 self.recompute_actions_for_relay(relay_url).await;
656 }
657 }
658
659 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
660 /// Used by:
661 /// - Quick reconnect: rebuild confirmed items with since filter
662 /// - Consolidation: close and rebuild with since filter
663 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
664 let confirmed = self.relay_sync_index.read().await;
665 let state = match confirmed.get(relay_url) {
666 Some(s) => s,
667 None => return,
668 };
669
670 // Build Layer 2+3 filters WITH since
671 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
672 drop(confirmed);
673
674 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
675 let conn = match self.connections.get(relay_url) {
676 Some(c) => c,
677 None => return,
678 };
679
680 for filter in filters {
681 let _ = conn.client.subscribe(filter, None).await;
682 }
683 }
684
685 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
686 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
687 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
688 let repo_index = self.repo_sync_index.read().await;
689 let targets = derive_relay_targets(&repo_index);
690 drop(repo_index);
691
692 // Filter to just this relay
693 let target = match targets.get(relay_url) {
694 Some(t) => t.clone(),
695 None => return, // No repos reference this relay anymore
696 };
697
698 let pending = self.pending_sync_index.read().await;
699 let confirmed = self.relay_sync_index.read().await;
700
701 let mut single_relay_targets = HashMap::new();
702 single_relay_targets.insert(relay_url.to_string(), target);
703
704 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
705
706 drop(pending);
707 drop(confirmed);
708
709 // Process AddFilters
710 for action in actions {
711 self.handle_add_filters(action).await;
712 }
713 }
714}
715```
716
717### 7. Daily Timer
718
719```rust
720impl SyncManager {
721 async fn run_daily_timer(&self) {
722 loop {
723 // Random 23-25 hours
724 let hours = 23.0 + rand::random::<f64>() * 2.0;
725 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
726
727 let relay_urls: Vec<_> = self.relay_sync_index.read().await
728 .keys()
729 .cloned()
730 .collect();
731
732 for relay_url in relay_urls {
733 self.daily_sync(&relay_url).await;
734 }
735 }
736 }
737
738 /// Perform daily fresh sync for a relay
739 async fn daily_sync(&mut self, relay_url: &str) {
740 tracing::info!("Daily sync triggered for {}", relay_url);
741
742 // Close all subscriptions
743 if let Some(conn) = self.connections.get(relay_url) {
744 conn.client.unsubscribe_all().await;
745 }
746
747 // Clear PendingSyncIndex
748 self.pending_sync_index.write().await.remove(relay_url);
749
750 // Clear confirmed state - triggers fresh sync
751 {
752 let mut index = self.relay_sync_index.write().await;
753 if let Some(state) = index.get_mut(relay_url) {
754 state.clear_sync_state();
755 }
756 }
757
758 // Recompute actions - will generate AddFilters for everything
759 self.recompute_actions_for_relay(relay_url).await;
760 }
761}
762```
763
764### 8. Consolidation (Threshold-Based, Triggered on Add)
765
766Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active.
767
768```rust
769impl SyncManager {
770 /// Check filter count and consolidate if needed.
771 /// Called from handle_add_filters BEFORE adding new filters.
772 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
773 let current_count = self.get_filter_count(relay_url).await;
774
775 if current_count + new_filter_count > 70 {
776 self.consolidate(relay_url).await;
777 }
778 }
779
780 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
781 /// Does NOT clear state - just reduces filter count.
782 async fn consolidate(&mut self, relay_url: &str) {
783 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
784
785 // Wait for all pending batches to complete first
786 self.wait_pending_complete(relay_url).await;
787
788 // Close Layer 2+3 subscriptions only - Layer 1 remains active
789 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
790 // For simplicity, we close all and re-add Layer 1
791 if let Some(conn) = self.connections.get(relay_url) {
792 conn.client.unsubscribe_all().await;
793 }
794
795 // Re-subscribe Layer 1 with since (maintains announcements stream)
796 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
797 let conn = self.connections.get(relay_url).unwrap();
798 let layer1 = build_announcement_filter(Some(since));
799 let _ = conn.client.subscribe(layer1, None).await;
800
801 // Rebuild Layer 2+3 only
802 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
803 }
804}
805```
806
807**Updated handle_add_filters to check consolidation:**
808
809```rust
810impl SyncManager {
811 async fn handle_add_filters(&mut self, action: AddFilters) {
812 let AddFilters { relay_url, repos, root_events, filters } = action;
813
814 // Auto-spawn connection if needed (unchanged)
815 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
816 match state {
817 None => {
818 // New relay discovered - create entry and spawn connection
819 self.relay_sync_index.write().await.insert(
820 relay_url.clone(),
821 RelayState {
822 repos: HashSet::new(),
823 root_events: HashSet::new(),
824 is_bootstrap: false,
825 connection_status: ConnectionStatus::Connecting,
826 last_connected: None,
827 disconnected_at: None,
828 connection: None,
829 }
830 );
831 self.spawn_connection(&relay_url).await;
832 return; // Subscriptions will happen on connection success
833 }
834 Some(state) if state.connection_status != ConnectionStatus::Connected => {
835 return; // Not connected - subscriptions will happen on connection success
836 }
837 Some(_) => {
838 // Already connected - proceed
839 }
840 }
841
842 // CHECK CONSOLIDATION BEFORE ADDING
843 self.maybe_consolidate(&relay_url, filters.len()).await;
844
845 // Subscribe and collect subscription IDs
846 let conn = self.connections.get(&relay_url).unwrap();
847 let mut sub_ids = HashSet::new();
848
849 for filter in filters {
850 match conn.client.subscribe(filter, None).await {
851 Ok(output) => {
852 for sub_id in output.val {
853 sub_ids.insert(sub_id);
854 }
855 }
856 Err(e) => {
857 tracing::warn!("Failed to subscribe: {}", e);
858 }
859 }
860 }
861
862 // Create pending batch (unchanged)
863 let batch = PendingBatch {
864 batch_id: self.next_batch_id(),
865 items: PendingItems { repos, root_events },
866 outstanding_subs: sub_ids,
867 };
868
869 self.pending_sync_index.write().await
870 .entry(relay_url)
871 .or_default()
872 .push(batch);
873 }
874}
875```
876
877---
878
879## Disconnect (Relay Removal) Handling
880
881```rust
882impl SyncManager {
883 /// Periodically check for relays that should be disconnected
884 async fn check_disconnects(&mut self) {
885 let confirmed = self.relay_sync_index.read().await;
886 let relays_to_disconnect: Vec<_> = confirmed.iter()
887 .filter(|(_, state)| {
888 !state.is_bootstrap &&
889 state.repos.is_empty() &&
890 state.root_events.is_empty()
891 })
892 .map(|(url, _)| url.clone())
893 .collect();
894 drop(confirmed);
895
896 for relay_url in relays_to_disconnect {
897 self.disconnect_relay(&relay_url).await;
898 }
899 }
900
901 async fn disconnect_relay(&mut self, relay_url: &str) {
902 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
903
904 self.relay_sync_index.write().await.remove(relay_url);
905 self.pending_sync_index.write().await.remove(relay_url);
906
907 if let Some(conn) = self.connections.remove(relay_url) {
908 let _ = conn.client.disconnect().await;
909 }
910 }
911}
912```
913
914---
915
916## State Flow Summary
917
918```mermaid
919flowchart TB
920 subgraph Input
921 SS[SelfSubscriber]
922 OWN[Own Relay]
923 end
924
925 subgraph RepoSyncIndex - What We Want
926 RSI[HashMap: Repo to Relays+Events]
927 end
928
929 subgraph Derived Target
930 DT[derive_relay_targets fn]
931 TGT[Per-relay: repos + events we should sync]
932 end
933
934 subgraph compute_actions - Decision Point
935 CA[Three-way diff: target - pending - confirmed]
936 end
937
938 subgraph PendingSyncIndex - In Flight
939 PSI[Vec PendingBatch per relay]
940 end
941
942 subgraph RelaySyncIndex - Confirmed State
943 RLI[RelayState per relay]
944 CONN[connection_status]
945 REPOS[repos + root_events]
946 TIMES[last_connected + disconnected_at]
947 end
948
949 SS -->|subscribe| OWN
950 OWN -->|events| SS
951 SS -->|batch fires| RSI
952 RSI --> DT
953 DT --> TGT
954 TGT --> CA
955 PSI --> CA
956 RLI --> CA
957 CA -->|Layer 2+3 new items| AF[AddFilters]
958 AF -->|check filter count| CONSOL{count + new > 70?}
959 CONSOL -->|yes| CONSOLIDATE[consolidate]
960 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since]
961 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since]
962 CONSOL -->|no| SUB[subscribe]
963 AF -->|spawn if needed| CONN
964 SUB --> PSI
965 PSI -->|EOSE| REPOS
966
967 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
968 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
969
970 subgraph handle_connect_or_reconnect
971 HC --> FRESH_CHECK{is_fresh_sync?}
972 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
973 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
974 L1_FRESH --> RCA1[recompute_actions_for_relay]
975 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
976 L23_QUICK --> RCA2[recompute_actions_for_relay]
977 end
978```
979
980---
981
982## Key Design Decisions
983
984| Decision | Choice | Rationale |
985| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- |
986| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
987| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
988| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
989| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
990| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
991| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
992| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
993| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
994| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
995| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
996| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
997| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
998| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
999| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
1000| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
1001| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
1002| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
1003
1004---
1005
1006## Module Structure
1007
1008```
1009src/sync/
1010├── mod.rs # SyncManager, main loop
1011├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
1012├── actions.rs # AddFilters struct, compute_actions, build_filters
1013├── self_subscriber.rs # SelfSubscriber, batching logic
1014├── relay_connection.rs # Per-relay WebSocket connection
1015├── consolidation.rs # Consolidation logic, daily timer
1016├── health.rs # Health tracking (reuse from v2)
1017└── metrics.rs # Prometheus metrics (reuse from v2)
1018```
1019
1020---
1021
1022## Comparison: v3 vs v4
1023
1024| Aspect | v3 | v4 |
1025| ------------------------ | ----------------------------------------- | --------------------------------------------- |
1026| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1027| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1028| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1029| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1030| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1031| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1032| PSI clearing | On disconnect | On disconnect (confirmed) |
1033| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
1034
1035---
1036
1037## Self-Subscriber Flow
1038
1039The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions.
1040
1041### State Tracking
1042
1043```rust
1044pub struct SelfSubscriber {
1045 own_relay_url: String,
1046 relay_domain: String,
1047 repo_sync_index: RepoSyncIndex,
1048 pending_sync_index: PendingSyncIndex,
1049 relay_sync_index: RelaySyncIndex,
1050 action_tx: mpsc::Sender<AddFilters>,
1051 /// Timestamp of last successful connection - used for since filter on reconnection
1052 last_connected: Option<Timestamp>,
1053 /// The active client connection
1054 client: Option<Client>,
1055}
1056```
1057
1058### On Startup / Reconnect (Unified)
1059
1060Both initial startup and reconnection use the same `connect_and_subscribe` method:
1061
1062```rust
1063impl SelfSubscriber {
1064 async fn run(mut self) {
1065 loop {
1066 // Connect or reconnect
1067 if let Err(e) = self.connect_and_subscribe().await {
1068 tracing::warn!("Connection failed: {}, will retry", e);
1069 tokio::time::sleep(Duration::from_secs(5)).await;
1070 continue;
1071 }
1072
1073 // Run event loop until disconnection
1074 self.event_loop().await;
1075
1076 // Loop will retry connection
1077 }
1078 }
1079
1080 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
1081 let client = Client::new(Keys::generate());
1082 client.add_relay(&self.own_relay_url).await?;
1083 client.connect().await;
1084
1085 // Build filter - add since only on reconnect
1086 let filter = Filter::new().kinds([
1087 Kind::Custom(30617), // Repository announcements
1088 Kind::GitPatch, // 1617
1089 Kind::Custom(1618), // PRs
1090 Kind::Custom(1619), // PR updates
1091 Kind::GitIssue, // 1621
1092 ]);
1093
1094 let filter = if let Some(ts) = self.last_connected {
1095 // Reconnection: use since filter
1096 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1097 filter.since(since)
1098 } else {
1099 // Initial connect: no since filter - get full history
1100 filter
1101 };
1102
1103 // Update last_connected AFTER computing since
1104 self.last_connected = Some(Timestamp::now());
1105
1106 client.subscribe(filter, None).await?;
1107 self.client = Some(client);
1108 Ok(())
1109 }
1110}
1111```
1112
1113### Event Loop with Batching
1114
1115```rust
1116impl SelfSubscriber {
1117 async fn event_loop(&mut self) {
1118 let client = self.client.as_ref().unwrap();
1119 let mut pending_events: Vec<Event> = Vec::new();
1120 let mut batch_timer: Option<Instant> = None;
1121 let batch_window = Duration::from_secs(5);
1122
1123 loop {
1124 let timeout = batch_timer
1125 .map(|t| batch_window.saturating_sub(t.elapsed()))
1126 .unwrap_or(Duration::from_secs(60));
1127
1128 tokio::select! {
1129 notification = client.notifications().recv() => {
1130 match notification {
1131 Ok(RelayPoolNotification::Event { event, .. }) => {
1132 pending_events.push(*event);
1133
1134 // Start timer on first event - does NOT reset
1135 if batch_timer.is_none() {
1136 batch_timer = Some(Instant::now());
1137 }
1138 }
1139 Ok(RelayPoolNotification::Shutdown) => {
1140 // Connection lost
1141 break;
1142 }
1143 _ => {}
1144 }
1145 }
1146 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1147 // Batch window elapsed
1148 self.process_batch(pending_events.drain(..).collect()).await;
1149 batch_timer = None;
1150 }
1151 }
1152 }
1153 }
1154
1155 async fn process_batch(&self, events: Vec<Event>) {
1156 // 1. Update RepoSyncIndex
1157 for event in events {
1158 match event.kind.as_u16() {
1159 30617 => self.handle_announcement(&event).await,
1160 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1161 _ => {}
1162 }
1163 }
1164
1165 // 2. Derive targets and compute actions
1166 let repo_index = self.repo_sync_index.read().await;
1167 let targets = derive_relay_targets(&repo_index);
1168
1169 let pending = self.pending_sync_index.read().await;
1170 let confirmed = self.relay_sync_index.read().await;
1171
1172 let actions = compute_actions(&targets, &pending, &confirmed);
1173
1174 drop(repo_index);
1175 drop(pending);
1176 drop(confirmed);
1177
1178 // 3. Send actions to SyncManager
1179 for action in actions {
1180 let _ = self.action_tx.send(action).await;
1181 }
1182 }
1183
1184 async fn handle_announcement(&self, event: &Event) {
1185 // Extract repo_ref from event - 30617:pubkey:identifier
1186 let d_tag = event.tags.iter()
1187 .find_map(|tag| {
1188 if tag.kind() == TagKind::D {
1189 tag.content().map(|s| s.to_string())
1190 } else {
1191 None
1192 }
1193 })
1194 .unwrap_or_default();
1195
1196 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1197
1198 // Extract relay URLs from 'r' tags
1199 let relays: HashSet<String> = event.tags.iter()
1200 .filter_map(|tag| {
1201 if tag.kind() == TagKind::Relay {
1202 tag.content().map(|s| s.to_string())
1203 } else {
1204 None
1205 }
1206 })
1207 .collect();
1208
1209 // Update RepoSyncIndex
1210 let mut index = self.repo_sync_index.write().await;
1211 let needs = index.entry(repo_ref).or_default();
1212 needs.relays = relays;
1213 }
1214
1215 async fn handle_root_event(&self, event: &Event) {
1216 // Extract repo_ref from 'a' tag
1217 let repo_ref = event.tags.iter()
1218 .find_map(|tag| {
1219 if tag.kind() == TagKind::A {
1220 tag.content().map(|s| s.to_string())
1221 } else {
1222 None
1223 }
1224 });
1225
1226 if let Some(repo_ref) = repo_ref {
1227 let mut index = self.repo_sync_index.write().await;
1228 let needs = index.entry(repo_ref).or_default();
1229 needs.root_events.insert(event.id);
1230 }
1231 }
1232}
1233```
1234
1235---
1236
1237## Implementation Notes
1238
1239This section documents the actual implementation details as of December 2024 (Phases 1-10 complete).
1240
1241### Architectural Decisions During Implementation
1242
1243**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup:
1244
1245```rust
1246// 7. Wrap self in Arc<Mutex> for sharing with timer task
1247let sync_manager = Arc::new(Mutex::new(self));
1248```
1249
1250This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber.
1251
1252**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses:
1253
1254- `DashMap` for thread-safe concurrent access without external locking
1255- Three states: `Healthy`, `Degraded`, `Dead`
1256- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff
1257- Dead threshold: 24 hours of continuous failures
1258- Dead relay retry: Once per 24 hours
1259
1260### Implementation Constants
1261
1262| Constant | Value | Purpose |
1263| --------------------------------- | ---------- | ------------------------------------------------ |
1264| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation |
1265| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation |
1266| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync |
1267| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect |
1268| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead |
1269| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff |
1270
1271### Daily Timer Randomization
1272
1273The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running:
1274
1275```rust
1276let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0;
1277```
1278
1279### Bootstrap Relay Protection
1280
1281Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out:
1282
1283```rust
1284.filter(|(_, state)| {
1285 !state.is_bootstrap &&
1286 state.repos.is_empty() &&
1287 state.root_events.is_empty()
1288})
1289```
1290
1291### Graceful Shutdown
1292
1293Shutdown uses a tokio broadcast channel for coordinated termination:
1294
1295```rust
1296let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
1297```
1298
1299Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop.
1300
1301### Actual Module Structure
1302
1303The implemented module structure differs from the original spec:
1304
1305```
1306src/sync/
1307├── mod.rs # SyncManager, main loop, index types, metrics
1308├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters
1309├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters
1310├── health.rs # RelayHealthTracker, HealthState, exponential backoff
1311├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling
1312└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic
1313```
1314
1315Key differences from spec:
1316
1317- No separate `state.rs` - types are defined in `mod.rs`
1318- No separate `actions.rs` - moved to `algorithms.rs`
1319- No separate `consolidation.rs` - consolidation logic in `mod.rs`
1320- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs`
1321
1322### Deviations from Original v4 Spec
1323
13241. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`.
1325
13262. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct.
1327
13283. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches.
1329
13304. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API.
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
index c07f07c..dd508b3 100644
--- a/docs/explanation/grasp-02-proactive-sync.md
+++ b/docs/explanation/grasp-02-proactive-sync.md
@@ -1,929 +1,1330 @@
1# GRASP-02: Proactive Sync - Design Document 1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design
2 2
3## Overview 3## Overview
4 4
5GRASP-02 Proactive Sync enables ngit-grasp to maintain live WebSocket connections to other relays listed in repository announcement events, synchronizing NIP-34 related events using both **live sync** (real-time subscriptions) and **negentropy catchup** (NIP-77 set reconciliation). 5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles:
6 6
7This document covers **event syncing only**. Git data syncing is out of scope for this phase. 71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions)
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary
8 12
9## Goals 13---
10 14
111. **Data Availability**: Ensure we have all relevant events for repositories we host 15## Data Model
122. **Resilience**: Handle relay failures gracefully with backoff and health tracking
133. **Efficiency**: Minimize connections and bandwidth through filter consolidation
144. **Consistency**: Use unified filters for both live sync and negentropy catchup
15 16
16## Architecture Overview 17### RepoSyncIndex (Source of Truth)
17 18
18```mermaid 19```rust
19flowchart TB 20/// What we WANT to sync - derived from events received via self-subscription.
20 subgraph ngit-grasp 21/// Updated immediately when self-subscriber batch fires.
21 subgraph SyncManager 22/// Key: repo addressable ref - 30617:pubkey:identifier
22 SS[Self-Subscriber] 23pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
23 RC[Remote Connections] 24
24 end 25#[derive(Debug, Clone, Default)]
25 WS[WebSocket Server] 26pub struct RepoSyncNeeds {
26 FS[FilterService] 27 /// Relay URLs listed in this repo's 30617 announcement
27 RH[RelayHealthTracker] 28 pub relays: HashSet<String>,
28 DB[(Database)] 29 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
29 AP[AcceptancePolicy] 30 pub root_events: HashSet<EventId>,
30 MET[Prometheus Metrics] 31}
31 end 32```
32 33
33 subgraph External Relays 34### RelaySyncIndex (Confirmed State + Connection)
34 R1[relay.example.com]
35 R2[other-grasp.io]
36 R3[nostr.land]
37 end
38 35
39 WS -->|broadcasts events| SS 36```rust
40 SS -->|discovers relays| RC 37/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
41 RC -->|builds filters| FS 38/// Key: relay URL
42 RC -->|tracks health| RH 39pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
43 RC -->|stores events| DB 40
44 RC -->|validates| AP 41/// Connection status for a relay
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ConnectionStatus {
44 /// Not currently connected
45 Disconnected,
46 /// Connection attempt in progress
47 Connecting,
48 /// Successfully connected and subscribed
49 Connected,
50}
51
52/// Complete state for a single relay - combines sync needs with connection lifecycle
53#[derive(Debug)]
54pub struct RelayState {
55 /// Repos we have confirmed syncing from this relay
56 pub repos: HashSet<String>,
57 /// Root events we have confirmed tracking
58 pub root_events: HashSet<EventId>,
59 /// If true, never disconnect this relay
60 pub is_bootstrap: bool,
61 /// Current connection status
62 pub connection_status: ConnectionStatus,
63 /// When we last successfully connected - used for since filter on reconnect
64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>,
67}
45 68
46 RC <-->|WebSocket + NEG| R1 69impl RelayState {
47 RC <-->|WebSocket + NEG| R2 70 /// Check if state should be cleared based on 15-minute rule
48 RC <-->|WebSocket + NEG| R3 71 pub fn should_clear_state(&self) -> bool {
72 match self.disconnected_at {
73 Some(disconnected) => {
74 let now = Timestamp::now();
75 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
76 }
77 None => false, // Still connected or never connected
78 }
79 }
49 80
50 RH -->|exposes state| MET 81 /// Clear repos and root_events - called when reconnect takes > 15 minutes
82 pub fn clear_sync_state(&mut self) {
83 self.repos.clear();
84 self.root_events.clear();
85 }
86}
51``` 87```
52 88
53**Key Insight: Self-Subscribe Architecture** 89### PendingSyncIndex (In-Flight Batches)
54 90
55The SyncManager uses a "self-subscribe" pattern for relay discovery. Rather than polling the database periodically, it connects to its own WebSocket server as a client and subscribes to kind 30617 events. When new announcements are saved (from any source), the self-subscriber receives them instantly and can spawn connections to newly discovered relays. 91```rust
92/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
93/// Each batch has its own ID and can confirm independently.
94/// Key: relay URL
95pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
96
97#[derive(Debug, Clone)]
98pub struct PendingBatch {
99 /// Unique ID for this batch - for debugging/logging
100 pub batch_id: u64,
101 /// The items this batch is syncing
102 pub items: PendingItems,
103 /// Subscription IDs that must ALL receive EOSE before confirming
104 pub outstanding_subs: HashSet<SubscriptionId>,
105}
56 106
57## Connection Management 107#[derive(Debug, Clone, Default)]
108pub struct PendingItems {
109 pub repos: HashSet<String>,
110 pub root_events: HashSet<EventId>,
111}
112```
58 113
59### Relay Discovery 114---
60 115
61Relays to connect to are discovered using a **self-subscribe architecture** rather than periodic polling. The SyncManager connects to its own relay as a client and subscribes to kind 30617 (repository announcement) events. When a new announcement is saved to the database (from direct submission or sync), the self-subscriber receives it immediately and discovers new relays to connect to. 116## Connection Lifecycle State Machine
62 117
63```mermaid 118```mermaid
64flowchart LR 119stateDiagram-v2
65 subgraph Relay 120 [*] --> Disconnected: discover relay via RepoSyncIndex
66 WS[WebSocket Server] 121 Disconnected --> Connecting: AddFilters triggers spawn_connection
67 DB[(Database)] 122 Connecting --> Connected: success
68 end 123 Connecting --> Disconnected: failure + record in health tracker
69 124 Connected --> Disconnected: connection lost
70 subgraph SyncManager 125 Connected --> [*]: intentional disconnect via check_disconnects
71 SS[Self-Subscribe Client] 126
72 RC[Remote Connections] 127 note right of Disconnected: disconnected_at set for 15min rule
73 end 128 note right of Connected: last_connected tracked for since filter
74
75 WS -->|broadcast| SS
76 SS -->|extract relay URLs| RC
77 RC -->|sync events| WS
78``` 129```
79 130
80**Why Self-Subscribe vs Polling?** 131---
81 132
82| Approach | Latency | Complexity | Resource Use | 133## Flow Scenarios
83|----------|---------|------------|--------------|
84| Self-Subscribe | Instant | Low | Minimal (1 WS connection) |
85| Periodic Polling | 30s+ delay | Higher | DB queries every N seconds |
86 134
87The self-subscribe approach provides: 135### Scenario 1: Initial Connect via handle_connect_or_reconnect
88- **Immediate discovery**: New relays discovered instantly when announcement saved
89- **No polling overhead**: No periodic database queries
90- **Simple architecture**: Reuses existing WebSocket infrastructure
91 136
92**Implementation Pattern:** 137```mermaid
138flowchart TB
139 START[Startup] --> SS[Self-subscribe to own relay]
140 SS --> |no since filter| EVENTS[Receive historical events]
141 EVENTS --> RSI[Update RepoSyncIndex]
142 RSI --> DT[derive_relay_targets]
143 DT --> CA[compute_actions with targets and empty confirmed]
144 CA --> AF[AddFilters for each relay]
145 AF --> SPAWN{Relay connected?}
146 SPAWN --> |no| CONN[spawn_connection]
147 CONN --> HC[handle_connect_or_reconnect]
148 SPAWN --> |yes| SUB
149
150 subgraph handle_connect_or_reconnect - Fresh Sync
151 HC --> CHECK_FRESH{is_fresh_sync?}
152 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
153 L1 --> RCA[recompute_actions_for_relay]
154 end
93 155
94```rust 156 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
95// In SyncManager::run() 157 SUB --> PB[Create PendingBatch]
96let self_client = Client::default(); 158 PB --> EOSE[Wait for EOSE]
97self_client.add_relay(&own_relay_url).await?; 159 EOSE --> CONFIRM[Move items to confirmed repos/root_events]
98self_client.connect().await;
99
100let filter = Filter::new().kind(Kind::Custom(30617));
101self_client.subscribe(filter, None).await?;
102
103// Handle notifications - when announcement arrives, extract relay URLs
104client.handle_notifications(|notification| async {
105 if let RelayPoolNotification::Event { event, .. } = notification {
106 let new_urls = filter_service.extract_relay_urls_from_event(&event);
107 for url in new_urls {
108 if !active_relays.contains(&url) && !is_own_relay(&url) {
109 spawn_connection(url, tx.clone(), filter_service.clone());
110 }
111 }
112 }
113 Ok(false) // Continue processing
114});
115``` 160```
116 161
117**Startup Discovery:** At startup, existing announcements in the database are queried once to discover initial relays. After startup, all discovery is event-driven via self-subscribe. 162**Key points:**
118 163
119**Reconnection:** The self-subscriber has built-in exponential backoff reconnection (1s → 60s max) to handle temporary disconnections from our own relay. 164- No `since` filter on initial connect - get full history
165- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
167- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
120 168
121### Connection Lifecycle 169### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes
122 170
123```mermaid 171```mermaid
124stateDiagram-v2 172flowchart TB
125 [*] --> Connecting: startup/new relay 173 DISC[Connection lost] --> MARK[Set disconnected_at = now]
126 Connecting --> Connected: success 174 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay]
127 Connecting --> Backoff: failure 175 CLEAR_PEND --> WAIT[Wait for reconnection]
128 Connected --> Disconnected: connection lost 176 WAIT --> RECONN[Connection restored]
129 Disconnected --> Backoff: reconnect failed 177 RECONN --> HC[handle_connect_or_reconnect]
130 Backoff --> Connecting: backoff timer expires 178
131 Backoff --> Dead: 24h continuous failures 179 subgraph handle_connect_or_reconnect - Quick Reconnect
132 Dead --> Connecting: daily retry timer 180 HC --> CHECK{is_fresh_sync?}
133 Connected --> Updating: filter change 181 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
134 Updating --> Connected: complete 182 SINCE --> L1[build_announcement_filter - with since]
183 L1 --> L23[rebuild_layer2_and_layer3 - with since]
184 L23 --> RCA[recompute_actions_for_relay]
185 end
186
187 RCA --> AF[AddFilters for new items only]
188 AF --> SUB[Subscribe]
189 SUB --> PB[Create PendingBatch]
190 PB --> EOSE[Wait for EOSE]
191 EOSE --> EXTEND[Extend confirmed state]
135``` 192```
136 193
137### Health Tracking & Backoff 194**Key points:**
138 195
139| State | Behavior | 196- PendingSyncIndex cleared on disconnect (not reconnect)
140| ----------- | --------------------------------------------------- | 197- `handle_connect_or_reconnect`:
141| **Healthy** | Normal operation, immediate reconnect on disconnect | 198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
142| **Backoff** | Exponential backoff: 5s → 10s → 20s → ... → 1h max | 199 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
143| **Dead** | 24h of continuous failures, retry once per day | 200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection
144 202
145Health state is **kept in-memory** using a `DashMap` for lock-free concurrent access: 203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes
146 204
147```rust 205```mermaid
148/// In-memory relay health tracking (NOT persisted to database) 206flowchart TB
149/// 207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
150/// Design rationale: For <100 relays, persistence adds complexity without
151/// significant benefit. Conservative initial backoff on restart avoids
152/// thundering herd issues.
153struct RelayHealthTracker {
154 health: DashMap<RelayUrl, RelayHealth>,
155 metrics: SyncMetrics, // Prometheus metrics for operator visibility
156}
157 208
158struct RelayHealth { 209 subgraph handle_connect_or_reconnect - Stale Reconnect
159 url: RelayUrl, 210 HC --> CHECK{is_fresh_sync?}
160 status: RelayStatus, // Healthy, Backoff, Dead 211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
161 consecutive_failures: u32, 212 CLEAR --> L1[build_announcement_filter - no since]
162 last_failure_at: Option<Instant>, 213 L1 --> RCA[recompute_actions_for_relay]
163 last_success_at: Option<Instant>, 214 end
164 next_retry_at: Instant,
165}
166 215
167enum RelayStatus { 216 RCA --> CA[compute_actions with empty confirmed]
168 Healthy, 217 CA --> AF[AddFilters for everything]
169 Backoff { attempt: u32 }, // backoff = min(5 * 2^attempt, 3600) seconds 218 AF --> SUB[Subscribe - no since filter]
170 Dead, // retry in 24h 219 SUB --> PB[Create PendingBatch]
171} 220 PB --> EOSE[Wait for EOSE]
221 EOSE --> CONFIRM[Populate confirmed state fresh]
172``` 222```
173 223
174### Restart Behavior (Graceful Degradation) 224**Key points:**
175 225
176On restart, all relay health state is reset. To avoid thundering herd: 226- `should_clear_state()` returns true → triggers fresh sync
227- Same path as initial connect after clearing state
228- Layer 1: `build_announcement_filter(None)` - full history
229- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
177 230
1781. **Conservative initial backoff**: Start with 5s delay (not immediate) for all relays 231### Scenario 4: Consolidation - Triggered on Filter Add
1792. **Staggered connection attempts**: Add random jitter (0-2s) per relay
1803. **Health rebuilds organically**: Relays prove themselves healthy through successful connections
181 232
182```rust 233```mermaid
183impl RelayHealthTracker { 234flowchart TB
184 fn new(metrics: SyncMetrics) -> Self { 235 AF[handle_add_filters called] --> COUNT{current + new > 70?}
185 Self { 236 COUNT --> |yes| CONSOLIDATE[consolidate]
186 health: DashMap::new(), 237 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
187 metrics, 238 WAIT_PEND --> CLOSE[unsubscribe_all]
188 } 239 CLOSE --> SINCE[since = now - 15min]
189 } 240 SINCE --> L1[build_announcement_filter - with since]
241 L1 --> L23[rebuild_layer2_and_layer3 - with since]
242 COUNT --> |no| SUB[Subscribe new filters]
243 SUB --> PB[Create PendingBatch]
244```
190 245
191 /// Called on startup for each discovered relay 246**Key points:**
192 fn initialize_relay(&self, url: RelayUrl) {
193 self.health.insert(url.clone(), RelayHealth {
194 url,
195 status: RelayStatus::Backoff { attempt: 0 }, // Start conservative
196 consecutive_failures: 0,
197 last_failure_at: None,
198 last_success_at: None,
199 next_retry_at: Instant::now() + Self::initial_backoff_with_jitter(),
200 });
201 }
202 247
203 fn initial_backoff_with_jitter() -> Duration { 248- Consolidation checked in `handle_add_filters` BEFORE adding new filters
204 Duration::from_secs(5) + Duration::from_millis(rand::random::<u64>() % 2000) 249- After closing all subscriptions, re-subscribe:
205 } 250 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
206} 251 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
252- `since = now - 15min` prevents re-fetching old events
253- Keeps confirmed state, just reduces filter count
254
255### Scenario 5: Daily Timer - 23 to 25h Random
256
257```mermaid
258flowchart TB
259 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all]
260 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay]
261 CLEAR_PEND --> CLEAR_STATE[clear_sync_state]
262 CLEAR_STATE --> L1[build_announcement_filter - no since]
263 L1 --> RCA[recompute_actions_for_relay]
264 RCA --> CA[compute_actions with empty confirmed]
265 CA --> AF[AddFilters for everything]
266 AF --> SUB[Subscribe - no since filter]
267 SUB --> PB[Create PendingBatch]
268 PB --> EOSE[Wait for EOSE]
269 EOSE --> CONFIRM[Repopulate confirmed state]
207``` 270```
208 271
209**Trade-off**: We lose knowledge of chronically failing relays across restarts. This is acceptable because: 272**Key points:**
273
274- Daily timer is a full fresh sync, NOT consolidation
275- Clears both PendingSyncIndex and confirmed state
276- Layer 1: `build_announcement_filter(None)` - full history
277- Layer 2+3: via compute_actions with empty confirmed - full history
278- Detects any state drift accumulated over 24 hours
210 279
211- Scale is small (<100 relays) 280---
212- Conservative initial backoff prevents hammering bad relays
213- Prometheus metrics preserve historical health data for operators
214 281
215## Filter Strategy 282## Core Algorithms
216 283
217### Unified Filters for Live Sync and Negentropy 284### 1. derive_relay_targets
218 285
219The same filter logic is used for both live subscriptions and negentropy reconciliation: 286Transform RepoSyncIndex into per-relay sync targets:
220 287
221```mermaid 288```rust
222flowchart LR 289/// Inverts RepoSyncIndex to get per-relay view
223 subgraph Filter Layers 290fn derive_relay_targets(
224 F1[Layer 1: All 30617+30618] 291 repo_index: &HashMap<String, RepoSyncNeeds>
225 F2[Layer 2: Events tagging repos via A/a/q] 292) -> HashMap<String, RelaySyncNeeds> {
226 F3[Layer 3: Events tagging PRs/Issues via E/e/q] 293 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
227 end 294
295 for (repo_ref, needs) in repo_index {
296 for relay_url in &needs.relays {
297 let target = targets.entry(relay_url.clone()).or_default();
298 target.repos.insert(repo_ref.clone());
299 target.root_events.extend(needs.root_events.iter().cloned());
300 }
301 }
228 302
229 F1 -->|client-side| AP[Acceptance Policy] 303 targets
230 F2 -->|server-side| Relay 304}
231 F3 -->|server-side| Relay
232``` 305```
233 306
234### Layer 1: Repository Announcements & States 307### 2. compute_actions (Three-Way Diff)
235 308
236Get ALL kind 30617 and 30618 events with unified `since` timestamp, then filter client-side through acceptance policy: 309**This is the ONLY decision point for what NEW subscriptions to create.**
237 310
238```rust 311```rust
239// Use same since filter as other layers for consistency 312/// Computes AddFilters for items that are:
240let layer1_filter = Filter::new() 313/// - In targets (what we want)
241 .kinds([Kind::from(30617), Kind::from(30618)]) 314/// - NOT in pending (already in-flight)
242 .since(since_timestamp); // Unified with Layer 2/3 315/// - NOT in confirmed (already confirmed)
316fn compute_actions(
317 targets: &HashMap<String, RelaySyncNeeds>,
318 pending: &HashMap<String, Vec<PendingBatch>>,
319 confirmed: &HashMap<String, RelayState>,
320) -> Vec<AddFilters> {
321 let mut actions = Vec::new();
322
323 for (relay_url, target) in targets {
324 // Skip disconnected relays - they will get AddFilters on reconnect
325 if let Some(state) = confirmed.get(relay_url) {
326 if state.connection_status != ConnectionStatus::Connected {
327 continue;
328 }
329 }
330
331 // Collect all pending items for this relay
332 let pending_repos: HashSet<_> = pending.get(relay_url)
333 .map(|batches| batches.iter()
334 .flat_map(|b| b.items.repos.iter().cloned())
335 .collect())
336 .unwrap_or_default();
337 let pending_events: HashSet<_> = pending.get(relay_url)
338 .map(|batches| batches.iter()
339 .flat_map(|b| b.items.root_events.iter().cloned())
340 .collect())
341 .unwrap_or_default();
342
343 // Collect confirmed items for this relay
344 let confirmed_repos = confirmed.get(relay_url)
345 .map(|c| &c.repos)
346 .unwrap_or(&HashSet::new());
347 let confirmed_events = confirmed.get(relay_url)
348 .map(|c| &c.root_events)
349 .unwrap_or(&HashSet::new());
350
351 // New = target - pending - confirmed
352 let new_repos: HashSet<_> = target.repos.iter()
353 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
354 .cloned()
355 .collect();
356 let new_events: HashSet<_> = target.root_events.iter()
357 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
358 .cloned()
359 .collect();
360
361 if !new_repos.is_empty() || !new_events.is_empty() {
362 let filters = build_filters(&new_repos, &new_events);
363 actions.push(AddFilters {
364 relay_url: relay_url.clone(),
365 repos: new_repos,
366 root_events: new_events,
367 filters,
368 });
369 }
370 }
371
372 actions
373}
243``` 374```
244 375
245**Client-side validation**: Only store events that pass our [`Nip34WritePolicy`](src/nostr/builder.rs:51). 376### 3. Filter Building Functions (Three-Layer Strategy)
246 377
247### Layer 2: Events Tagging Repositories 378The filter strategy uses three layers:
248 379
249For repo announcements **that list BOTH this relay AND our service**: 380- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
381- **Layer 2**: Events tagging our repos
382- **Layer 3**: Events tagging our root events
383
384**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
250 385
251```rust 386```rust
252// Build addressable references: 30617:<pubkey>:<identifier> 387/// Layer 1: Announcements filter (kinds 30617 + 30618)
253let repo_refs: Vec<String> = announcements 388/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
254 .iter() 389/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
255 .filter(|a| a.relays.contains(&this_relay) && a.lists_service(&our_domain)) 390fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
256 .map(|a| format!("30617:{}:{}", a.pubkey.to_hex(), a.identifier)) 391 let filter = Filter::new().kinds([
257 .collect(); 392 Kind::Custom(30617), // Repository announcements
258 393 Kind::Custom(30618), // Maintainer lists
259let layer2_filter = Filter::new() 394 ]);
260 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_refs.clone()) 395
261 .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo_refs)); 396 match since {
262``` 397 Some(ts) => filter.since(ts),
398 None => filter,
399 }
400}
263 401
264### Layer 3: Events Tagging Issues/PRs/Patches 402/// Layer 2: Events tagging one of our repos
403/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
404/// Batched per 100 repo refs.
405fn tagged_one_of_our_repo_event_filters(
406 repos: &HashSet<String>,
407 since: Option<Timestamp>,
408) -> Vec<Filter> {
409 let mut filters = Vec::new();
410 let repo_refs: Vec<_> = repos.iter().collect();
411
412 for chunk in repo_refs.chunks(100) {
413 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
414
415 // Lowercase 'a' tag - standard addressable reference
416 let mut f1 = Filter::new()
417 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
418 // Uppercase 'A' tag - some clients use this
419 let mut f2 = Filter::new()
420 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
421 // Quote 'q' tag - NIP-10 quote references to addressable events
422 let mut f3 = Filter::new()
423 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
424
425 if let Some(ts) = since {
426 f1 = f1.since(ts);
427 f2 = f2.since(ts);
428 f3 = f3.since(ts);
429 }
265 430
266For events that reference PRs, Patches, or Issues from repos we track: 431 filters.push(f1);
432 filters.push(f2);
433 filters.push(f3);
434 }
267 435
268```rust 436 filters
269// Collect event IDs of PRs, Patches, Issues we've stored 437}
270let tagged_event_ids: Vec<EventId> = database
271 .query(Filter::new().kinds([1618, 1619, 1621, 1622, 1630])) // PR, PR Update, Issue, Patch, etc.
272 .iter()
273 .filter(|e| references_tracked_repo(e, &announcements))
274 .map(|e| e.id)
275 .collect();
276
277let layer3_filter = Filter::new()
278 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), tagged_event_ids.clone())
279 .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), tagged_event_ids));
280```
281 438
282### Filter Size Management 439/// Layer 3: Events tagging one of our root events
440/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
441/// Batched per 100 event IDs.
442fn tagged_one_of_our_root_event_filters(
443 root_events: &HashSet<EventId>,
444 since: Option<Timestamp>,
445) -> Vec<Filter> {
446 let mut filters = Vec::new();
447 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
448
449 for chunk in event_ids.chunks(100) {
450 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
451
452 // Lowercase 'e' tag - standard event reference
453 let mut f1 = Filter::new()
454 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
455 // Uppercase 'E' tag - some clients use this
456 let mut f2 = Filter::new()
457 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
458 // Quote 'q' tag - NIP-10 quote references to events
459 let mut f3 = Filter::new()
460 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
461
462 if let Some(ts) = since {
463 f1 = f1.since(ts);
464 f2 = f2.since(ts);
465 f3 = f3.since(ts);
466 }
283 467
284When the tag list exceeds a threshold, split into batches: 468 filters.push(f1);
469 filters.push(f2);
470 filters.push(f3);
471 }
285 472
286```rust 473 filters
287const MAX_TAGS_PER_FILTER: usize = 100; 474}
288 475
289fn build_filters(tag_values: Vec<String>) -> Vec<Filter> { 476/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
290 tag_values 477/// Used by:
291 .chunks(MAX_TAGS_PER_FILTER) 478/// - compute_actions for incremental subscriptions
292 .map(|chunk| Filter::new().custom_tag(tag, chunk.to_vec())) 479/// - consolidation rebuilds (Layer 1 remains active)
293 .collect() 480fn build_layer2_and_layer3_filters(
481 repos: &HashSet<String>,
482 root_events: &HashSet<EventId>,
483 since: Option<Timestamp>,
484) -> Vec<Filter> {
485 let mut filters = Vec::new();
486 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
487 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
488 filters
294} 489}
295``` 490```
296 491
297**Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch. 492**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
298 493
299### Filter Generation vs. Policy Validation 494### 4. handle_add_filters (SyncManager)
300 495
301The filter strategy and acceptance policies serve **different purposes** even though they share conceptual knowledge: 496```rust
497impl SyncManager {
498 async fn handle_add_filters(&mut self, action: AddFilters) {
499 let AddFilters { relay_url, repos, root_events, filters } = action;
500
501 // Auto-spawn connection if needed
502 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
503 match state {
504 None => {
505 // New relay discovered - create entry and spawn connection
506 self.relay_sync_index.write().await.insert(
507 relay_url.clone(),
508 RelayState {
509 repos: HashSet::new(),
510 root_events: HashSet::new(),
511 is_bootstrap: false,
512 connection_status: ConnectionStatus::Connecting,
513 last_connected: None,
514 disconnected_at: None,
515 connection: None,
516 }
517 );
518 self.spawn_connection(&relay_url).await;
519 return; // Subscriptions will happen on connection success
520 }
521 Some(state) if state.connection_status != ConnectionStatus::Connected => {
522 // Not connected - subscriptions will happen on connection success
523 return;
524 }
525 Some(_) => {
526 // Already connected - proceed with subscription
527 }
528 }
302 529
303| Concern | Filters | Policies | 530 // Subscribe and collect subscription IDs
304|---------|---------|----------| 531 let conn = self.connections.get(&relay_url).unwrap();
305| **Direction** | What to request FROM remote relays | What to accept INTO local database | 532 let mut sub_ids = HashSet::new();
306| **Input** | Stored events (announcements, PRs, etc.) | Single incoming event | 533
307| **Output** | Filter specification | Accept/Reject decision | 534 for filter in filters {
535 match conn.client.subscribe(filter, None).await {
536 Ok(output) => {
537 for sub_id in output.val {
538 sub_ids.insert(sub_id);
539 }
540 }
541 Err(e) => {
542 tracing::warn!("Failed to subscribe: {}", e);
543 }
544 }
545 }
308 546
309The modular sub-policies ([`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24), [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25), etc.) encode knowledge about event kinds and tag types, but this knowledge is applied differently: 547 // Create pending batch
548 let batch = PendingBatch {
549 batch_id: self.next_batch_id(),
550 items: PendingItems { repos, root_events },
551 outstanding_subs: sub_ids,
552 };
310 553
311- **In filters**: We enumerate **all** addressable refs (`30617:pubkey:id`) from stored announcements 554 // Add to pending index
312- **In policies**: [`RelatedEventPolicy::check_references()`](../../src/nostr/policy/related.rs:39) checks if incoming event references **any** accepted event 555 self.pending_sync_index.write().await
556 .entry(relay_url)
557 .or_default()
558 .push(batch);
559 }
560}
561```
313 562
314Because of this fundamental difference, filter generation logic stays in `src/sync/filter.rs` rather than being delegated to policy modules. Both share the understanding of NIP-34 event relationships, but they answer different questions. 563### 5. handle_disconnect
315 564
316## Subscription Updates 565```rust
566impl SyncManager {
567 /// Called when connection to a relay is lost
568 async fn handle_disconnect(&mut self, relay_url: &str) {
569 let mut index = self.relay_sync_index.write().await;
570
571 if let Some(state) = index.get_mut(relay_url) {
572 state.connection_status = ConnectionStatus::Disconnected;
573 state.disconnected_at = Some(Timestamp::now());
574 state.connection = None;
575 }
317 576
318### Dynamic Subscription Management 577 // Clear pending batches - these items were not confirmed
578 self.pending_sync_index.write().await.remove(relay_url);
319 579
320When new events arrive that affect our filter criteria: 580 // Remove from active connections map
581 self.connections.remove(relay_url);
321 582
322```mermaid 583 // Health tracker records failure for backoff
323sequenceDiagram 584 self.health_tracker.record_failure(relay_url);
324 participant LocalRelay 585 }
325 participant SyncManager 586}
326 participant RemoteRelay
327
328 LocalRelay->>SyncManager: New PR event accepted
329 SyncManager->>SyncManager: Extract event ID
330 SyncManager->>SyncManager: Build new filter for E/e/q tags
331 SyncManager->>RemoteRelay: REQ with new filter
332 RemoteRelay-->>SyncManager: Events matching new filter
333``` 587```
334 588
335**Events that trigger subscription updates**: 589### 6. handle_connect_or_reconnect (Unified)
336 590
337- New repository announcement accepted (adds to Layer 2) 591This method handles BOTH initial connection AND reconnection with unified logic:
338- New PR/Issue/Patch accepted (adds to Layer 3)
339 592
340### When to Consolidate 593```rust
594impl SyncManager {
595 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
596 ///
597 /// Decision tree:
598 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
599 /// - Quick reconnect (<15min): since = last_connected - 15min
600 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
601 let mut index = self.relay_sync_index.write().await;
602 let state = match index.get_mut(relay_url) {
603 Some(s) => s,
604 None => return, // Relay was removed while disconnected
605 };
341 606
342Track subscription count per connection: 607 // Determine if this is a fresh sync or quick reconnect
608 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
609 let last_connected = state.last_connected;
343 610
344```rust 611 if is_fresh_sync && state.last_connected.is_some() {
345struct ConnectionState { 612 // Stale reconnect (>15min) - clear state
346 relay_url: RelayUrl, 613 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
347 subscriptions: Vec<SubscriptionId>, 614 state.clear_sync_state();
348 total_filter_count: usize, 615 }
349}
350 616
351impl ConnectionState { 617 // Update connection state
352 fn should_consolidate(&self) -> bool { 618 state.connection_status = ConnectionStatus::Connected;
353 self.total_filter_count > 150 619 state.last_connected = Some(Timestamp::now());
354 } 620 state.disconnected_at = None;
355 621
356 async fn consolidate(&mut self) { 622 // Record success in health tracker
357 // Close all subscriptions 623 self.health_tracker.record_success(relay_url);
358 // Rebuild from scratch with current database state
359 }
360}
361```
362 624
363## Negentropy Catchup 625 drop(index); // Release lock
364 626
365### NIP-77 Reconciliation Protocol 627 let conn = match self.connections.get(relay_url) {
628 Some(c) => c,
629 None => return,
630 };
366 631
367Negentropy enables efficient set reconciliation - discovering which events we're missing without transferring full event lists. 632 if is_fresh_sync {
633 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
368 634
369### Timing 635 // Step 1: Subscribe Layer 1 (announcements) without since
636 let layer1 = build_announcement_filter(None);
637 let _ = conn.client.subscribe(layer1, None).await;
370 638
371| Trigger | Behavior | 639 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
372| ------------------- | -------------------------------------------------------------------- | 640 self.recompute_actions_for_relay(relay_url).await;
373| **Initial startup** | Warm-up delay, staggered if many filters, initializes daily schedule | 641 } else {
374| **After reconnect** | Delay to avoid rate limiting, limited to events from last 3 days | 642 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
375| **Daily** | Staggered batches, max 100 tagged events per filter | 643 let since = last_connected
644 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
645 .unwrap_or(Timestamp::from(0));
376 646
377### Startup Flow 647 // Step 1: Subscribe Layer 1 (announcements) with since
648 let layer1 = build_announcement_filter(Some(since));
649 let _ = conn.client.subscribe(layer1, None).await;
378 650
379```mermaid 651 // Step 2: Rebuild Layer 2+3 for confirmed items with since
380sequenceDiagram 652 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
381 participant Server
382 participant SyncManager
383 participant Relay
384
385 Server->>SyncManager: Start
386 SyncManager->>SyncManager: Wait warm-up delay
387 SyncManager->>SyncManager: Build batched filters
388
389 loop For each relay with stagger delay
390 SyncManager->>Relay: NEG-OPEN with filter batch 1
391 Relay-->>SyncManager: NEG-MSG with differences
392 SyncManager->>Relay: NEG-MSG response
393 Note over SyncManager,Relay: Reconciliation rounds
394 Relay-->>SyncManager: NEG-CLOSE or events
395 SyncManager->>SyncManager: Validate + store events
396
397 alt More batches
398 SyncManager->>SyncManager: Wait stagger delay
399 SyncManager->>Relay: NEG-OPEN with next batch
400 end
401 end
402 653
403 SyncManager->>SyncManager: Schedule daily catchup 654 // Step 3: Check for NEW items via compute_actions
404``` 655 self.recompute_actions_for_relay(relay_url).await;
656 }
657 }
405 658
406### Reconnection Catchup 659 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
660 /// Used by:
661 /// - Quick reconnect: rebuild confirmed items with since filter
662 /// - Consolidation: close and rebuild with since filter
663 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
664 let confirmed = self.relay_sync_index.read().await;
665 let state = match confirmed.get(relay_url) {
666 Some(s) => s,
667 None => return,
668 };
407 669
408After connection reestablished: 670 // Build Layer 2+3 filters WITH since
671 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
672 drop(confirmed);
409 673
410```rust 674 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
411async fn catchup_after_reconnect(&self, relay: &RelayUrl) { 675 let conn = match self.connections.get(relay_url) {
412 // Delay to avoid immediate disconnect for too many requests 676 Some(c) => c,
413 tokio::time::sleep(RECONNECT_CATCHUP_DELAY).await; 677 None => return,
678 };
679
680 for filter in filters {
681 let _ = conn.client.subscribe(filter, None).await;
682 }
683 }
684
685 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
686 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
687 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
688 let repo_index = self.repo_sync_index.read().await;
689 let targets = derive_relay_targets(&repo_index);
690 drop(repo_index);
691
692 // Filter to just this relay
693 let target = match targets.get(relay_url) {
694 Some(t) => t.clone(),
695 None => return, // No repos reference this relay anymore
696 };
414 697
415 // Only catch up on recent events (last 3 days) 698 let pending = self.pending_sync_index.read().await;
416 let since = Timestamp::now() - Duration::from_secs(3 * 24 * 60 * 60); 699 let confirmed = self.relay_sync_index.read().await;
417 700
418 let filters = self.build_filters_for_relay(relay) 701 let mut single_relay_targets = HashMap::new();
419 .into_iter() 702 single_relay_targets.insert(relay_url.to_string(), target);
420 .map(|f| f.since(since))
421 .collect();
422 703
423 self.run_negentropy(relay, filters).await; 704 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
705
706 drop(pending);
707 drop(confirmed);
708
709 // Process AddFilters
710 for action in actions {
711 self.handle_add_filters(action).await;
712 }
713 }
424} 714}
425``` 715```
426 716
427### Daily Catchup Schedule 717### 7. Daily Timer
428 718
429```rust 719```rust
430// Daily catchup runs at consistent time, staggered across relays 720impl SyncManager {
431async fn schedule_daily_catchup(&self) { 721 async fn run_daily_timer(&self) {
432 let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); 722 loop {
723 // Random 23-25 hours
724 let hours = 23.0 + rand::random::<f64>() * 2.0;
725 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
726
727 let relay_urls: Vec<_> = self.relay_sync_index.read().await
728 .keys()
729 .cloned()
730 .collect();
731
732 for relay_url in relay_urls {
733 self.daily_sync(&relay_url).await;
734 }
735 }
736 }
433 737
434 loop { 738 /// Perform daily fresh sync for a relay
435 interval.tick().await; 739 async fn daily_sync(&mut self, relay_url: &str) {
740 tracing::info!("Daily sync triggered for {}", relay_url);
436 741
437 for (i, relay) in self.healthy_relays().enumerate() { 742 // Close all subscriptions
438 // Stagger: 5 minute delay between relays 743 if let Some(conn) = self.connections.get(relay_url) {
439 tokio::time::sleep(Duration::from_secs(i as u64 * 300)).await; 744 conn.client.unsubscribe_all().await;
745 }
440 746
441 // Batch filters to max 100 tagged events each 747 // Clear PendingSyncIndex
442 let batches = self.build_batched_filters(&relay, 100); 748 self.pending_sync_index.write().await.remove(relay_url);
443 749
444 for batch in batches { 750 // Clear confirmed state - triggers fresh sync
445 self.run_negentropy(&relay, batch).await; 751 {
446 tokio::time::sleep(Duration::from_secs(60)).await; // 1 min between batches 752 let mut index = self.relay_sync_index.write().await;
753 if let Some(state) = index.get_mut(relay_url) {
754 state.clear_sync_state();
447 } 755 }
448 } 756 }
757
758 // Recompute actions - will generate AddFilters for everything
759 self.recompute_actions_for_relay(relay_url).await;
449 } 760 }
450} 761}
451``` 762```
452 763
453## Event Processing 764### 8. Consolidation (Threshold-Based, Triggered on Add)
454 765
455### Acceptance Policy 766Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active.
456 767
457All synced events go through our acceptance policy, reusing the same [`Nip34WritePolicy`](../../src/nostr/builder.rs:36) validation logic used for direct client submissions. 768```rust
769impl SyncManager {
770 /// Check filter count and consolidate if needed.
771 /// Called from handle_add_filters BEFORE adding new filters.
772 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
773 let current_count = self.get_filter_count(relay_url).await;
458 774
459#### Design: Reusing admit_event() 775 if current_count + new_filter_count > 70 {
776 self.consolidate(relay_url).await;
777 }
778 }
460 779
461The [`WritePolicy::admit_event()`](../../src/nostr/builder.rs:256-269) trait method takes a `SocketAddr` parameter designed for client connections: 780 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
781 /// Does NOT clear state - just reduces filter count.
782 async fn consolidate(&mut self, relay_url: &str) {
783 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
462 784
463```rust 785 // Wait for all pending batches to complete first
464// From nostr-relay-builder WritePolicy trait 786 self.wait_pending_complete(relay_url).await;
465fn admit_event<'a>( 787
466 &'a self, 788 // Close Layer 2+3 subscriptions only - Layer 1 remains active
467 event: &'a Event, 789 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
468 _addr: &'a SocketAddr, // Unused in our implementation 790 // For simplicity, we close all and re-add Layer 1
469) -> BoxedFuture<'a, PolicyResult>; 791 if let Some(conn) = self.connections.get(relay_url) {
792 conn.client.unsubscribe_all().await;
793 }
794
795 // Re-subscribe Layer 1 with since (maintains announcements stream)
796 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
797 let conn = self.connections.get(relay_url).unwrap();
798 let layer1 = build_announcement_filter(Some(since));
799 let _ = conn.client.subscribe(layer1, None).await;
800
801 // Rebuild Layer 2+3 only
802 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
803 }
804}
470``` 805```
471 806
472For synced events from remote relays, we pass a **synthetic localhost address** since: 807**Updated handle_add_filters to check consolidation:**
4731. The `_addr` parameter is currently unused in our [`Nip34WritePolicy`](../../src/nostr/builder.rs:259)
4742. All meaningful validation is done by the modular sub-policies (see below)
4753. This allows reusing 100% of the existing validation logic
476 808
477```rust 809```rust
478use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 810impl SyncManager {
479 811 async fn handle_add_filters(&mut self, action: AddFilters) {
480/// Synthetic address for synced events (not from a direct client connection) 812 let AddFilters { relay_url, repos, root_events, filters } = action;
481const SYNC_SOURCE_ADDR: SocketAddr = SocketAddr::new( 813
482 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 814 // Auto-spawn connection if needed (unchanged)
483 0 815 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
484); 816 match state {
485 817 None => {
486async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { 818 // New relay discovered - create entry and spawn connection
487 // Apply our Nip34WritePolicy using synthetic address 819 self.relay_sync_index.write().await.insert(
488 // The SocketAddr is unused - all validation is by the modular sub-policies 820 relay_url.clone(),
489 let result = self.acceptance_policy 821 RelayState {
490 .admit_event(&event, &SYNC_SOURCE_ADDR) 822 repos: HashSet::new(),
491 .await; 823 root_events: HashSet::new(),
492 824 is_bootstrap: false,
493 match result { 825 connection_status: ConnectionStatus::Connecting,
494 PolicyResult::Accept => { 826 last_connected: None,
495 self.database.save_event(&event).await?; 827 disconnected_at: None,
496 tracing::debug!( 828 connection: None,
497 "Accepted synced event {} from {}", 829 }
498 event.id.to_hex(), 830 );
499 source_relay 831 self.spawn_connection(&relay_url).await;
500 ); 832 return; // Subscriptions will happen on connection success
501 self.trigger_subscription_updates(&event).await; 833 }
834 Some(state) if state.connection_status != ConnectionStatus::Connected => {
835 return; // Not connected - subscriptions will happen on connection success
836 }
837 Some(_) => {
838 // Already connected - proceed
839 }
502 } 840 }
503 PolicyResult::Reject(reason) => { 841
504 tracing::debug!( 842 // CHECK CONSOLIDATION BEFORE ADDING
505 "Rejected synced event {} from {}: {}", 843 self.maybe_consolidate(&relay_url, filters.len()).await;
506 event.id.to_hex(), 844
507 source_relay, 845 // Subscribe and collect subscription IDs
508 reason 846 let conn = self.connections.get(&relay_url).unwrap();
509 ); 847 let mut sub_ids = HashSet::new();
848
849 for filter in filters {
850 match conn.client.subscribe(filter, None).await {
851 Ok(output) => {
852 for sub_id in output.val {
853 sub_ids.insert(sub_id);
854 }
855 }
856 Err(e) => {
857 tracing::warn!("Failed to subscribe: {}", e);
858 }
859 }
510 } 860 }
511 }
512 861
513 Ok(()) 862 // Create pending batch (unchanged)
863 let batch = PendingBatch {
864 batch_id: self.next_batch_id(),
865 items: PendingItems { repos, root_events },
866 outstanding_subs: sub_ids,
867 };
868
869 self.pending_sync_index.write().await
870 .entry(relay_url)
871 .or_default()
872 .push(batch);
873 }
514} 874}
515``` 875```
516 876
517#### Modular Sub-Policies 877---
518
519The [`Nip34WritePolicy`](../../src/nostr/builder.rs:36-42) delegates to specialized sub-policies in [`src/nostr/policy/`](../../src/nostr/policy/mod.rs:1-41):
520 878
521| Sub-Policy | Kinds | Responsibility | 879## Disconnect (Relay Removal) Handling
522|------------|-------|----------------|
523| [`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24-27) | 30617 | Validates service listing, maintainer exception, creates bare repos |
524| [`StatePolicy`](../../src/nostr/policy/state.rs:43-46) | 30618 | Validates state structure, aligns git refs with authorized state |
525| [`PrEventPolicy`](../../src/nostr/policy/pr_event.rs) | 1618, 1619 | Validates PR/PR Update events, manages refs/nostr/* |
526| [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25-29) | All others | Checks forward/backward references to accepted repos/events |
527 880
528All sub-policies share a common [`PolicyContext`](../../src/nostr/policy/mod.rs:22-27) containing: 881```rust
529- `domain`: Our service domain for validation 882impl SyncManager {
530- `database`: For querying existing events 883 /// Periodically check for relays that should be disconnected
531- `git_data_path`: For git operations 884 async fn check_disconnects(&mut self) {
885 let confirmed = self.relay_sync_index.read().await;
886 let relays_to_disconnect: Vec<_> = confirmed.iter()
887 .filter(|(_, state)| {
888 !state.is_bootstrap &&
889 state.repos.is_empty() &&
890 state.root_events.is_empty()
891 })
892 .map(|(url, _)| url.clone())
893 .collect();
894 drop(confirmed);
895
896 for relay_url in relays_to_disconnect {
897 self.disconnect_relay(&relay_url).await;
898 }
899 }
532 900
533#### Why Not Call Sub-Policies Directly? 901 async fn disconnect_relay(&mut self, relay_url: &str) {
902 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
534 903
535While we could bypass `admit_event()` and call sub-policies directly: 904 self.relay_sync_index.write().await.remove(relay_url);
905 self.pending_sync_index.write().await.remove(relay_url);
536 906
537```rust 907 if let Some(conn) = self.connections.remove(relay_url) {
538// Alternative: Direct sub-policy calls (NOT recommended) 908 let _ = conn.client.disconnect().await;
539match event.kind.as_u16() { 909 }
540 30617 => self.announcement_policy.validate(&event).await, 910 }
541 30618 => self.state_policy.validate(&event),
542 1618 | 1619 => self.pr_event_policy.validate_nostr_ref(&event).await,
543 _ => self.related_event_policy.check_references(&event).await,
544} 911}
545``` 912```
546 913
547This is **not recommended** because: 914---
5481. Duplicates the kind-routing logic from [`admit_event()`](../../src/nostr/builder.rs:261-268)
5492. Misses important post-validation steps (e.g., `handle_announcement()` also calls `ensure_bare_repository()`)
5503. Creates maintenance burden when policy logic changes
551 915
552## Module Structure 916## State Flow Summary
553 917
554### New `src/sync/` Module 918```mermaid
919flowchart TB
920 subgraph Input
921 SS[SelfSubscriber]
922 OWN[Own Relay]
923 end
555 924
556``` 925 subgraph RepoSyncIndex - What We Want
557src/ 926 RSI[HashMap: Repo to Relays+Events]
558├── sync/ 927 end
559│ ├── mod.rs # Module exports
560│ ├── manager.rs # SyncManager - main coordinator
561│ ├── connection.rs # Per-relay connection handling
562│ ├── filter.rs # Filter building and batching
563│ ├── health.rs # RelayHealth tracking
564│ ├── negentropy.rs # NIP-77 reconciliation logic
565│ └── subscription.rs # Dynamic subscription management
566├── nostr/
567│ └── ... (existing)
568└── ...
569```
570 928
571### Integration with Main Binary 929 subgraph Derived Target
930 DT[derive_relay_targets fn]
931 TGT[Per-relay: repos + events we should sync]
932 end
572 933
573```rust 934 subgraph compute_actions - Decision Point
574// In main.rs 935 CA[Three-way diff: target - pending - confirmed]
575async fn main() -> Result<()> { 936 end
576 // ... existing setup ...
577 937
578 // Start sync manager as background task 938 subgraph PendingSyncIndex - In Flight
579 let sync_manager = SyncManager::new( 939 PSI[Vec PendingBatch per relay]
580 database.clone(), 940 end
581 config.domain.clone(),
582 );
583 941
584 tokio::spawn(async move { 942 subgraph RelaySyncIndex - Confirmed State
585 sync_manager.run().await 943 RLI[RelayState per relay]
586 }); 944 CONN[connection_status]
945 REPOS[repos + root_events]
946 TIMES[last_connected + disconnected_at]
947 end
587 948
588 // ... rest of server startup ... 949 SS -->|subscribe| OWN
589} 950 OWN -->|events| SS
951 SS -->|batch fires| RSI
952 RSI --> DT
953 DT --> TGT
954 TGT --> CA
955 PSI --> CA
956 RLI --> CA
957 CA -->|Layer 2+3 new items| AF[AddFilters]
958 AF -->|check filter count| CONSOL{count + new > 70?}
959 CONSOL -->|yes| CONSOLIDATE[consolidate]
960 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since]
961 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since]
962 CONSOL -->|no| SUB[subscribe]
963 AF -->|spawn if needed| CONN
964 SUB --> PSI
965 PSI -->|EOSE| REPOS
966
967 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
968 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
969
970 subgraph handle_connect_or_reconnect
971 HC --> FRESH_CHECK{is_fresh_sync?}
972 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
973 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
974 L1_FRESH --> RCA1[recompute_actions_for_relay]
975 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
976 L23_QUICK --> RCA2[recompute_actions_for_relay]
977 end
590``` 978```
591 979
592## Metrics & Observability 980---
981
982## Key Design Decisions
983
984| Decision | Choice | Rationale |
985| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- |
986| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
987| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
988| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation |
989| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
990| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
991| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
992| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
993| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
994| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
995| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
996| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
997| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
998| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
999| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
1000| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
1001| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
1002| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
593 1003
594All sync metrics are exposed via Prometheus at `/metrics`. For <100 relays, per-relay labels are acceptable cardinality. 1004---
595 1005
596### Prometheus Metrics 1006## Module Structure
597 1007
598```rust 1008```
599/// Sync module metrics registered with the global Prometheus registry 1009src/sync/
600pub struct SyncMetrics { 1010├── mod.rs # SyncManager, main loop
601 // === Connection Metrics (per relay) === 1011├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
602 /// Active outbound connections: ngit_sync_relay_connected{relay="wss://..."} 1012├── actions.rs # AddFilters struct, compute_actions, build_filters
603 relay_connected: IntGaugeVec, // labels: [relay] 1013├── self_subscriber.rs # SelfSubscriber, batching logic
1014├── relay_connection.rs # Per-relay WebSocket connection
1015├── consolidation.rs # Consolidation logic, daily timer
1016├── health.rs # Health tracking (reuse from v2)
1017└── metrics.rs # Prometheus metrics (reuse from v2)
1018```
604 1019
605 /// Connection attempts: ngit_sync_connection_attempts_total{relay="wss://...", result="success|failure"} 1020---
606 connection_attempts: CounterVec, // labels: [relay, result]
607 1021
608 // === Relay Health Status === 1022## Comparison: v3 vs v4
609 /// Current status: ngit_sync_relay_status{relay="wss://...", status="healthy|backoff|dead"}
610 relay_status: IntGaugeVec, // labels: [relay, status]
611 1023
612 /// Consecutive failures: ngit_sync_relay_failures{relay="wss://..."} 1024| Aspect | v3 | v4 |
613 relay_failures: IntGaugeVec, // labels: [relay] 1025| ------------------------ | ----------------------------------------- | --------------------------------------------- |
1026| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1027| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1028| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1029| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method |
1030| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1031| Since filter application | Applied in handle_reconnect | build_all_filters with optional since |
1032| PSI clearing | On disconnect | On disconnect (confirmed) |
1033| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
614 1034
615 // === Event Source Tracking === 1035---
616 /// Events received by source: ngit_sync_events_total{source="direct|live_sync|catchup|daily_catchup"}
617 events_total: CounterVec, // labels: [source]
618 1036
619 /// Sync gap events (should have been live synced): ngit_sync_gap_events_total{relay="wss://..."} 1037## Self-Subscriber Flow
620 sync_gap_events: CounterVec, // labels: [relay]
621 1038
622 // === Aggregate Metrics === 1039The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions.
623 /// Total relays being tracked
624 relays_tracked_total: IntGauge,
625 1040
626 /// Relays currently connected 1041### State Tracking
627 relays_connected_total: IntGauge,
628 1042
629 /// Relays in dead state 1043```rust
630 relays_dead_total: IntGauge, 1044pub struct SelfSubscriber {
1045 own_relay_url: String,
1046 relay_domain: String,
1047 repo_sync_index: RepoSyncIndex,
1048 pending_sync_index: PendingSyncIndex,
1049 relay_sync_index: RelaySyncIndex,
1050 action_tx: mpsc::Sender<AddFilters>,
1051 /// Timestamp of last successful connection - used for since filter on reconnection
1052 last_connected: Option<Timestamp>,
1053 /// The active client connection
1054 client: Option<Client>,
631} 1055}
632``` 1056```
633 1057
634### Metric Definitions 1058### On Startup / Reconnect (Unified)
635
636| Metric | Type | Labels | Description |
637| ------------------------------------- | ------- | ------------- | ------------------------------------------------------ |
638| `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not |
639| `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes |
640| `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise |
641| `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count |
642| `ngit_sync_events_total` | Counter | source | Events received by source type |
643| `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup that should have been live |
644| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered from announcements |
645| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count |
646| `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead |
647 1059
648**Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. The `ngit_sync_gap_events_total` metric tracks this per relay. 1060Both initial startup and reconnection use the same `connect_and_subscribe` method:
649
650### Observability Integration
651 1061
652```rust 1062```rust
653impl SyncManager { 1063impl SelfSubscriber {
654 fn record_event_received(&self, event: &Event, source: EventSource) { 1064 async fn run(mut self) {
655 match source { 1065 loop {
656 EventSource::DirectSubmission => { 1066 // Connect or reconnect
657 self.metrics.events_total.with_label_values(&["direct"]).inc(); 1067 if let Err(e) = self.connect_and_subscribe().await {
1068 tracing::warn!("Connection failed: {}, will retry", e);
1069 tokio::time::sleep(Duration::from_secs(5)).await;
1070 continue;
658 } 1071 }
659 EventSource::LiveSync(relay) => {
660 self.metrics.events_total.with_label_values(&["live_sync"]).inc();
661 }
662 EventSource::Catchup(relay) => {
663 // This is a sync gap - we should have gotten it via live sync
664 self.metrics.events_total.with_label_values(&["catchup"]).inc();
665 self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc();
666 tracing::warn!(
667 relay = %relay,
668 event_id = %event.id.to_hex(),
669 "Sync gap detected: event found during catchup"
670 );
671 }
672 EventSource::DailyCatchup(relay) => {
673 // Sustained sync gap - missed by both live sync and initial catchup
674 self.metrics.events_total.with_label_values(&["daily_catchup"]).inc();
675 self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc();
676 tracing::error!(
677 relay = %relay,
678 event_id = %event.id.to_hex(),
679 "Sustained sync gap: event found during daily catchup"
680 );
681 }
682 }
683 }
684 1072
685 fn record_connection_attempt(&self, relay: &RelayUrl, success: bool) { 1073 // Run event loop until disconnection
686 let result = if success { "success" } else { "failure" }; 1074 self.event_loop().await;
687 self.metrics.connection_attempts
688 .with_label_values(&[relay.as_str(), result])
689 .inc();
690 }
691 1075
692 fn update_relay_status(&self, relay: &RelayUrl, status: &RelayStatus) { 1076 // Loop will retry connection
693 // Reset all status labels for this relay
694 for s in ["healthy", "backoff", "dead"] {
695 self.metrics.relay_status
696 .with_label_values(&[relay.as_str(), s])
697 .set(0);
698 } 1077 }
699 // Set current status
700 let status_label = match status {
701 RelayStatus::Healthy => "healthy",
702 RelayStatus::Backoff { .. } => "backoff",
703 RelayStatus::Dead => "dead",
704 };
705 self.metrics.relay_status
706 .with_label_values(&[relay.as_str(), status_label])
707 .set(1);
708 } 1078 }
709}
710```
711
712### Example Grafana Queries
713 1079
714```promql 1080 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
715# Relay health overview - count by status 1081 let client = Client::new(Keys::generate());
716sum by (status) (ngit_sync_relay_status == 1) 1082 client.add_relay(&self.own_relay_url).await?;
717 1083 client.connect().await;
718# Connection success rate over last hour 1084
719sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) 1085 // Build filter - add since only on reconnect
720/ sum(rate(ngit_sync_connection_attempts_total[1h])) 1086 let filter = Filter::new().kinds([
721 1087 Kind::Custom(30617), // Repository announcements
722# Sync gap detection - events that should have been live synced 1088 Kind::GitPatch, // 1617
723sum(rate(ngit_sync_gap_events_total[1h])) by (relay) 1089 Kind::Custom(1618), // PRs
724 1090 Kind::Custom(1619), // PR updates
725# Live sync effectiveness (lower is better - fewer gaps) 1091 Kind::GitIssue, // 1621
726sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) 1092 ]);
727/ sum(rate(ngit_sync_events_total[1h])) 1093
1094 let filter = if let Some(ts) = self.last_connected {
1095 // Reconnection: use since filter
1096 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1097 filter.since(since)
1098 } else {
1099 // Initial connect: no since filter - get full history
1100 filter
1101 };
728 1102
729# Relays with high failure counts (potential issues) 1103 // Update last_connected AFTER computing since
730topk(10, ngit_sync_relay_failures) 1104 self.last_connected = Some(Timestamp::now());
731 1105
732# Alert: relay stuck in dead state 1106 client.subscribe(filter, None).await?;
733ngit_sync_relay_status{status="dead"} == 1 1107 self.client = Some(client);
1108 Ok(())
1109 }
1110}
734``` 1111```
735 1112
736### Log Levels for Sync Events 1113### Event Loop with Batching
737 1114
738| Event | Level | Context | 1115```rust
739| ----------------------- | ----- | ----------------------------- | 1116impl SelfSubscriber {
740| Event via live sync | DEBUG | Normal operation | 1117 async fn event_loop(&mut self) {
741| Event via catchup | WARN | Sync gap detected | 1118 let client = self.client.as_ref().unwrap();
742| Event via daily catchup | ERROR | Sustained gap | 1119 let mut pending_events: Vec<Event> = Vec::new();
743| Connection established | INFO | Relay URL | 1120 let mut batch_timer: Option<Instant> = None;
744| Connection failed | WARN | Relay URL, attempt #, backoff | 1121 let batch_window = Duration::from_secs(5);
745| Relay marked dead | ERROR | Relay URL, failure duration | 1122
746| Peer missing events | WARN | Relay URL, repo, count | 1123 loop {
1124 let timeout = batch_timer
1125 .map(|t| batch_window.saturating_sub(t.elapsed()))
1126 .unwrap_or(Duration::from_secs(60));
1127
1128 tokio::select! {
1129 notification = client.notifications().recv() => {
1130 match notification {
1131 Ok(RelayPoolNotification::Event { event, .. }) => {
1132 pending_events.push(*event);
1133
1134 // Start timer on first event - does NOT reset
1135 if batch_timer.is_none() {
1136 batch_timer = Some(Instant::now());
1137 }
1138 }
1139 Ok(RelayPoolNotification::Shutdown) => {
1140 // Connection lost
1141 break;
1142 }
1143 _ => {}
1144 }
1145 }
1146 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1147 // Batch window elapsed
1148 self.process_batch(pending_events.drain(..).collect()).await;
1149 batch_timer = None;
1150 }
1151 }
1152 }
1153 }
747 1154
748## Configuration 1155 async fn process_batch(&self, events: Vec<Event>) {
1156 // 1. Update RepoSyncIndex
1157 for event in events {
1158 match event.kind.as_u16() {
1159 30617 => self.handle_announcement(&event).await,
1160 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1161 _ => {}
1162 }
1163 }
749 1164
750```rust 1165 // 2. Derive targets and compute actions
751pub struct SyncConfig { 1166 let repo_index = self.repo_sync_index.read().await;
752 /// Warm-up delay before starting initial catchup 1167 let targets = derive_relay_targets(&repo_index);
753 pub startup_delay: Duration, // Default: 30s
754 1168
755 /// Delay between filter batches during catchup 1169 let pending = self.pending_sync_index.read().await;
756 pub batch_delay: Duration, // Default: 60s 1170 let confirmed = self.relay_sync_index.read().await;
757 1171
758 /// Delay after reconnect before catchup 1172 let actions = compute_actions(&targets, &pending, &confirmed);
759 pub reconnect_delay: Duration, // Default: 10s
760 1173
761 /// Maximum events in last N days for reconnect catchup 1174 drop(repo_index);
762 pub reconnect_lookback_days: u32, // Default: 3 1175 drop(pending);
1176 drop(confirmed);
763 1177
764 /// Maximum tagged event IDs per filter 1178 // 3. Send actions to SyncManager
765 pub max_tags_per_filter: usize, // Default: 100 1179 for action in actions {
1180 let _ = self.action_tx.send(action).await;
1181 }
1182 }
766 1183
767 /// Consolidate subscriptions when count exceeds 1184 async fn handle_announcement(&self, event: &Event) {
768 pub max_subscriptions: usize, // Default: 150 1185 // Extract repo_ref from event - 30617:pubkey:identifier
1186 let d_tag = event.tags.iter()
1187 .find_map(|tag| {
1188 if tag.kind() == TagKind::D {
1189 tag.content().map(|s| s.to_string())
1190 } else {
1191 None
1192 }
1193 })
1194 .unwrap_or_default();
1195
1196 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1197
1198 // Extract relay URLs from 'r' tags
1199 let relays: HashSet<String> = event.tags.iter()
1200 .filter_map(|tag| {
1201 if tag.kind() == TagKind::Relay {
1202 tag.content().map(|s| s.to_string())
1203 } else {
1204 None
1205 }
1206 })
1207 .collect();
1208
1209 // Update RepoSyncIndex
1210 let mut index = self.repo_sync_index.write().await;
1211 let needs = index.entry(repo_ref).or_default();
1212 needs.relays = relays;
1213 }
769 1214
770 /// Backoff configuration 1215 async fn handle_root_event(&self, event: &Event) {
771 pub max_backoff: Duration, // Default: 1h 1216 // Extract repo_ref from 'a' tag
772 pub dead_threshold: Duration, // Default: 24h 1217 let repo_ref = event.tags.iter()
773 pub dead_retry_interval: Duration, // Default: 24h 1218 .find_map(|tag| {
1219 if tag.kind() == TagKind::A {
1220 tag.content().map(|s| s.to_string())
1221 } else {
1222 None
1223 }
1224 });
1225
1226 if let Some(repo_ref) = repo_ref {
1227 let mut index = self.repo_sync_index.write().await;
1228 let needs = index.entry(repo_ref).or_default();
1229 needs.root_events.insert(event.id);
1230 }
1231 }
774} 1232}
775``` 1233```
776 1234
777## Summary
778
779| Component | Responsibility |
780| ---------------------- | -------------------------------------------------------------- |
781| **SyncManager** | Orchestrates connections, triggers catchup, processes events |
782| **FilterService** | Builds unified filters from database state |
783| **RelayHealthTracker** | Manages backoff, dead relay detection (in-memory + Prometheus) |
784| **ConnectionState** | Per-relay WebSocket + subscription management |
785| **SyncMetrics** | Prometheus metrics for operator visibility |
786
787### Key Design Decisions
788
7891. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism
7902. **Exclude ourselves** from relay list to prevent loops
7913. **One connection per relay** with combined filters for efficiency
7924. **In-memory health state** with Prometheus metrics for visibility (no database persistence needed for <100 relays)
7935. **Graceful degradation on restart** - conservative initial backoff with jitter avoids thundering herd
7946. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up
7957. **Client-side filtering** for 30617/30618, server-side for Layer 2/3
7968. **Dynamic subscription addition** with periodic consolidation
7979. **Custom acceptance policy** excluding rate limiting defaults
79810. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps, tracked in Prometheus
799
800--- 1235---
801 1236
802## Implementation Notes (Phase 6) 1237## Implementation Notes
803
804This section documents the final implementation as of Phase 6 (Observability & Production Readiness).
805 1238
806### What Was Actually Built 1239This section documents the actual implementation details as of December 2024 (Phases 1-10 complete).
807 1240
808The implementation closely follows the design document with the following completed components: 1241### Architectural Decisions During Implementation
809 1242
810#### Phase 1: Basic Sync (commit b167f1b) 1243**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup:
811- [`SyncManager`](../../src/sync/manager.rs) - Main coordinator for proactive sync
812- Bootstrap relay sync via `NGIT_SYNC_BOOTSTRAP_RELAY_URL` configuration
813- Dynamic relay discovery from repository announcements that list our service
814- Event validation through existing [`Nip34WritePolicy`](../../src/nostr/builder.rs)
815 1244
816#### Phase 2: Three-Layer Filters (commit bf558b0) 1245```rust
817- [`FilterService`](../../src/sync/filter.rs) - Builds three-layer filter strategy 1246// 7. Wrap self in Arc<Mutex> for sharing with timer task
818- Layer 1: All kind 30617+30618 (announcements) 1247let sync_manager = Arc::new(Mutex::new(self));
819- Layer 2: A/a tag filters for repository events 1248```
820- Layer 3: E/e tag filters for related events (PRs, Issues)
821- Multi-relay discovery from stored announcements
822
823#### Phase 3: Health Tracking (commit f639ecf)
824- [`RelayHealthTracker`](../../src/sync/health.rs) - DashMap-based health tracking
825- Three states: Healthy → Degraded → Dead
826- Exponential backoff: 5s → 10s → 20s → ... → max (default 1h)
827- Dead relay detection after 24h continuous failures
828- Startup jitter (0-10s) to prevent thundering herd
829 1249
830#### Phase 4: Dynamic Subscriptions (commit a19ff57) 1250This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber.
831- [`SubscriptionManager`](../../src/sync/subscription.rs) - Per-connection subscription tracking
832- Dynamic Layer 2 subscriptions when new announcements arrive
833- Dynamic Layer 3 subscriptions when new PRs/Issues arrive
834- Filter consolidation at threshold (150 filters)
835 1251
836#### Phase 5: Catchup & Gap Detection (commit 950c2e4) 1252**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses:
837- [`NegentropyService`](../../src/sync/negentropy.rs) - Gap-filling catchup operations
838- Startup catchup (configurable delay)
839- Reconnection catchup (limited lookback)
840- Daily catchup (not yet implemented - placeholder)
841 1253
842#### Phase 6: Observability (this phase) 1254- `DashMap` for thread-safe concurrent access without external locking
843- [`SyncMetrics`](../../src/sync/metrics.rs) - Full Prometheus integration 1255- Three states: `Healthy`, `Degraded`, `Dead`
844- Grafana dashboard panels for sync monitoring 1256- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff
845- Documentation updates 1257- Dead threshold: 24 hours of continuous failures
1258- Dead relay retry: Once per 24 hours
846 1259
847### Differences from Original Design 1260### Implementation Constants
848 1261
8491. **Negentropy (NIP-77)**: Simplified gap-filling was used instead of full NIP-77 negentropy reconciliation, as nostr-sdk 0.44 lacks built-in negentropy support. The current implementation uses timestamp-based catchup queries. 1262| Constant | Value | Purpose |
1263| --------------------------------- | ---------- | ------------------------------------------------ |
1264| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation |
1265| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation |
1266| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync |
1267| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect |
1268| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead |
1269| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff |
850 1270
8512. **Filter Consolidation Threshold**: Set at 150 filters (as designed) based on typical relay filter limits. 1271### Daily Timer Randomization
852 1272
8533. **Health Tracking**: Implemented exactly as designed - in-memory only (not persisted to database), which is acceptable for production as health state rebuilds quickly on restart. 1273The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running:
854 1274
8554. **Metric Label Strategy**: Used simpler numeric encoding for health status (1=healthy, 2=degraded, 3=dead) instead of multiple label values per relay, reducing cardinality. 1275```rust
1276let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0;
1277```
856 1278
8575. **Event Source Tracking**: Implemented four source types (`live`, `startup`, `reconnect`, `daily`) instead of the original (`direct`, `live_sync`, `catchup`, `daily_catchup`). 1279### Bootstrap Relay Protection
858 1280
859### Three-Layer Filter Strategy (As Implemented) 1281Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out:
860 1282
1283```rust
1284.filter(|(_, state)| {
1285 !state.is_bootstrap &&
1286 state.repos.is_empty() &&
1287 state.root_events.is_empty()
1288})
861``` 1289```
862Layer 1: Discovery Layer
863├── Query: kinds [30617, 30618] (announcements)
864├── Applied: At startup and during sync
865└── Purpose: Discover all repositories across network
866
867Layer 2: Repository Events
868├── Query: Events with A/a tags pointing to tracked repos
869├── Format: A tag = "30617:<pubkey>:<identifier>"
870├── Triggered: When new announcement is accepted
871└── Purpose: Get PRs, issues, patches for repositories
872
873Layer 3: Related Events
874├── Query: Events with E/e tags pointing to tracked PRs/Issues
875├── Triggered: When new PR/Issue is accepted
876└── Purpose: Get comments, reviews, status updates
877```
878
879### Prometheus Metrics (As Implemented)
880 1290
881| Metric | Type | Labels | Description | 1291### Graceful Shutdown
882|--------|------|--------|-------------|
883| `ngit_sync_relay_connected` | Gauge | relay | Connection status (1/0) |
884| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome |
885| `ngit_sync_relay_status` | Gauge | relay | Health state (1/2/3) |
886| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures |
887| `ngit_sync_events_total` | Counter | source | Events by source type |
888| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled |
889| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered |
890| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected |
891| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count |
892 1292
893### Configuration Options (As Implemented) 1293Shutdown uses a tokio broadcast channel for coordinated termination:
894 1294
895All configuration via environment variables or CLI flags: 1295```rust
1296let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
1297```
896 1298
897| Option | Type | Default | Description | 1299Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop.
898|--------|------|---------|-------------|
899| `NGIT_SYNC_BOOTSTRAP_RELAY_URL` | String | None | Bootstrap relay URL for initial sync |
900| `NGIT_SYNC_MAX_BACKOFF_SECS` | u64 | 3600 | Max backoff delay (seconds) |
901| `NGIT_SYNC_STARTUP_DELAY_SECS` | u64 | 30 | Catchup delay after startup |
902| `NGIT_SYNC_RECONNECT_DELAY_SECS` | u64 | 10 | Catchup delay after reconnect |
903| `NGIT_SYNC_RECONNECT_LOOKBACK_DAYS` | u64 | 3 | Days to look back on reconnect |
904 1300
905**Note:** Additional relays are automatically discovered from repository announcements (kind 30617) that list our service domain. The bootstrap relay provides an initial sync source but is not required - sync will discover relays from stored announcements. 1301### Actual Module Structure
906 1302
907### Module Structure (As Implemented) 1303The implemented module structure differs from the original spec:
908 1304
909``` 1305```
910src/sync/ 1306src/sync/
911├── mod.rs # Module exports, constants 1307├── mod.rs # SyncManager, main loop, index types, metrics
912├── manager.rs # SyncManager - orchestrates sync 1308├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters
913├── connection.rs # SyncConnection - per-relay WebSocket 1309├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters
914├── filter.rs # FilterService - three-layer filters 1310├── health.rs # RelayHealthTracker, HealthState, exponential backoff
915├── health.rs # RelayHealthTracker - health states 1311├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling
916├── metrics.rs # SyncMetrics - Prometheus integration 1312└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic
917├── negentropy.rs # NegentropyService - gap-filling
918└── subscription.rs # SubscriptionManager - dynamic subs
919``` 1313```
920 1314
921### Production Readiness Checklist 1315Key differences from spec:
1316
1317- No separate `state.rs` - types are defined in `mod.rs`
1318- No separate `actions.rs` - moved to `algorithms.rs`
1319- No separate `consolidation.rs` - consolidation logic in `mod.rs`
1320- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs`
1321
1322### Deviations from Original v4 Spec
1323
13241. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`.
1325
13262. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct.
1327
13283. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches.
922 1329
923- [x] All metrics exposed at `/metrics` endpoint 13304. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API.
924- [x] Health state tracking with configurable backoff
925- [x] Dead relay detection and minimal retry
926- [x] Startup jitter to prevent thundering herd
927- [x] Grafana dashboard with sync panels
928- [x] Configuration documented
929- [x] Integration tests passing