upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync-v2.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v2.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v2.md785
1 files changed, 0 insertions, 785 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md
deleted file mode 100644
index 311e93c..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v2.md
+++ /dev/null
@@ -1,785 +0,0 @@
1# GRASP-02: Proactive Sync v2 - Simplified Design
2
3## Overview
4
5This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription.
6
7## Goals (Same as v1)
8
91. **Data Availability**: Ensure we have all relevant events for repositories we host
102. **Resilience**: Handle relay failures gracefully with backoff and health tracking
113. **Efficiency**: Minimize connections and bandwidth through filter consolidation
124. **Consistency**: Use unified filters for both live sync and catchup
13
14## Scale Targets & Upper Bounds
15
16This design targets the following scale:
17
18| Metric | Target | Notes |
19| ----------------------------- | -------- | ----------------------------------------- |
20| **Repositories** | 1,000 | Repos we host/track |
21| **Root events per repo** | 50 (avg) | PRs, Issues, Patches per repo |
22| **Total relays in ecosystem** | 100 | Unique relays across all repos |
23| **Relays per repo** | 5 (avg) | Relays listed in each repo's announcement |
24| **Total root events** | ~50,000 | 1,000 repos × 50 events |
25| **Sync connections** | ~50-100 | Based on relay overlap |
26
27**Memory Estimate (in-memory HashMaps):**
28
29- `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB
30- `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB
31- **Total in-memory state**: ~10 MB (well within acceptable limits)
32
33**Upper Bounds (redesign triggers):**
34
35- **10,000+ repos**: Consider database-backed state instead of in-memory HashMaps
36- **500+ sync relays**: Consider connection pooling or relay prioritization
37- **500+ root events per repo**: Consider per-repo pagination in Layer 3 filters
38- **Sustained >100 events/second**: Consider write batching to database
39
40Beyond these limits, the in-memory HashMap model may need to evolve to a database-backed approach with lazy loading.
41
42## Core Data Structures
43
44The entire sync filter state is captured in two HashMaps, initialized from database queries at startup:
45
46```rust
47/// Repository root events we're following
48/// Key: repo addressable reference (e.g., "30617:pubkey:identifier")
49/// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo
50///
51/// Note: May include a few extra repo refs that aren't in sync_relays.
52/// This is acceptable - we won't query other relays for them.
53type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
54
55/// Relays we sync from, including their repos and events
56/// Key: relay URL
57/// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service
58///
59/// Note: Bootstrap relay (if configured) is always present and excluded from removal logic.
60type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
61```
62
63## Architecture Overview
64
65```mermaid
66flowchart TB
67 subgraph Startup
68 DB[(Database)]
69 Q1[Query kinds 1617/1618/1619/1621]
70 Q2[Query kind 30617]
71 DB --> Q1 --> FRRE[following_repo_root_events]
72 DB --> Q2 --> SR[sync_relays]
73 BR[Bootstrap Relay] --> SR
74 end
75
76 subgraph SyncManager
77 SS[Self-Subscriber]
78 FRRE --> SS
79 SR --> SS
80 end
81
82 subgraph SyncRelays
83 R1[Relay Connection 1]
84 R2[Relay Connection 2]
85 RN[Relay Connection N]
86 end
87
88 SS -->|spawn/update| R1
89 SS -->|spawn/update| R2
90 SS -->|spawn/update| RN
91
92 R1 -->|events| AP[Acceptance Policy]
93 R2 -->|events| AP
94 RN -->|events| AP
95 AP -->|store| DB
96```
97
98## Module Structure
99
100The sync module is organized following the pattern used by `src/http/mod.rs` and `src/metrics/mod.rs` where the primary struct lives in `mod.rs`:
101
102```
103src/sync/
104├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays)
105├── self_subscriber.rs # SelfSubscriber struct and batching logic
106├── relay_connection.rs # Per-relay WebSocket connection management
107├── health.rs # RelayHealthTracker for backoff and dead relay detection
108└── metrics.rs # SyncMetrics for Prometheus integration
109```
110
111**Rationale:** The state type aliases (`FollowingRepoRootEvents`, `SyncRelays`) are simple `Arc<RwLock<HashMap<...>>>` wrappers owned by SyncManager. Rather than creating a separate `state.rs` for two type aliases, they are colocated with SyncManager in `mod.rs` to reduce file count while maintaining clarity.
112
113## Design Decision: No Jitter
114
115We considered adding jitter to prevent thundering herd scenarios when:
116
117- Multiple relay connections initialize simultaneously
118- Batched updates affect multiple relays
119- Filter consolidation triggers across connections
120
121**Decision: No jitter implemented.**
122
123**Rationale:**
124
125- Our GRASP server should handle the load of simultaneous operations
126- Jitter would lead to more orphan filters (filters added one at a time rather than atomically)
127- Jitter creates inefficiency - partial subscriptions miss events during the stagger window
128- The batching window (5s) already provides natural smoothing without the downsides
129
130## Health Tracking & Backoff
131
132```rust
133/// Health state machine for relay connections
134enum HealthState {
135 Healthy, // Connected and working
136 Backoff(u32), // Failed, attempt count for exponential backoff
137 Dead, // 24h+ continuous failures
138}
139
140impl RelayHealthTracker {
141 /// Backoff durations:
142 /// - Attempt 1: 5s
143 /// - Attempt 2: 10s
144 /// - Attempt 3: 20s
145 /// - Attempt 4: 40s
146 /// - ... exponential up to 1h max
147 /// - After reaching 1h, continue hourly until 24h total failure time
148 /// - After 24h: marked Dead, retry once per 24h
149 fn get_backoff(&self, relay_url: &str) -> Duration;
150}
151```
152
153| State | Retry Behavior |
154| ----------- | -------------------------------------------- |
155| **Healthy** | Immediate reconnect on disconnect |
156| **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) |
157| **Hourly** | Once per hour after hitting 1h cap |
158| **Dead** | After 24h total failures, retry once per 24h |
159
160## Startup Initialization
161
162At startup, two database queries initialize the sync state:
163
164```rust
165impl SyncManager {
166 async fn initialize_from_database(&mut self) -> Result<()> {
167 // Initialize bootstrap relay if configured (never removed)
168 if let Some(bootstrap_url) = &self.config.bootstrap_relay_url {
169 self.sync_relays.write().await.insert(
170 bootstrap_url.clone(),
171 HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only)
172 );
173 }
174
175 // Query 1: Build following_repo_root_events
176 // Find all 1617/1618/1619/1621 events and extract their repo references
177 let root_events = self.database
178 .query(Filter::new().kinds([
179 Kind::GitPatch, // 1617
180 Kind::Custom(1618), // PRs
181 Kind::Custom(1619), // PR updates
182 Kind::Custom(1621), // Issues
183 ]))
184 .await?;
185
186 for event in root_events {
187 // An event may have multiple 'a' tags pointing to different repos
188 let repo_refs = self.extract_all_repo_refs(&event);
189 for repo_ref in repo_refs {
190 self.following_repo_root_events
191 .write().await
192 .entry(repo_ref)
193 .or_default()
194 .insert(event.id);
195 }
196 }
197
198 // Query 2: Build sync_relays from kind 30617 announcements
199 let announcements = self.database
200 .query(Filter::new().kind(Kind::Custom(30617)))
201 .await?;
202
203 for event in announcements {
204 let repo_ref = self.build_repo_ref(&event);
205 let relay_urls = self.extract_relay_urls(&event);
206
207 // Only track repos that list BOTH a remote relay AND our service
208 if self.lists_our_service(&event) {
209 for relay_url in relay_urls {
210 if !self.is_own_relay(&relay_url) {
211 // Get events for this repo from following_repo_root_events
212 let events = self.following_repo_root_events
213 .read().await
214 .get(&repo_ref)
215 .cloned()
216 .unwrap_or_default();
217
218 self.sync_relays
219 .write().await
220 .entry(relay_url)
221 .or_default()
222 .insert(repo_ref.clone(), events);
223 }
224 }
225 }
226 }
227
228 Ok(())
229 }
230
231 /// Extract ALL repo refs from an event (it may tag multiple repos)
232 fn extract_all_repo_refs(&self, event: &Event) -> Vec<String> {
233 event.tags.iter()
234 .filter_map(|tag| {
235 let tag_vec = tag.clone().to_vec();
236 if tag_vec.len() >= 2 && tag_vec[0] == "a" {
237 // Validate it's a 30617 reference
238 if tag_vec[1].starts_with("30617:") {
239 Some(tag_vec[1].clone())
240 } else {
241 None
242 }
243 } else {
244 None
245 }
246 })
247 .collect()
248 }
249}
250```
251
252## Self-Subscriber: Event-Driven Updates
253
254A single self-subscriber watches for new events from **our own relay** and updates the HashMaps.
255
256**Important:** The self-subscriber does NOT subscribe to kind 30618 as this would never lead to refreshing the sync filters. Those events are synced from remote relays only (via Layer 1 filter on sync relay connections).
257
258### Batching Strategy
259
260The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires.
261
262**Important:** Once the batch timer starts, it does NOT reset when additional events arrive. The batch will fire exactly 5 seconds after the first event, regardless of how many subsequent events are queued. This ensures predictable latency and prevents indefinite batching during high-activity periods.
263
264```rust
265impl SelfSubscriber {
266 async fn run(&self) {
267 // Subscribe to our own relay for relevant kinds
268 // Note: 30618 NOT included - synced from remote relays only
269 let filter = Filter::new()
270 .kinds([
271 Kind::Custom(30617), // Repository announcements
272 Kind::GitPatch, // 1617 Patches
273 Kind::Custom(1618), // PRs
274 Kind::Custom(1619), // PR updates
275 Kind::Custom(1621), // Issues
276 ]);
277
278 let mut pending_updates: Vec<PendingUpdate> = Vec::new();
279 let mut batch_deadline: Option<Instant> = None;
280
281 loop {
282 let timeout = batch_deadline
283 .map(|d| d.saturating_duration_since(Instant::now()))
284 .unwrap_or(Duration::MAX);
285
286 tokio::select! {
287 Some(event) = self.event_receiver.recv() => {
288 pending_updates.push(self.classify_update(&event));
289
290 // Start batch timer on first event
291 if batch_deadline.is_none() {
292 batch_deadline = Some(Instant::now() + Duration::from_secs(5));
293 }
294 }
295 _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => {
296 // Batch window elapsed - apply all pending updates
297 self.apply_batched_updates(pending_updates.drain(..).collect()).await;
298 batch_deadline = None;
299 }
300 }
301 }
302 }
303
304 fn classify_update(&self, event: &Event) -> PendingUpdate {
305 match event.kind.as_u16() {
306 30617 => PendingUpdate::NewAnnouncement(event.clone()),
307 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()),
308 _ => PendingUpdate::None,
309 }
310 }
311}
312```
313
314### Applying Batched Updates
315
316When the batch window closes, we process all pending updates together:
317
318```rust
319/// Batched updates grouped by relay
320struct RelayUpdateBatch {
321 /// New repo refs to subscribe to (Layer 2)
322 new_repo_refs: HashSet<String>,
323 /// New event IDs to subscribe to (Layer 3)
324 new_event_ids: HashSet<EventId>,
325 /// Whether this is a newly discovered relay
326 is_new_relay: bool,
327}
328
329impl SelfSubscriber {
330 async fn apply_batched_updates(&mut self, updates: Vec<PendingUpdate>) {
331 // Step 1: Process all updates and update HashMaps
332 // Build batched actions per relay
333 let mut relay_batches: HashMap<String, RelayUpdateBatch> = HashMap::new();
334
335 for update in updates {
336 match update {
337 PendingUpdate::NewAnnouncement(event) => {
338 self.process_announcement(&event, &mut relay_batches).await;
339 }
340 PendingUpdate::NewRootEvent(event) => {
341 self.process_root_event(&event, &mut relay_batches).await;
342 }
343 PendingUpdate::None => {}
344 }
345 }
346
347 // Step 2: Apply batched updates to each relay
348 for (relay_url, batch) in relay_batches {
349 self.apply_batch_to_relay(&relay_url, batch).await;
350 }
351
352 // Step 3: Check for relay removal (repos removed from announcements)
353 self.check_relay_removal().await;
354 }
355
356 async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) {
357 if batch.is_new_relay {
358 // Spawn new relay connection with full filters
359 self.spawn_sync_relay(relay_url.to_string()).await;
360 return;
361 }
362
363 // Build incremental filters for new content (NO since - get historical)
364 let incremental_filters = self.build_incremental_filters(&batch);
365
366 if incremental_filters.is_empty() {
367 return;
368 }
369
370 // Check if we need to consolidate
371 let current_filter_count = self.get_filter_count_for_relay(relay_url).await;
372 let new_filter_count = current_filter_count + incremental_filters.len();
373
374 // Note: 70 is a conservative threshold that may need tuning based on
375 // production observations. It was chosen to trigger consolidation earlier
376 // than v1's 150, but optimal value depends on relay behavior.
377 if new_filter_count > 70 {
378 // Consolidate: add incremental filters first (no since), wait for EOSE,
379 // then close all and resubscribe with consolidated filters (with since)
380 self.consolidate_relay_subscription(relay_url, incremental_filters).await;
381 } else {
382 // Just add incremental filters (no since - to get historical events)
383 self.send_filters_to_relay(relay_url, incremental_filters).await;
384 }
385 }
386
387 fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec<Filter> {
388 let mut filters = Vec::new();
389
390 // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags)
391 if !batch.new_repo_refs.is_empty() {
392 let refs: Vec<String> = batch.new_repo_refs.iter().cloned().collect();
393 for chunk in refs.chunks(100) {
394 // All kinds with lowercase 'a' tag
395 filters.push(
396 Filter::new()
397 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec())
398 );
399 // All kinds with uppercase 'A' tag
400 filters.push(
401 Filter::new()
402 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec())
403 );
404 // All kinds with 'q' tag (quote)
405 filters.push(
406 Filter::new()
407 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
408 );
409 }
410 }
411
412 // Layer 3: New event IDs
413 if !batch.new_event_ids.is_empty() {
414 let ids: Vec<String> = batch.new_event_ids.iter()
415 .map(|id| id.to_hex())
416 .collect();
417 for chunk in ids.chunks(100) {
418 filters.push(
419 Filter::new()
420 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec())
421 );
422 filters.push(
423 Filter::new()
424 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec())
425 );
426 filters.push(
427 Filter::new()
428 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
429 );
430 }
431 }
432
433 filters
434 }
435}
436```
437
438### Consolidation Strategy
439
440When consolidating, we need a two-phase approach:
441
4421. First, subscribe with incremental filters (no `since`) to get any historical events we missed
4432. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`)
444
445```rust
446async fn consolidate_relay_subscription(
447 &mut self,
448 relay_url: &str,
449 incremental_filters: Vec<Filter>,
450) {
451 // Phase 1: Add incremental filters WITHOUT since to catch up on new content
452 // These filters are for new repo_refs / event_ids we just discovered
453 let phase1_sub_id = self.send_filters_to_relay_and_wait_eose(
454 relay_url,
455 incremental_filters
456 ).await;
457
458 // Phase 2: After EOSE, consolidate everything
459 // Close ALL existing subscriptions for this relay
460 self.close_all_subscriptions(relay_url).await;
461
462 // Build fresh consolidated filters using current HashMap state
463 let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await;
464
465 // Resubscribe with since = now - 15 minutes
466 let since = Timestamp::now() - 900;
467 let filters_with_since: Vec<Filter> = consolidated_filters
468 .into_iter()
469 .map(|f| f.since(since))
470 .collect();
471
472 self.send_filters_to_relay(relay_url, filters_with_since).await;
473}
474```
475
476## Daily Full Catchup
477
478To capture events that may have taken longer than 15 minutes to propagate through the nostr network, we perform a daily full catchup:
479
480```rust
481impl SyncManager {
482 /// Runs approximately every 24 hours per relay connection
483 async fn daily_catchup(&mut self, relay_url: &str) {
484 // Close all current subscriptions for this relay
485 self.close_all_subscriptions(relay_url).await;
486
487 // Rebuild fresh filters from current HashMap state
488 let filters = self.build_three_layer_filters_for_relay(relay_url).await;
489
490 // Subscribe WITHOUT since filter to get full historical sync
491 for filter in filters {
492 self.subscribe_to_relay(relay_url, filter).await;
493 }
494
495 // After EOSE, switch back to live mode with since filter
496 self.wait_for_eose(relay_url).await;
497
498 // Re-add since filter for ongoing live sync
499 let since = Timestamp::now() - 900; // 15 minutes ago
500 self.resubscribe_with_since(relay_url, since).await;
501 }
502}
503```
504
505**Rationale:** The 15-minute reconnection window is standard for nostr event propagation, but some events may take longer. Rather than increasing the window (which would cause more duplicate processing), we do a daily full catchup to ensure nothing is missed. This adds minimal complexity while providing comprehensive coverage.
506
507## Sync Relay Connections
508
509Each sync relay connection uses the three-layer filter strategy:
510
511```rust
512impl SyncRelayConnection {
513 async fn start(&mut self) {
514 loop {
515 match self.connect_and_subscribe().await {
516 Ok(()) => {
517 // Record successful connection
518 self.last_successful_connection = Instant::now();
519 self.health_tracker.record_success(&self.url);
520
521 // Run event loop until disconnect
522 self.run_event_loop().await;
523 }
524 Err(e) => {
525 self.health_tracker.record_failure(&self.url);
526 }
527 }
528
529 // Reconnect with backoff and since filter
530 let backoff = self.health_tracker.get_backoff(&self.url);
531 tokio::time::sleep(backoff).await;
532
533 // On reconnect, use since = last_successful - 15 minutes
534 self.reconnect_since = Some(
535 Timestamp::from(self.last_successful_connection - Duration::from_secs(900))
536 );
537 }
538 }
539
540 async fn connect_and_subscribe(&mut self) -> Result<()> {
541 self.client.connect().await?;
542
543 let filters = self.build_three_layer_filters().await;
544
545 // Apply since filter if reconnecting
546 let filters = if let Some(since) = self.reconnect_since {
547 filters.into_iter().map(|f| f.since(since)).collect()
548 } else {
549 filters
550 };
551
552 for filter in filters {
553 self.client.subscribe(filter, None).await?;
554 }
555
556 Ok(())
557 }
558}
559```
560
561## Three-Layer Filter Strategy
562
563```rust
564impl SyncRelayConnection {
565 async fn build_three_layer_filters(&self) -> Vec<Filter> {
566 let mut filters = Vec::new();
567
568 // Get repos for this relay
569 let repos = self.sync_relays.read().await
570 .get(&self.url)
571 .cloned()
572 .unwrap_or_default();
573
574 // Layer 1: Announcements (kinds 30617 + 30618)
575 // Note: 30618 is ONLY synced from remote relays, not self-subscribed
576 // Always included even if relay has no repos (bootstrap relay case)
577 filters.push(
578 Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)])
579 );
580
581 // Layer 2: Events tagging repos with 'a' tags (ALL kinds)
582 // Batched per 100 repo refs
583 let repo_refs: Vec<String> = repos.keys().cloned().collect();
584 for chunk in repo_refs.chunks(100) {
585 filters.push(
586 Filter::new()
587 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec())
588 );
589 filters.push(
590 Filter::new()
591 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec())
592 );
593 filters.push(
594 Filter::new()
595 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
596 );
597 }
598
599 // Layer 3: Events tagging root events (batch per 100 event IDs)
600 let all_event_ids: HashSet<EventId> = repos.values()
601 .flat_map(|ids| ids.iter().cloned())
602 .collect();
603
604 let event_id_strs: Vec<String> = all_event_ids
605 .iter()
606 .map(|id| id.to_hex())
607 .collect();
608
609 for chunk in event_id_strs.chunks(100) {
610 filters.push(
611 Filter::new()
612 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec())
613 );
614 filters.push(
615 Filter::new()
616 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec())
617 );
618 filters.push(
619 Filter::new()
620 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
621 );
622 }
623
624 filters
625 }
626}
627```
628
629## Relay Removal
630
631```rust
632async fn check_relay_removal(&mut self) {
633 let relays_to_check: Vec<String> = self.sync_relays.read().await
634 .keys()
635 .cloned()
636 .collect();
637
638 for relay_url in relays_to_check {
639 // Never remove bootstrap relay
640 if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() {
641 continue;
642 }
643
644 // Check if relay has any repos left
645 let should_remove = {
646 let sync_relays = self.sync_relays.read().await;
647 sync_relays.get(&relay_url)
648 .map(|repos| repos.is_empty())
649 .unwrap_or(true)
650 };
651
652 if should_remove {
653 // Remove from HashMap
654 self.sync_relays.write().await.remove(&relay_url);
655
656 // Close connection
657 self.close_relay_connection(&relay_url).await;
658 }
659 }
660}
661```
662
663## Prometheus Metrics (Same as v1)
664
665| Metric | Type | Labels | Description |
666| ------------------------------------- | ------- | ------------- | ----------------------- |
667| `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 |
668| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome |
669| `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 |
670| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures |
671| `ngit_sync_events_total` | Counter | source | Events by source type |
672| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled |
673| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered |
674| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected |
675| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count |
676
677## Module Structure (Simplified)
678
679```
680src/sync/
681├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays)
682├── self_subscriber.rs # SelfSubscriber + batching logic
683├── relay_connection.rs # Per-relay WebSocket + filters
684├── health.rs # RelayHealthTracker (reuse from v1)
685└── metrics.rs # SyncMetrics (reuse from v1)
686```
687
688> **Note:** SyncManager and state type aliases are colocated in `mod.rs` following the pattern of `src/http/mod.rs` (HttpService) and `src/metrics/mod.rs` (Metrics). See the earlier "Module Structure" section for rationale.
689
690## Comparison: v1 vs v2
691
692| Aspect | v1 (Current) | v2 (Simplified) |
693| ------------------- | ------------------------------------------------------------------ | --------------------------------------------- |
694| **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB |
695| **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe |
696| **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) |
697| **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters |
698| **Batching** | Per 100 tags | Per 100 tags |
699| **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min |
700| **Jitter** | Startup jitter | None (see design decision) |
701| **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed |
702| **1621 (Issues)** | Not included | Included with 1617/1618/1619 |
703| **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags |
704| **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily |
705
706## Key Design Decisions
707
7081. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database
7092. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them
7103. **Batched Filter Updates**: 5-second window that starts on first event (timer does NOT reset on subsequent events)
7114. **Uniform Reconnection**: Always use `since = last_successful - 15min`
7125. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns
7136. **Bootstrap Relay Protected**: Never removed from sync_relays - ensures at least one sync connection exists even when no repositories currently list our service (cold start / recovery scenario)
7147. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed
7158. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation (conservative value that may need tuning based on production observation)
7169. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters
71710. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds
71811. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since
71912. **Multiple Repo Refs**: Handle events that tag multiple repos correctly
72013. **Daily Full Catchup**: Periodic sync restart without `since` filter (~24h) to catch slow-propagating events
72114. **Dual DB Queries at Startup**: Separate queries for root events and announcements. Could be combined into a single query, but some relays cache by kind which may make separate queries more efficient. Trade-off deferred for future optimization.
722
723---
724
725## Detailed Flow Diagram
726
727```mermaid
728sequenceDiagram
729 participant DB as Database
730 participant SM as SyncManager
731 participant SS as Self-Subscriber
732 participant RC as RelayConnection
733
734 Note over SM: Startup
735 SM->>SM: Add bootstrap relay to sync_relays
736 SM->>DB: Query kinds 1617/1618/1619/1621
737 DB-->>SM: Root events
738 SM->>SM: Build following_repo_root_events
739 SM->>SM: Handle multi-repo events
740
741 SM->>DB: Query kind 30617
742 DB-->>SM: Announcements
743 SM->>SM: Build sync_relays
744
745 SM->>RC: Spawn connections for each relay
746 RC->>RC: Build 3-layer filters from sync_relays
747 RC->>RC: Connect and subscribe
748
749 Note over SS: Event-Driven Updates
750 SS->>SS: Subscribe to 30617/1617/1618/1619/1621
751 SS->>SS: Receive event - start 5s batch timer
752 SS->>SS: Collect more events in batch window
753 SS->>SS: Batch window closes
754 SS->>SM: Apply batched updates to HashMaps
755
756 alt New Relay Discovered
757 SM->>RC: Spawn new connection
758 else New Content for Existing Relay
759 alt Under 70 filter limit
760 SM->>RC: Add incremental filter - no since
761 else Over 70 filter limit
762 SM->>RC: Add incremental filters - no since
763 RC-->>SM: EOSE received
764 SM->>RC: Close all subscriptions
765 SM->>RC: Resubscribe consolidated - with since
766 end
767 else Relay Has No More Repos
768 alt Is Bootstrap Relay
769 SM->>SM: Keep connection - Layer 1 only
770 else Not Bootstrap Relay
771 SM->>RC: Close connection
772 end
773 end
774
775 Note over RC: Connection Lifecycle
776 RC->>RC: Process incoming events
777 RC->>DB: Store via acceptance policy
778
779 RC->>RC: Connection drops
780 RC->>RC: Wait backoff - 5s to 1h exponential
781 RC->>RC: Reconnect with since = last_success - 15min
782
783 Note over RC: If failures continue 24h
784 RC->>RC: Mark dead - retry once per 24h
785```