upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-08 19:41:16 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-08 19:41:16 +0000
commit20388e4e76864195860dfab81a5b80725184e7c9 (patch)
tree9a429d6f6b2adf0d88b85165adb523d664456367 /docs/explanation
parent9f594fadf2a1d5bfda0ab027f2b3cf7a247900ec (diff)
redesign sync - document only
Diffstat (limited to 'docs/explanation')
-rw-r--r--docs/explanation/grasp-02-proactive-sync-v2.md704
1 files changed, 704 insertions, 0 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md
new file mode 100644
index 0000000..faa9255
--- /dev/null
+++ b/docs/explanation/grasp-02-proactive-sync-v2.md
@@ -0,0 +1,704 @@
1# GRASP-02: Proactive Sync v2 - Simplified Design
2
3## Overview
4
5This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription.
6
7## Goals (Same as v1)
8
91. **Data Availability**: Ensure we have all relevant events for repositories we host
102. **Resilience**: Handle relay failures gracefully with backoff and health tracking
113. **Efficiency**: Minimize connections and bandwidth through filter consolidation
124. **Consistency**: Use unified filters for both live sync and catchup
13
14## Core Data Structures
15
16The entire sync filter state is captured in two HashMaps, initialized from database queries at startup:
17
18```rust
19/// Repository root events we're following
20/// Key: repo addressable reference (e.g., "30617:pubkey:identifier")
21/// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo
22///
23/// Note: May include a few extra repo refs that aren't in sync_relays.
24/// This is acceptable - we won't query other relays for them.
25type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
26
27/// Relays we sync from, including their repos and events
28/// Key: relay URL
29/// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service
30///
31/// Note: Bootstrap relay (if configured) is always present and excluded from removal logic.
32type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
33```
34
35## Architecture Overview
36
37```mermaid
38flowchart TB
39 subgraph Startup
40 DB[(Database)]
41 Q1[Query kinds 1617/1618/1619/1621]
42 Q2[Query kind 30617]
43 DB --> Q1 --> FRRE[following_repo_root_events]
44 DB --> Q2 --> SR[sync_relays]
45 BR[Bootstrap Relay] --> SR
46 end
47
48 subgraph SyncManager
49 SS[Self-Subscriber]
50 FRRE --> SS
51 SR --> SS
52 end
53
54 subgraph SyncRelays
55 R1[Relay Connection 1]
56 R2[Relay Connection 2]
57 RN[Relay Connection N]
58 end
59
60 SS -->|spawn/update| R1
61 SS -->|spawn/update| R2
62 SS -->|spawn/update| RN
63
64 R1 -->|events| AP[Acceptance Policy]
65 R2 -->|events| AP
66 RN -->|events| AP
67 AP -->|store| DB
68```
69
70## Design Decision: No Jitter
71
72We considered adding jitter to prevent thundering herd scenarios when:
73
74- Multiple relay connections initialize simultaneously
75- Batched updates affect multiple relays
76- Filter consolidation triggers across connections
77
78**Decision: No jitter implemented.**
79
80**Rationale:**
81
82- Our GRASP server should handle the load of simultaneous operations
83- Jitter would lead to more orphan filters (filters added one at a time rather than atomically)
84- Jitter creates inefficiency - partial subscriptions miss events during the stagger window
85- The batching window (5s) already provides natural smoothing without the downsides
86
87## Health Tracking & Backoff
88
89```rust
90/// Health state machine for relay connections
91enum HealthState {
92 Healthy, // Connected and working
93 Backoff(u32), // Failed, attempt count for exponential backoff
94 Dead, // 24h+ continuous failures
95}
96
97impl RelayHealthTracker {
98 /// Backoff durations:
99 /// - Attempt 1: 5s
100 /// - Attempt 2: 10s
101 /// - Attempt 3: 20s
102 /// - Attempt 4: 40s
103 /// - ... exponential up to 1h max
104 /// - After reaching 1h, continue hourly until 24h total failure time
105 /// - After 24h: marked Dead, retry once per 24h
106 fn get_backoff(&self, relay_url: &str) -> Duration;
107}
108```
109
110| State | Retry Behavior |
111| ----------- | -------------------------------------------- |
112| **Healthy** | Immediate reconnect on disconnect |
113| **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) |
114| **Hourly** | Once per hour after hitting 1h cap |
115| **Dead** | After 24h total failures, retry once per 24h |
116
117## Startup Initialization
118
119At startup, two database queries initialize the sync state:
120
121```rust
122impl SyncManager {
123 async fn initialize_from_database(&mut self) -> Result<()> {
124 // Initialize bootstrap relay if configured (never removed)
125 if let Some(bootstrap_url) = &self.config.bootstrap_relay_url {
126 self.sync_relays.write().await.insert(
127 bootstrap_url.clone(),
128 HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only)
129 );
130 }
131
132 // Query 1: Build following_repo_root_events
133 // Find all 1617/1618/1619/1621 events and extract their repo references
134 let root_events = self.database
135 .query(Filter::new().kinds([
136 Kind::GitPatch, // 1617
137 Kind::Custom(1618), // PRs
138 Kind::Custom(1619), // PR updates
139 Kind::Custom(1621), // Issues
140 ]))
141 .await?;
142
143 for event in root_events {
144 // An event may have multiple 'a' tags pointing to different repos
145 let repo_refs = self.extract_all_repo_refs(&event);
146 for repo_ref in repo_refs {
147 self.following_repo_root_events
148 .write().await
149 .entry(repo_ref)
150 .or_default()
151 .insert(event.id);
152 }
153 }
154
155 // Query 2: Build sync_relays from kind 30617 announcements
156 let announcements = self.database
157 .query(Filter::new().kind(Kind::Custom(30617)))
158 .await?;
159
160 for event in announcements {
161 let repo_ref = self.build_repo_ref(&event);
162 let relay_urls = self.extract_relay_urls(&event);
163
164 // Only track repos that list BOTH a remote relay AND our service
165 if self.lists_our_service(&event) {
166 for relay_url in relay_urls {
167 if !self.is_own_relay(&relay_url) {
168 // Get events for this repo from following_repo_root_events
169 let events = self.following_repo_root_events
170 .read().await
171 .get(&repo_ref)
172 .cloned()
173 .unwrap_or_default();
174
175 self.sync_relays
176 .write().await
177 .entry(relay_url)
178 .or_default()
179 .insert(repo_ref.clone(), events);
180 }
181 }
182 }
183 }
184
185 Ok(())
186 }
187
188 /// Extract ALL repo refs from an event (it may tag multiple repos)
189 fn extract_all_repo_refs(&self, event: &Event) -> Vec<String> {
190 event.tags.iter()
191 .filter_map(|tag| {
192 let tag_vec = tag.clone().to_vec();
193 if tag_vec.len() >= 2 && tag_vec[0] == "a" {
194 // Validate it's a 30617 reference
195 if tag_vec[1].starts_with("30617:") {
196 Some(tag_vec[1].clone())
197 } else {
198 None
199 }
200 } else {
201 None
202 }
203 })
204 .collect()
205 }
206}
207```
208
209## Self-Subscriber: Event-Driven Updates
210
211A single self-subscriber watches for new events from **our own relay** and updates the HashMaps.
212
213**Important:** The self-subscriber does NOT subscribe to kind 30618 as this would never lead to refreshing the sync filters. Those events are synced from remote relays only (via Layer 1 filter on sync relay connections).
214
215### Batching Strategy
216
217The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires.
218
219```rust
220impl SelfSubscriber {
221 async fn run(&self) {
222 // Subscribe to our own relay for relevant kinds
223 // Note: 30618 NOT included - synced from remote relays only
224 let filter = Filter::new()
225 .kinds([
226 Kind::Custom(30617), // Repository announcements
227 Kind::GitPatch, // 1617 Patches
228 Kind::Custom(1618), // PRs
229 Kind::Custom(1619), // PR updates
230 Kind::Custom(1621), // Issues
231 ]);
232
233 let mut pending_updates: Vec<PendingUpdate> = Vec::new();
234 let mut batch_deadline: Option<Instant> = None;
235
236 loop {
237 let timeout = batch_deadline
238 .map(|d| d.saturating_duration_since(Instant::now()))
239 .unwrap_or(Duration::MAX);
240
241 tokio::select! {
242 Some(event) = self.event_receiver.recv() => {
243 pending_updates.push(self.classify_update(&event));
244
245 // Start batch timer on first event
246 if batch_deadline.is_none() {
247 batch_deadline = Some(Instant::now() + Duration::from_secs(5));
248 }
249 }
250 _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => {
251 // Batch window elapsed - apply all pending updates
252 self.apply_batched_updates(pending_updates.drain(..).collect()).await;
253 batch_deadline = None;
254 }
255 }
256 }
257 }
258
259 fn classify_update(&self, event: &Event) -> PendingUpdate {
260 match event.kind.as_u16() {
261 30617 => PendingUpdate::NewAnnouncement(event.clone()),
262 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()),
263 _ => PendingUpdate::None,
264 }
265 }
266}
267```
268
269### Applying Batched Updates
270
271When the batch window closes, we process all pending updates together:
272
273```rust
274/// Batched updates grouped by relay
275struct RelayUpdateBatch {
276 /// New repo refs to subscribe to (Layer 2)
277 new_repo_refs: HashSet<String>,
278 /// New event IDs to subscribe to (Layer 3)
279 new_event_ids: HashSet<EventId>,
280 /// Whether this is a newly discovered relay
281 is_new_relay: bool,
282}
283
284impl SelfSubscriber {
285 async fn apply_batched_updates(&mut self, updates: Vec<PendingUpdate>) {
286 // Step 1: Process all updates and update HashMaps
287 // Build batched actions per relay
288 let mut relay_batches: HashMap<String, RelayUpdateBatch> = HashMap::new();
289
290 for update in updates {
291 match update {
292 PendingUpdate::NewAnnouncement(event) => {
293 self.process_announcement(&event, &mut relay_batches).await;
294 }
295 PendingUpdate::NewRootEvent(event) => {
296 self.process_root_event(&event, &mut relay_batches).await;
297 }
298 PendingUpdate::None => {}
299 }
300 }
301
302 // Step 2: Apply batched updates to each relay
303 for (relay_url, batch) in relay_batches {
304 self.apply_batch_to_relay(&relay_url, batch).await;
305 }
306
307 // Step 3: Check for relay removal (repos removed from announcements)
308 self.check_relay_removal().await;
309 }
310
311 async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) {
312 if batch.is_new_relay {
313 // Spawn new relay connection with full filters
314 self.spawn_sync_relay(relay_url.to_string()).await;
315 return;
316 }
317
318 // Build incremental filters for new content (NO since - get historical)
319 let incremental_filters = self.build_incremental_filters(&batch);
320
321 if incremental_filters.is_empty() {
322 return;
323 }
324
325 // Check if we need to consolidate
326 let current_filter_count = self.get_filter_count_for_relay(relay_url).await;
327 let new_filter_count = current_filter_count + incremental_filters.len();
328
329 if new_filter_count > 70 {
330 // Consolidate: add incremental filters first (no since), wait for EOSE,
331 // then close all and resubscribe with consolidated filters (with since)
332 self.consolidate_relay_subscription(relay_url, incremental_filters).await;
333 } else {
334 // Just add incremental filters (no since - to get historical events)
335 self.send_filters_to_relay(relay_url, incremental_filters).await;
336 }
337 }
338
339 fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec<Filter> {
340 let mut filters = Vec::new();
341
342 // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags)
343 if !batch.new_repo_refs.is_empty() {
344 let refs: Vec<String> = batch.new_repo_refs.iter().cloned().collect();
345 for chunk in refs.chunks(100) {
346 // All kinds with lowercase 'a' tag
347 filters.push(
348 Filter::new()
349 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec())
350 );
351 // All kinds with uppercase 'A' tag
352 filters.push(
353 Filter::new()
354 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec())
355 );
356 // All kinds with 'q' tag (quote)
357 filters.push(
358 Filter::new()
359 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
360 );
361 }
362 }
363
364 // Layer 3: New event IDs
365 if !batch.new_event_ids.is_empty() {
366 let ids: Vec<String> = batch.new_event_ids.iter()
367 .map(|id| id.to_hex())
368 .collect();
369 for chunk in ids.chunks(100) {
370 filters.push(
371 Filter::new()
372 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec())
373 );
374 filters.push(
375 Filter::new()
376 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec())
377 );
378 filters.push(
379 Filter::new()
380 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
381 );
382 }
383 }
384
385 filters
386 }
387}
388```
389
390### Consolidation Strategy
391
392When consolidating, we need a two-phase approach:
393
3941. First, subscribe with incremental filters (no `since`) to get any historical events we missed
3952. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`)
396
397```rust
398async fn consolidate_relay_subscription(
399 &mut self,
400 relay_url: &str,
401 incremental_filters: Vec<Filter>,
402) {
403 // Phase 1: Add incremental filters WITHOUT since to catch up on new content
404 // These filters are for new repo_refs / event_ids we just discovered
405 let phase1_sub_id = self.send_filters_to_relay_and_wait_eose(
406 relay_url,
407 incremental_filters
408 ).await;
409
410 // Phase 2: After EOSE, consolidate everything
411 // Close ALL existing subscriptions for this relay
412 self.close_all_subscriptions(relay_url).await;
413
414 // Build fresh consolidated filters using current HashMap state
415 let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await;
416
417 // Resubscribe with since = now - 15 minutes
418 let since = Timestamp::now() - 900;
419 let filters_with_since: Vec<Filter> = consolidated_filters
420 .into_iter()
421 .map(|f| f.since(since))
422 .collect();
423
424 self.send_filters_to_relay(relay_url, filters_with_since).await;
425}
426```
427
428## Sync Relay Connections
429
430Each sync relay connection uses the three-layer filter strategy:
431
432```rust
433impl SyncRelayConnection {
434 async fn start(&mut self) {
435 loop {
436 match self.connect_and_subscribe().await {
437 Ok(()) => {
438 // Record successful connection
439 self.last_successful_connection = Instant::now();
440 self.health_tracker.record_success(&self.url);
441
442 // Run event loop until disconnect
443 self.run_event_loop().await;
444 }
445 Err(e) => {
446 self.health_tracker.record_failure(&self.url);
447 }
448 }
449
450 // Reconnect with backoff and since filter
451 let backoff = self.health_tracker.get_backoff(&self.url);
452 tokio::time::sleep(backoff).await;
453
454 // On reconnect, use since = last_successful - 15 minutes
455 self.reconnect_since = Some(
456 Timestamp::from(self.last_successful_connection - Duration::from_secs(900))
457 );
458 }
459 }
460
461 async fn connect_and_subscribe(&mut self) -> Result<()> {
462 self.client.connect().await?;
463
464 let filters = self.build_three_layer_filters().await;
465
466 // Apply since filter if reconnecting
467 let filters = if let Some(since) = self.reconnect_since {
468 filters.into_iter().map(|f| f.since(since)).collect()
469 } else {
470 filters
471 };
472
473 for filter in filters {
474 self.client.subscribe(filter, None).await?;
475 }
476
477 Ok(())
478 }
479}
480```
481
482## Three-Layer Filter Strategy
483
484```rust
485impl SyncRelayConnection {
486 async fn build_three_layer_filters(&self) -> Vec<Filter> {
487 let mut filters = Vec::new();
488
489 // Get repos for this relay
490 let repos = self.sync_relays.read().await
491 .get(&self.url)
492 .cloned()
493 .unwrap_or_default();
494
495 // Layer 1: Announcements (kinds 30617 + 30618)
496 // Note: 30618 is ONLY synced from remote relays, not self-subscribed
497 // Always included even if relay has no repos (bootstrap relay case)
498 filters.push(
499 Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)])
500 );
501
502 // Layer 2: Events tagging repos with 'a' tags (ALL kinds)
503 // Batched per 100 repo refs
504 let repo_refs: Vec<String> = repos.keys().cloned().collect();
505 for chunk in repo_refs.chunks(100) {
506 filters.push(
507 Filter::new()
508 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec())
509 );
510 filters.push(
511 Filter::new()
512 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec())
513 );
514 filters.push(
515 Filter::new()
516 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
517 );
518 }
519
520 // Layer 3: Events tagging root events (batch per 100 event IDs)
521 let all_event_ids: HashSet<EventId> = repos.values()
522 .flat_map(|ids| ids.iter().cloned())
523 .collect();
524
525 let event_id_strs: Vec<String> = all_event_ids
526 .iter()
527 .map(|id| id.to_hex())
528 .collect();
529
530 for chunk in event_id_strs.chunks(100) {
531 filters.push(
532 Filter::new()
533 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec())
534 );
535 filters.push(
536 Filter::new()
537 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec())
538 );
539 filters.push(
540 Filter::new()
541 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec())
542 );
543 }
544
545 filters
546 }
547}
548```
549
550## Relay Removal
551
552```rust
553async fn check_relay_removal(&mut self) {
554 let relays_to_check: Vec<String> = self.sync_relays.read().await
555 .keys()
556 .cloned()
557 .collect();
558
559 for relay_url in relays_to_check {
560 // Never remove bootstrap relay
561 if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() {
562 continue;
563 }
564
565 // Check if relay has any repos left
566 let should_remove = {
567 let sync_relays = self.sync_relays.read().await;
568 sync_relays.get(&relay_url)
569 .map(|repos| repos.is_empty())
570 .unwrap_or(true)
571 };
572
573 if should_remove {
574 // Remove from HashMap
575 self.sync_relays.write().await.remove(&relay_url);
576
577 // Close connection
578 self.close_relay_connection(&relay_url).await;
579 }
580 }
581}
582```
583
584## Prometheus Metrics (Same as v1)
585
586| Metric | Type | Labels | Description |
587| ------------------------------------- | ------- | ------------- | ----------------------- |
588| `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 |
589| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome |
590| `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 |
591| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures |
592| `ngit_sync_events_total` | Counter | source | Events by source type |
593| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled |
594| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered |
595| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected |
596| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count |
597
598## Module Structure (Simplified)
599
600```
601src/sync/
602├── mod.rs # Module exports, constants
603├── manager.rs # SyncManager - orchestrates sync
604├── state.rs # FollowingRepoRootEvents + SyncRelays HashMaps
605├── self_subscriber.rs # Self-subscriber + batching logic
606├── relay_connection.rs # Per-relay WebSocket + filters
607├── health.rs # RelayHealthTracker (reuse from v1)
608└── metrics.rs # SyncMetrics (reuse from v1)
609```
610
611## Comparison: v1 vs v2
612
613| Aspect | v1 (Current) | v2 (Simplified) |
614| ------------------- | ------------------------------------------------------------------ | --------------------------------------------- |
615| **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB |
616| **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe |
617| **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) |
618| **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters |
619| **Batching** | Per 100 tags | Per 100 tags |
620| **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min |
621| **Jitter** | Startup jitter | None (see design decision) |
622| **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed |
623| **1621 (Issues)** | Not included | Included with 1617/1618/1619 |
624| **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags |
625| **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily |
626
627## Key Design Decisions
628
6291. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database
6302. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them
6313. **Batched Filter Updates**: 5-second window that starts on first event (not fixed interval)
6324. **Uniform Reconnection**: Always use `since = last_successful - 15min`
6335. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns
6346. **Bootstrap Relay Protected**: Never removed from sync_relays; subscribes with Layer 1 even if empty
6357. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed
6368. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation
6379. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters
63810. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds
63911. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since
64012. **Multiple Repo Refs**: Handle events that tag multiple repos correctly
641
642---
643
644## Detailed Flow Diagram
645
646```mermaid
647sequenceDiagram
648 participant DB as Database
649 participant SM as SyncManager
650 participant SS as Self-Subscriber
651 participant RC as RelayConnection
652
653 Note over SM: Startup
654 SM->>SM: Add bootstrap relay to sync_relays
655 SM->>DB: Query kinds 1617/1618/1619/1621
656 DB-->>SM: Root events
657 SM->>SM: Build following_repo_root_events
658 SM->>SM: Handle multi-repo events
659
660 SM->>DB: Query kind 30617
661 DB-->>SM: Announcements
662 SM->>SM: Build sync_relays
663
664 SM->>RC: Spawn connections for each relay
665 RC->>RC: Build 3-layer filters from sync_relays
666 RC->>RC: Connect and subscribe
667
668 Note over SS: Event-Driven Updates
669 SS->>SS: Subscribe to 30617/1617/1618/1619/1621
670 SS->>SS: Receive event - start 5s batch timer
671 SS->>SS: Collect more events in batch window
672 SS->>SS: Batch window closes
673 SS->>SM: Apply batched updates to HashMaps
674
675 alt New Relay Discovered
676 SM->>RC: Spawn new connection
677 else New Content for Existing Relay
678 alt Under 70 filter limit
679 SM->>RC: Add incremental filter - no since
680 else Over 70 filter limit
681 SM->>RC: Add incremental filters - no since
682 RC-->>SM: EOSE received
683 SM->>RC: Close all subscriptions
684 SM->>RC: Resubscribe consolidated - with since
685 end
686 else Relay Has No More Repos
687 alt Is Bootstrap Relay
688 SM->>SM: Keep connection - Layer 1 only
689 else Not Bootstrap Relay
690 SM->>RC: Close connection
691 end
692 end
693
694 Note over RC: Connection Lifecycle
695 RC->>RC: Process incoming events
696 RC->>DB: Store via acceptance policy
697
698 RC->>RC: Connection drops
699 RC->>RC: Wait backoff - 5s to 1h exponential
700 RC->>RC: Reconnect with since = last_success - 15min
701
702 Note over RC: If failures continue 24h
703 RC->>RC: Mark dead - retry once per 24h
704```