upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

author DanConwayDev <DanConwayDev@protonmail.com> 2025-12-11 14:32:01 +0000
committer DanConwayDev <DanConwayDev@protonmail.com> 2025-12-11 14:32:01 +0000
commit 18ad93f8d0b8ce172c9c227385a21af66a507950
tree 275ada806570a2105f4e75388a565f61276209c8 /docs/explanation/grasp-02-proactive-sync.md
parent 4e5a937a4ef5288e702ba2bae3daf2a78398b690
docs: remove old grasp-02 design doc versions
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync.md')
-rw-r--r-- docs/explanation/grasp-02-proactive-sync.md 1811
1 files changed, 1106 insertions, 705 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
index c07f07c..dd508b3 100644
--- a/docs/explanation/grasp-02-proactive-sync.md
+++ b/docs/explanation/grasp-02-proactive-sync.md
@@ -1,929 +1,1330 @@
1# GRASP-02: Proactive Sync - Design Document 1# GRASP-02: Proactive Sync v4 - Health & Reconnection Design
2 2
3## Overview 3## Overview
4 4
5GRASP-02 Proactive Sync enables ngit-grasp to maintain live WebSocket connections to other relays listed in repository announcement events, synchronizing NIP-34 related events using both **live sync** (real-time subscriptions) and **negentropy catchup** (NIP-77 set reconciliation). 5This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles:
6 6
7This document covers **event syncing only**. Git data syncing is out of scope for this phase. 71. **Self-subscription as the only mechanism** - No database initialization at startup
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions)
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary
8 12
9## Goals 13---
10 14
111. **Data Availability**: Ensure we have all relevant events for repositories we host 15## Data Model
122. **Resilience**: Handle relay failures gracefully with backoff and health tracking
133. **Efficiency**: Minimize connections and bandwidth through filter consolidation
144. **Consistency**: Use unified filters for both live sync and negentropy catchup
15 16
16## Architecture Overview 17### RepoSyncIndex (Source of Truth)
17 18
18```mermaid 19```rust
19flowchart TB 20/// What we WANT to sync - derived from events received via self-subscription.
20 subgraph ngit-grasp 21/// Updated immediately when self-subscriber batch fires.
21 subgraph SyncManager 22/// Key: repo addressable ref - 30617:pubkey:identifier
22 SS[Self-Subscriber] 23pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
23 RC[Remote Connections] 24
24 end 25#[derive(Debug, Clone, Default)]
25 WS[WebSocket Server] 26pub struct RepoSyncNeeds {
26 FS[FilterService] 27 /// Relay URLs listed in this repo's 30617 announcement
27 RH[RelayHealthTracker] 28 pub relays: HashSet<String>,
28 DB[(Database)] 29 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
29 AP[AcceptancePolicy] 30 pub root_events: HashSet<EventId>,
30 MET[Prometheus Metrics] 31}
31 end 32```
32 33
33 subgraph External Relays 34### RelaySyncIndex (Confirmed State + Connection)
34 R1[relay.example.com]
35 R2[other-grasp.io]
36 R3[nostr.land]
37 end
38 35
39 WS -->|broadcasts events| SS 36```rust
40 SS -->|discovers relays| RC 37/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
41 RC -->|builds filters| FS 38/// Key: relay URL
42 RC -->|tracks health| RH 39pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
43 RC -->|stores events| DB 40
44 RC -->|validates| AP 41/// Connection status for a relay
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ConnectionStatus {
44 /// Not currently connected
45 Disconnected,
46 /// Connection attempt in progress
47 Connecting,
48 /// Successfully connected and subscribed
49 Connected,
50}
51
52/// Complete state for a single relay - combines sync needs with connection lifecycle
53#[derive(Debug)]
54pub struct RelayState {
55 /// Repos we have confirmed syncing from this relay
56 pub repos: HashSet<String>,
57 /// Root events we have confirmed tracking
58 pub root_events: HashSet<EventId>,
59 /// If true, never disconnect this relay
60 pub is_bootstrap: bool,
61 /// Current connection status
62 pub connection_status: ConnectionStatus,
63 /// When we last successfully connected - used for since filter on reconnect
64 pub last_connected: Option<Timestamp>,
65 /// When we disconnected - for 15-minute state retention rule
66 pub disconnected_at: Option<Timestamp>,
67}
45 68
46 RC <-->|WebSocket + NEG| R1 69impl RelayState {
47 RC <-->|WebSocket + NEG| R2 70 /// Check if state should be cleared based on 15-minute rule
48 RC <-->|WebSocket + NEG| R3 71 pub fn should_clear_state(&self) -> bool {
72 match self.disconnected_at {
73 Some(disconnected) => {
74 let now = Timestamp::now();
75 now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes
76 }
77 None => false, // Still connected or never connected
78 }
79 }
49 80
50 RH -->|exposes state| MET 81 /// Clear repos and root_events - called when reconnect takes > 15 minutes
82 pub fn clear_sync_state(&mut self) {
83 self.repos.clear();
84 self.root_events.clear();
85 }
86}
51``` 87```
52 88
53**Key Insight: Self-Subscribe Architecture** 89### PendingSyncIndex (In-Flight Batches)
54 90
55The SyncManager uses a "self-subscribe" pattern for relay discovery. Rather than polling the database periodically, it connects to its own WebSocket server as a client and subscribes to kind 30617 events. When new announcements are saved (from any source), the self-subscriber receives them instantly and can spawn connections to newly discovered relays. 91```rust
92/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
93/// Each batch has its own ID and can confirm independently.
94/// Key: relay URL
95pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
96
97#[derive(Debug, Clone)]
98pub struct PendingBatch {
99 /// Unique ID for this batch - for debugging/logging
100 pub batch_id: u64,
101 /// The items this batch is syncing
102 pub items: PendingItems,
103 /// Subscription IDs that must ALL receive EOSE before confirming
104 pub outstanding_subs: HashSet<SubscriptionId>,
105}
56 106
57## Connection Management 107#[derive(Debug, Clone, Default)]
108pub struct PendingItems {
109 pub repos: HashSet<String>,
110 pub root_events: HashSet<EventId>,
111}
112```
58 113
59### Relay Discovery 114---
60 115
61Relays to connect to are discovered using a **self-subscribe architecture** rather than periodic polling. The SyncManager connects to its own relay as a client and subscribes to kind 30617 (repository announcement) events. When a new announcement is saved to the database (from direct submission or sync), the self-subscriber receives it immediately and discovers new relays to connect to. 116## Connection Lifecycle State Machine
62 117
63```mermaid 118```mermaid
64flowchart LR 119stateDiagram-v2
65 subgraph Relay 120 [*] --> Disconnected: discover relay via RepoSyncIndex
66 WS[WebSocket Server] 121 Disconnected --> Connecting: AddFilters triggers spawn_connection
67 DB[(Database)] 122 Connecting --> Connected: success
68 end 123 Connecting --> Disconnected: failure + record in health tracker
69 124 Connected --> Disconnected: connection lost
70 subgraph SyncManager 125 Connected --> [*]: intentional disconnect via check_disconnects
71 SS[Self-Subscribe Client] 126
72 RC[Remote Connections] 127 note right of Disconnected: disconnected_at set for 15min rule
73 end 128 note right of Connected: last_connected tracked for since filter
74
75 WS -->|broadcast| SS
76 SS -->|extract relay URLs| RC
77 RC -->|sync events| WS
78``` 129```
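The connection lifecycle state machine in the new revision can be read as a small transition table. The sketch below is only an illustration of that diagram: `ConnectionEvent` and `next_status` are hypothetical names introduced here, and the intentional disconnect via `check_disconnects` (which ends tracking of the relay) is left out.

```rust
/// Mirrors the `ConnectionStatus` enum from the data model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
    Disconnected,
    Connecting,
    Connected,
}

/// Hypothetical event names, chosen for this sketch only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionEvent {
    /// AddFilters triggered spawn_connection
    SpawnConnection,
    ConnectSucceeded,
    ConnectFailed,
    ConnectionLost,
}

/// Returns the next status, or None when the diagram has no such edge.
pub fn next_status(
    current: ConnectionStatus,
    event: ConnectionEvent,
) -> Option<ConnectionStatus> {
    match (current, event) {
        (ConnectionStatus::Disconnected, ConnectionEvent::SpawnConnection) => {
            Some(ConnectionStatus::Connecting)
        }
        (ConnectionStatus::Connecting, ConnectionEvent::ConnectSucceeded) => {
            Some(ConnectionStatus::Connected)
        }
        (ConnectionStatus::Connecting, ConnectionEvent::ConnectFailed) => {
            Some(ConnectionStatus::Disconnected)
        }
        (ConnectionStatus::Connected, ConnectionEvent::ConnectionLost) => {
            Some(ConnectionStatus::Disconnected)
        }
        _ => None,
    }
}
```

Returning `None` for edges that are not in the diagram makes unexpected events (for example a success report for an already connected relay) easy to log rather than silently accept.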
79 130
80**Why Self-Subscribe vs Polling?** 131---
81 132
82| Approach | Latency | Complexity | Resource Use | 133## Flow Scenarios
83|----------|---------|------------|--------------|
84| Self-Subscribe | Instant | Low | Minimal (1 WS connection) |
85| Periodic Polling | 30s+ delay | Higher | DB queries every N seconds |
86 134
87The self-subscribe approach provides: 135### Scenario 1: Initial Connect via handle_connect_or_reconnect
88- **Immediate discovery**: New relays discovered instantly when announcement saved
89- **No polling overhead**: No periodic database queries
90- **Simple architecture**: Reuses existing WebSocket infrastructure
91 136
92**Implementation Pattern:** 137```mermaid
138flowchart TB
139 START[Startup] --> SS[Self-subscribe to own relay]
140 SS --> |no since filter| EVENTS[Receive historical events]
141 EVENTS --> RSI[Update RepoSyncIndex]
142 RSI --> DT[derive_relay_targets]
143 DT --> CA[compute_actions with targets and empty confirmed]
144 CA --> AF[AddFilters for each relay]
145 AF --> SPAWN{Relay connected?}
146 SPAWN --> |no| CONN[spawn_connection]
147 CONN --> HC[handle_connect_or_reconnect]
148 SPAWN --> |yes| SUB
149
150 subgraph handle_connect_or_reconnect - Fresh Sync
151 HC --> CHECK_FRESH{is_fresh_sync?}
152 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since]
153 L1 --> RCA[recompute_actions_for_relay]
154 end
93 155
94```rust 156 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters]
95// In SyncManager::run() 157 SUB --> PB[Create PendingBatch]
96let self_client = Client::default(); 158 PB --> EOSE[Wait for EOSE]
97self_client.add_relay(&own_relay_url).await?; 159 EOSE --> CONFIRM[Move items to confirmed repos/root_events]
98self_client.connect().await;
99
100let filter = Filter::new().kind(Kind::Custom(30617));
101self_client.subscribe(filter, None).await?;
102
103// Handle notifications - when announcement arrives, extract relay URLs
104self_client.handle_notifications(|notification| async {
105 if let RelayPoolNotification::Event { event, .. } = notification {
106 let new_urls = filter_service.extract_relay_urls_from_event(&event);
107 for url in new_urls {
108 if !active_relays.contains(&url) && !is_own_relay(&url) {
109 spawn_connection(url, tx.clone(), filter_service.clone());
110 }
111 }
112 }
113 Ok(false) // Continue processing
114});
115``` 160```
116 161
117**Startup Discovery:** At startup, existing announcements in the database are queried once to discover initial relays. After startup, all discovery is event-driven via self-subscribe. 162**Key points:**
118 163
119**Reconnection:** The self-subscriber has built-in exponential backoff reconnection (1s → 60s max) to handle temporary disconnections from our own relay. 164- No `since` filter on initial connect - get full history
165- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()`
166- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since
167- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking
120 168
121### Connection Lifecycle 169### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes
122 170
123```mermaid 171```mermaid
124stateDiagram-v2 172flowchart TB
125 [*] --> Connecting: startup/new relay 173 DISC[Connection lost] --> MARK[Set disconnected_at = now]
126 Connecting --> Connected: success 174 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay]
127 Connecting --> Backoff: failure 175 CLEAR_PEND --> WAIT[Wait for reconnection]
128 Connected --> Disconnected: connection lost 176 WAIT --> RECONN[Connection restored]
129 Disconnected --> Backoff: reconnect failed 177 RECONN --> HC[handle_connect_or_reconnect]
130 Backoff --> Connecting: backoff timer expires 178
131 Backoff --> Dead: 24h continuous failures 179 subgraph handle_connect_or_reconnect - Quick Reconnect
132 Dead --> Connecting: daily retry timer 180 HC --> CHECK{is_fresh_sync?}
133 Connected --> Updating: filter change 181 CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min]
134 Updating --> Connected: complete 182 SINCE --> L1[build_announcement_filter - with since]
183 L1 --> L23[rebuild_layer2_and_layer3 - with since]
184 L23 --> RCA[recompute_actions_for_relay]
185 end
186
187 RCA --> AF[AddFilters for new items only]
188 AF --> SUB[Subscribe]
189 SUB --> PB[Create PendingBatch]
190 PB --> EOSE[Wait for EOSE]
191 EOSE --> EXTEND[Extend confirmed state]
135``` 192```
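The choice between a fresh sync and a quick reconnect reduces to picking a `since` value. The following is a minimal sketch of that decision, assuming plain Unix seconds in place of `Timestamp`; `reconnect_since` and `RETENTION_SECS` are names made up for this illustration and are not part of the design.

```rust
/// Retention window from the design: 15 minutes, in seconds.
const RETENTION_SECS: u64 = 900;

/// Returns the `since` value to use on (re)connect, or None for a fresh sync.
/// Timestamps are plain Unix seconds here; the design uses `Timestamp`.
pub fn reconnect_since(
    last_connected: Option<u64>,
    disconnected_at: Option<u64>,
    now: u64,
) -> Option<u64> {
    // Never connected before: fresh sync with full history.
    let last = last_connected?;

    // Disconnected for longer than the retention window: fresh sync.
    if let Some(gone) = disconnected_at {
        if now.saturating_sub(gone) > RETENTION_SECS {
            return None;
        }
    }

    // Quick reconnect: overlap the previous session by 15 minutes.
    Some(last.saturating_sub(RETENTION_SECS))
}
```

With `last_connected = 5000`, `disconnected_at = 9500` and `now = 10000`, the gap is 500 seconds, so the result is `Some(4100)`. A gap of 1000 seconds yields `None`, which corresponds to the full-history path.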
136 193
137### Health Tracking & Backoff 194**Key points:**
138 195
139| State | Behavior | 196- PendingSyncIndex cleared on disconnect (not reconnect)
140| ----------- | --------------------------------------------------- | 197- `handle_connect_or_reconnect`:
141| **Healthy** | Normal operation, immediate reconnect on disconnect | 198 1. `build_announcement_filter(Some(since))` - Layer 1 with since
142| **Backoff** | Exponential backoff: 5s → 10s → 20s → ... → 1h max | 199 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
143| **Dead** | 24h of continuous failures, retry once per day | 200 3. `recompute_actions_for_relay` - check for new items
201- since = last_connected - 15min ensures we catch events during disconnection
144 202
145Health state is **kept in-memory** using a `DashMap` for lock-free concurrent access: 203### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes
146 204
147```rust 205```mermaid
148/// In-memory relay health tracking (NOT persisted to database) 206flowchart TB
149/// 207 RECONN[Connection restored] --> HC[handle_connect_or_reconnect]
150/// Design rationale: For <100 relays, persistence adds complexity without
151/// significant benefit. Conservative initial backoff on restart avoids
152/// thundering herd issues.
153struct RelayHealthTracker {
154 health: DashMap<RelayUrl, RelayHealth>,
155 metrics: SyncMetrics, // Prometheus metrics for operator visibility
156}
157 208
158struct RelayHealth { 209 subgraph handle_connect_or_reconnect - Stale Reconnect
159 url: RelayUrl, 210 HC --> CHECK{is_fresh_sync?}
160 status: RelayStatus, // Healthy, Backoff, Dead 211 CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state]
161 consecutive_failures: u32, 212 CLEAR --> L1[build_announcement_filter - no since]
162 last_failure_at: Option<Instant>, 213 L1 --> RCA[recompute_actions_for_relay]
163 last_success_at: Option<Instant>, 214 end
164 next_retry_at: Instant,
165}
166 215
167enum RelayStatus { 216 RCA --> CA[compute_actions with empty confirmed]
168 Healthy, 217 CA --> AF[AddFilters for everything]
169 Backoff { attempt: u32 }, // backoff = min(5 * 2^attempt, 3600) seconds 218 AF --> SUB[Subscribe - no since filter]
170 Dead, // retry in 24h 219 SUB --> PB[Create PendingBatch]
171} 220 PB --> EOSE[Wait for EOSE]
221 EOSE --> CONFIRM[Populate confirmed state fresh]
172``` 222```
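The backoff rule noted on the removed `RelayStatus::Backoff` variant, `min(5 * 2^attempt, 3600)` seconds, can be written without overflow for large attempt counts. This is a sketch only; `backoff_delay` is a hypothetical helper name.

```rust
use std::time::Duration;

/// Exponential backoff: 5s, 10s, 20s, ... capped at one hour.
pub fn backoff_delay(attempt: u32) -> Duration {
    // checked_shl returns None once the shift would exceed the width of u64,
    // in which case the cap applies anyway.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    // saturating_mul avoids wrap-around before the cap is applied.
    Duration::from_secs(5u64.saturating_mul(factor).min(3600))
}
```

Attempt 0 gives 5 seconds, attempt 2 gives 20 seconds, and from attempt 10 onwards (5 * 1024 = 5120) the one-hour cap takes over.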
173 223
174### Restart Behavior (Graceful Degradation) 224**Key points:**
175 225
176On restart, all relay health state is reset. To avoid thundering herd: 226- `should_clear_state()` returns true → triggers fresh sync
227- Same path as initial connect after clearing state
228- Layer 1: `build_announcement_filter(None)` - full history
229- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything
177 230
1781. **Conservative initial backoff**: Start with 5s delay (not immediate) for all relays 231### Scenario 4: Consolidation - Triggered on Filter Add
1792. **Staggered connection attempts**: Add random jitter (0-2s) per relay
1803. **Health rebuilds organically**: Relays prove themselves healthy through successful connections
181 232
182```rust 233```mermaid
183impl RelayHealthTracker { 234flowchart TB
184 fn new(metrics: SyncMetrics) -> Self { 235 AF[handle_add_filters called] --> COUNT{current + new > 70?}
185 Self { 236 COUNT --> |yes| CONSOLIDATE[consolidate]
186 health: DashMap::new(), 237 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
187 metrics, 238 WAIT_PEND --> CLOSE[unsubscribe_all]
188 } 239 CLOSE --> SINCE[since = now - 15min]
189 } 240 SINCE --> L1[build_announcement_filter - with since]
241 L1 --> L23[rebuild_layer2_and_layer3 - with since]
242 COUNT --> |no| SUB[Subscribe new filters]
243 SUB --> PB[Create PendingBatch]
244```
190 245
191 /// Called on startup for each discovered relay 246**Key points:**
192 fn initialize_relay(&self, url: RelayUrl) {
193 self.health.insert(url.clone(), RelayHealth {
194 url,
195 status: RelayStatus::Backoff { attempt: 0 }, // Start conservative
196 consecutive_failures: 0,
197 last_failure_at: None,
198 last_success_at: None,
199 next_retry_at: Instant::now() + Self::initial_backoff_with_jitter(),
200 });
201 }
202 247
203 fn initial_backoff_with_jitter() -> Duration { 248- Consolidation checked in `handle_add_filters` BEFORE adding new filters
204 Duration::from_secs(5) + Duration::from_millis(rand::random::<u64>() % 2000) 249- After closing all subscriptions, re-subscribe:
205 } 250 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
206} 251 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
252- `since = now - 15min` prevents re-fetching old events
253- Keeps confirmed state, just reduces filter count
254
255### Scenario 5: Daily Timer - 23 to 25h Random
256
257```mermaid
258flowchart TB
259 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all]
260 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay]
261 CLEAR_PEND --> CLEAR_STATE[clear_sync_state]
262 CLEAR_STATE --> L1[build_announcement_filter - no since]
263 L1 --> RCA[recompute_actions_for_relay]
264 RCA --> CA[compute_actions with empty confirmed]
265 CA --> AF[AddFilters for everything]
266 AF --> SUB[Subscribe - no since filter]
267 SUB --> PB[Create PendingBatch]
268 PB --> EOSE[Wait for EOSE]
269 EOSE --> CONFIRM[Repopulate confirmed state]
207``` 270```
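The "23 to 25h Random" interval for the daily timer can be derived from any random `u64`. The sketch below takes the random value as a parameter so that it stays free of external crates; `daily_resync_interval` is a hypothetical name, and the random spread is presumably there so that connections do not all resync at the same moment.

```rust
use std::time::Duration;

const HOUR_SECS: u64 = 3600;

/// Maps a caller-supplied random value onto the inclusive 23h..=25h window.
pub fn daily_resync_interval(random: u64) -> Duration {
    // 23h to 25h is a two-hour spread, expressed in seconds.
    let window = 2 * HOUR_SECS;
    Duration::from_secs(23 * HOUR_SECS + random % (window + 1))
}
```

In a real implementation the `random` argument would come from the project's random source, for example the `rand` crate already used by the removed `initial_backoff_with_jitter`.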
208 271
209**Trade-off**: We lose knowledge of chronically failing relays across restarts. This is acceptable because: 272**Key points:**
273
274- Daily timer is a full fresh sync, NOT consolidation
275- Clears both PendingSyncIndex and confirmed state
276- Layer 1: `build_announcement_filter(None)` - full history
277- Layer 2+3: via compute_actions with empty confirmed - full history
278- Detects any state drift accumulated over 24 hours
210 279
211- Scale is small (<100 relays) 280---
212- Conservative initial backoff prevents hammering bad relays
213- Prometheus metrics preserve historical health data for operators
214 281
215## Filter Strategy 282## Core Algorithms
216 283
217### Unified Filters for Live Sync and Negentropy 284### 1. derive_relay_targets
218 285
219The same filter logic is used for both live subscriptions and negentropy reconciliation: 286Transform RepoSyncIndex into per-relay sync targets:
220 287
221```mermaid 288```rust
222flowchart LR 289/// Inverts RepoSyncIndex to get per-relay view
223 subgraph Filter Layers 290fn derive_relay_targets(
224 F1[Layer 1: All 30617+30618] 291 repo_index: &HashMap<String, RepoSyncNeeds>
225 F2[Layer 2: Events tagging repos via A/a/q] 292) -> HashMap<String, RelaySyncNeeds> {
226 F3[Layer 3: Events tagging PRs/Issues via E/e/q] 293 let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
227 end 294
295 for (repo_ref, needs) in repo_index {
296 for relay_url in &needs.relays {
297 let target = targets.entry(relay_url.clone()).or_default();
298 target.repos.insert(repo_ref.clone());
299 target.root_events.extend(needs.root_events.iter().cloned());
300 }
301 }
228 302
229 F1 -->|client-side| AP[Acceptance Policy] 303 targets
230 F2 -->|server-side| Relay 304}
231 F3 -->|server-side| Relay
232``` 305```
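To make the inversion performed by `derive_relay_targets` concrete, here is a self-contained sketch with a small example index. Two things are assumptions: `RelaySyncNeeds` is not defined in this part of the document, so its shape (repos plus root events) is guessed from usage, and `EventId` is replaced by a plain `String` so the example needs only the standard library. `example_index` is a hypothetical helper.

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in for the design's `EventId`; a string keeps the sketch std-only.
type EventId = String;

#[derive(Debug, Clone, Default)]
pub struct RepoSyncNeeds {
    pub relays: HashSet<String>,
    pub root_events: HashSet<EventId>,
}

/// Assumed shape, inferred from how the design uses it.
#[derive(Debug, Clone, Default)]
pub struct RelaySyncNeeds {
    pub repos: HashSet<String>,
    pub root_events: HashSet<EventId>,
}

/// Inverts the per-repo index into a per-relay view.
pub fn derive_relay_targets(
    repo_index: &HashMap<String, RepoSyncNeeds>,
) -> HashMap<String, RelaySyncNeeds> {
    let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
    for (repo_ref, needs) in repo_index {
        for relay_url in &needs.relays {
            let target = targets.entry(relay_url.clone()).or_default();
            target.repos.insert(repo_ref.clone());
            target.root_events.extend(needs.root_events.iter().cloned());
        }
    }
    targets
}

/// Two repos: the first lists relays a and b, the second lists only a.
pub fn example_index() -> HashMap<String, RepoSyncNeeds> {
    let mut index = HashMap::new();
    index.insert(
        "30617:alice:repo-one".to_string(),
        RepoSyncNeeds {
            relays: vec!["wss://a.example".to_string(), "wss://b.example".to_string()]
                .into_iter()
                .collect(),
            root_events: vec!["e1".to_string()].into_iter().collect(),
        },
    );
    index.insert(
        "30617:bob:repo-two".to_string(),
        RepoSyncNeeds {
            relays: vec!["wss://a.example".to_string()].into_iter().collect(),
            root_events: HashSet::new(),
        },
    );
    index
}
```

For this index, relay `wss://a.example` ends up with both repos and root event `e1`, while `wss://b.example` gets only `30617:alice:repo-one` and the same root event.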
233 306
234### Layer 1: Repository Announcements & States 307### 2. compute_actions (Three-Way Diff)
235 308
236Get ALL kind 30617 and 30618 events with unified `since` timestamp, then filter client-side through acceptance policy: 309**This is the ONLY decision point for what NEW subscriptions to create.**
237 310
238```rust 311```rust
239// Use same since filter as other layers for consistency 312/// Computes AddFilters for items that are:
240let layer1_filter = Filter::new() 313/// - In targets (what we want)
241 .kinds([Kind::from(30617), Kind::from(30618)]) 314/// - NOT in pending (already in-flight)
242 .since(since_timestamp); // Unified with Layer 2/3 315/// - NOT in confirmed (already confirmed)
316fn compute_actions(
317 targets: &HashMap<String, RelaySyncNeeds>,
318 pending: &HashMap<String, Vec<PendingBatch>>,
319 confirmed: &HashMap<String, RelayState>,
320) -> Vec<AddFilters> {
321 let mut actions = Vec::new();
322
323 for (relay_url, target) in targets {
324 // Skip disconnected relays - they will get AddFilters on reconnect
325 if let Some(state) = confirmed.get(relay_url) {
326 if state.connection_status != ConnectionStatus::Connected {
327 continue;
328 }
329 }
330
331 // Collect all pending items for this relay
332 let pending_repos: HashSet<_> = pending.get(relay_url)
333 .map(|batches| batches.iter()
334 .flat_map(|b| b.items.repos.iter().cloned())
335 .collect())
336 .unwrap_or_default();
337 let pending_events: HashSet<_> = pending.get(relay_url)
338 .map(|batches| batches.iter()
339 .flat_map(|b| b.items.root_events.iter().cloned())
340 .collect())
341 .unwrap_or_default();
342
343 // Collect confirmed items for this relay
344 let empty_repos = HashSet::new();
345 let empty_events = HashSet::new();
346 let confirmed_repos = confirmed.get(relay_url)
347 .map_or(&empty_repos, |c| &c.repos);
348 let confirmed_events = confirmed.get(relay_url)
349 .map_or(&empty_events, |c| &c.root_events);
350
351 // New = target - pending - confirmed
352 let new_repos: HashSet<_> = target.repos.iter()
353 .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r))
354 .cloned()
355 .collect();
356 let new_events: HashSet<_> = target.root_events.iter()
357 .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e))
358 .cloned()
359 .collect();
360
361 if !new_repos.is_empty() || !new_events.is_empty() {
362 let filters = build_filters(&new_repos, &new_events);
363 actions.push(AddFilters {
364 relay_url: relay_url.clone(),
365 repos: new_repos,
366 root_events: new_events,
367 filters,
368 });
369 }
370 }
371
372 actions
373}
243``` 374```
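The core of `compute_actions` is the set difference `new = target - pending - confirmed`. A minimal sketch of just that step, using string sets and a hypothetical helper name `new_items`:

```rust
use std::collections::HashSet;

/// Items that are wanted but neither in flight nor already confirmed.
pub fn new_items(
    target: &HashSet<String>,
    pending: &HashSet<String>,
    confirmed: &HashSet<String>,
) -> HashSet<String> {
    target
        .iter()
        .filter(|item| !pending.contains(*item) && !confirmed.contains(*item))
        .cloned()
        .collect()
}
```

When both `pending` and `confirmed` are empty the result equals `target`, which is how a blank confirmed state turns into a full fresh sync without a separate code path.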
244 375
245**Client-side validation**: Only store events that pass our [`Nip34WritePolicy`](src/nostr/builder.rs:51). 376### 3. Filter Building Functions (Three-Layer Strategy)
246 377
247### Layer 2: Events Tagging Repositories 378The filter strategy uses three layers:
248 379
249For repo announcements **that list BOTH this relay AND our service**: 380- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation
381- **Layer 2**: Events tagging our repos
382- **Layer 3**: Events tagging our root events
383
384**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch).
250 385
251```rust 386```rust
252// Build addressable references: 30617:<pubkey>:<identifier> 387/// Layer 1: Announcements filter (kinds 30617 + 30618)
253let repo_refs: Vec<String> = announcements 388/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
254 .iter() 389/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
255 .filter(|a| a.relays.contains(&this_relay) && a.lists_service(&our_domain)) 390fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
256 .map(|a| format!("30617:{}:{}", a.pubkey.to_hex(), a.identifier)) 391 let filter = Filter::new().kinds([
257 .collect(); 392 Kind::Custom(30617), // Repository announcements
258 393 Kind::Custom(30618), // Repository state announcements
259let layer2_filter = Filter::new() 394 ]);
260 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_refs.clone()) 395
261 .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo_refs)); 396 match since {
262``` 397 Some(ts) => filter.since(ts),
398 None => filter,
399 }
400}
263 401
264### Layer 3: Events Tagging Issues/PRs/Patches 402/// Layer 2: Events tagging one of our repos
403/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
404/// Batched per 100 repo refs.
405fn tagged_one_of_our_repo_event_filters(
406 repos: &HashSet<String>,
407 since: Option<Timestamp>,
408) -> Vec<Filter> {
409 let mut filters = Vec::new();
410 let repo_refs: Vec<_> = repos.iter().collect();
411
412 for chunk in repo_refs.chunks(100) {
413 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
414
415 // Lowercase 'a' tag - standard addressable reference
416 let mut f1 = Filter::new()
417 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone());
418 // Uppercase 'A' tag - some clients use this
419 let mut f2 = Filter::new()
420 .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone());
421 // Quote 'q' tag - NIP-10 quote references to addressable events
422 let mut f3 = Filter::new()
423 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
424
425 if let Some(ts) = since {
426 f1 = f1.since(ts);
427 f2 = f2.since(ts);
428 f3 = f3.since(ts);
429 }
265 430
266For events that reference PRs, Patches, or Issues from repos we track: 431 filters.push(f1);
432 filters.push(f2);
433 filters.push(f3);
434 }
267 435
268```rust 436 filters
269// Collect event IDs of PRs, Patches, Issues we've stored 437}
270let tagged_event_ids: Vec<EventId> = database
271 .query(Filter::new().kinds([1618, 1619, 1621, 1622, 1630])) // PR, PR update, issue, reply, status
272 .iter()
273 .filter(|e| references_tracked_repo(e, &announcements))
274 .map(|e| e.id)
275 .collect();
276
277let layer3_filter = Filter::new()
278 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), tagged_event_ids.clone())
279 .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), tagged_event_ids));
280```
281 438
282### Filter Size Management 439/// Layer 3: Events tagging one of our root events
440/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
441/// Batched per 100 event IDs.
442fn tagged_one_of_our_root_event_filters(
443 root_events: &HashSet<EventId>,
444 since: Option<Timestamp>,
445) -> Vec<Filter> {
446 let mut filters = Vec::new();
447 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
448
449 for chunk in event_ids.chunks(100) {
450 let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
451
452 // Lowercase 'e' tag - standard event reference
453 let mut f1 = Filter::new()
454 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone());
455 // Uppercase 'E' tag - some clients use this
456 let mut f2 = Filter::new()
457 .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone());
458 // Quote 'q' tag - NIP-10 quote references to events
459 let mut f3 = Filter::new()
460 .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec);
461
462 if let Some(ts) = since {
463 f1 = f1.since(ts);
464 f2 = f2.since(ts);
465 f3 = f3.since(ts);
466 }
283 467
284When the tag list exceeds a threshold, split into batches: 468 filters.push(f1);
469 filters.push(f2);
470 filters.push(f3);
471 }
285 472
286```rust 473 filters
287const MAX_TAGS_PER_FILTER: usize = 100; 474}
288 475
289fn build_filters(tag_values: Vec<String>) -> Vec<Filter> { 476/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
290 tag_values 477/// Used by:
291 .chunks(MAX_TAGS_PER_FILTER) 478/// - compute_actions for incremental subscriptions
292 .map(|chunk| Filter::new().custom_tag(tag, chunk.to_vec())) 479/// - consolidation rebuilds (Layer 1 remains active)
293 .collect() 480fn build_layer2_and_layer3_filters(
481 repos: &HashSet<String>,
482 root_events: &HashSet<EventId>,
483 since: Option<Timestamp>,
484) -> Vec<Filter> {
485 let mut filters = Vec::new();
486 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
487 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
488 filters
294} 489}
295``` 490```
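Because each chunk of up to 100 values yields three filters (lowercase tag, uppercase tag, and `q`), the number of filters grows quickly. The sketch below estimates that count and applies the "current + new > 70" check from the consolidation scenario. The helper names are hypothetical, and it assumes the threshold counts individual filters.

```rust
/// Values per filter, as in "Batched per 100".
const MAX_TAGS_PER_FILTER: usize = 100;
/// One filter each for the lowercase tag, the uppercase tag, and `q`.
const TAG_VARIANTS: usize = 3;
/// Threshold from the consolidation scenario.
const CONSOLIDATION_THRESHOLD: usize = 70;

/// Number of filters one layer produces for `item_count` repos or root events.
pub fn layer_filter_count(item_count: usize) -> usize {
    // Ceiling division: a partially filled chunk still needs its own filters.
    let chunks = (item_count + MAX_TAGS_PER_FILTER - 1) / MAX_TAGS_PER_FILTER;
    chunks * TAG_VARIANTS
}

/// True when adding `new_filters` would push the connection over the threshold.
pub fn needs_consolidation(current_filters: usize, new_filters: usize) -> bool {
    current_filters + new_filters > CONSOLIDATION_THRESHOLD
}
```

For example, 101 repos need two chunks and therefore six Layer 2 filters, and a connection already holding 68 filters would consolidate before accepting three more.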
296 491
297**Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch. 492**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently.
298 493
299### Filter Generation vs. Policy Validation 494### 4. handle_add_filters (SyncManager)
300 495
301The filter strategy and acceptance policies serve **different purposes** even though they share conceptual knowledge: 496```rust
497impl SyncManager {
498 async fn handle_add_filters(&mut self, action: AddFilters) {
499 let AddFilters { relay_url, repos, root_events, filters } = action;
500
501 // Auto-spawn connection if needed
502 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
503 match state {
504 None => {
505 // New relay discovered - create entry and spawn connection
506 self.relay_sync_index.write().await.insert(
507 relay_url.clone(),
508 RelayState {
509 repos: HashSet::new(),
510 root_events: HashSet::new(),
511 is_bootstrap: false,
512 connection_status: ConnectionStatus::Connecting,
513 last_connected: None,
514 disconnected_at: None,
515 connection: None,
516 }
517 );
518 self.spawn_connection(&relay_url).await;
519 return; // Subscriptions will happen on connection success
520 }
521 Some(state) if state.connection_status != ConnectionStatus::Connected => {
522 // Not connected - subscriptions will happen on connection success
523 return;
524 }
525 Some(_) => {
526 // Already connected - proceed with subscription
527 }
528 }
302 529
303| Concern | Filters | Policies | 530 // Subscribe and collect subscription IDs
304|---------|---------|----------| 531 let conn = self.connections.get(&relay_url).unwrap();
305| **Direction** | What to request FROM remote relays | What to accept INTO local database | 532 let mut sub_ids = HashSet::new();
306| **Input** | Stored events (announcements, PRs, etc.) | Single incoming event | 533
307| **Output** | Filter specification | Accept/Reject decision | 534 for filter in filters {
535 match conn.client.subscribe(filter, None).await {
536 Ok(output) => {
537 for sub_id in output.val {
538 sub_ids.insert(sub_id);
539 }
540 }
541 Err(e) => {
542 tracing::warn!("Failed to subscribe: {}", e);
543 }
544 }
545 }
308 546
309The modular sub-policies ([`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24), [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25), etc.) encode knowledge about event kinds and tag types, but this knowledge is applied differently: 547 // Create pending batch
548 let batch = PendingBatch {
549 batch_id: self.next_batch_id(),
550 items: PendingItems { repos, root_events },
551 outstanding_subs: sub_ids,
552 };
310 553
311- **In filters**: We enumerate **all** addressable refs (`30617:pubkey:id`) from stored announcements 554 // Add to pending index
312- **In policies**: [`RelatedEventPolicy::check_references()`](../../src/nostr/policy/related.rs:39) checks if incoming event references **any** accepted event 555 self.pending_sync_index.write().await
556 .entry(relay_url)
557 .or_default()
558 .push(batch);
559 }
560}
561```
313 562
314Because of this fundamental difference, filter generation logic stays in `src/sync/filter.rs` rather than being delegated to policy modules. Both share the understanding of NIP-34 event relationships, but they answer different questions. 563### 5. handle_disconnect
315 564
316## Subscription Updates 565```rust
566impl SyncManager {
567 /// Called when connection to a relay is lost
568 async fn handle_disconnect(&mut self, relay_url: &str) {
569 let mut index = self.relay_sync_index.write().await;
570
571 if let Some(state) = index.get_mut(relay_url) {
572 state.connection_status = ConnectionStatus::Disconnected;
573 state.disconnected_at = Some(Timestamp::now());
574 state.connection = None;
575 }
317 576
318### Dynamic Subscription Management 577 // Clear pending batches - these items were not confirmed
578 self.pending_sync_index.write().await.remove(relay_url);
319 579
320When new events arrive that affect our filter criteria: 580 // Remove from active connections map
581 self.connections.remove(relay_url);
321 582
322```mermaid 583 // Health tracker records failure for backoff
323sequenceDiagram 584 self.health_tracker.record_failure(relay_url);
324 participant LocalRelay 585 }
325 participant SyncManager 586}
326 participant RemoteRelay
327
328 LocalRelay->>SyncManager: New PR event accepted
329 SyncManager->>SyncManager: Extract event ID
330 SyncManager->>SyncManager: Build new filter for E/e/q tags
331 SyncManager->>RemoteRelay: REQ with new filter
332 RemoteRelay-->>SyncManager: Events matching new filter
333``` 587```
334 588
335**Events that trigger subscription updates**: 589### 6. handle_connect_or_reconnect (Unified)
336 590
337- New repository announcement accepted (adds to Layer 2) 591This method handles BOTH initial connection AND reconnection with unified logic:
338- New PR/Issue/Patch accepted (adds to Layer 3)
339 592
340### When to Consolidate 593```rust
594impl SyncManager {
595 /// Called when connection to a relay succeeds - handles both initial connect and reconnect.
596 ///
597 /// Decision tree:
598 /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history
599 /// - Quick reconnect (<15min): since = last_connected - 15min
600 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
601 let mut index = self.relay_sync_index.write().await;
602 let state = match index.get_mut(relay_url) {
603 Some(s) => s,
604 None => return, // Relay was removed while disconnected
605 };
341 606
342Track subscription count per connection: 607 // Determine if this is a fresh sync or quick reconnect
608 let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state();
609 let last_connected = state.last_connected;
343 610
344```rust 611 if is_fresh_sync && state.last_connected.is_some() {
345struct ConnectionState { 612 // Stale reconnect (>15min) - clear state
346 relay_url: RelayUrl, 613 tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url);
347 subscriptions: Vec<SubscriptionId>, 614 state.clear_sync_state();
348 total_filter_count: usize, 615 }
349}
350 616
351impl ConnectionState { 617 // Update connection state
352 fn should_consolidate(&self) -> bool { 618 state.connection_status = ConnectionStatus::Connected;
353 self.total_filter_count > 150 619 state.last_connected = Some(Timestamp::now());
354 } 620 state.disconnected_at = None;
355 621
356 async fn consolidate(&mut self) { 622 // Record success in health tracker
357 // Close all subscriptions 623 self.health_tracker.record_success(relay_url);
358 // Rebuild from scratch with current database state
359 }
360}
361```
362 624
363## Negentropy Catchup 625 drop(index); // Release lock
364 626
365### NIP-77 Reconciliation Protocol 627 let conn = match self.connections.get(relay_url) {
628 Some(c) => c,
629 None => return,
630 };
366 631
367Negentropy enables efficient set reconciliation - discovering which events we're missing without transferring full event lists. 632 if is_fresh_sync {
633 // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions
368 634
369### Timing 635 // Step 1: Subscribe Layer 1 (announcements) without since
636 let layer1 = build_announcement_filter(None);
637 let _ = conn.client.subscribe(layer1, None).await;
370 638
371| Trigger | Behavior | 639 // Step 2: compute_actions will handle Layer 2+3 (with since=None in build)
372| ------------------- | -------------------------------------------------------------------- | 640 self.recompute_actions_for_relay(relay_url).await;
373| **Initial startup** | Warm-up delay, staggered if many filters, initializes daily schedule | 641 } else {
374| **After reconnect** | Delay to avoid rate limiting, limited to events from last 3 days | 642 // Quick reconnect: Layer 1 with since, Layer 2+3 with since
375| **Daily** | Staggered batches, max 100 tagged events per filter | 643 let since = last_connected
644 .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900)))
645 .unwrap_or(Timestamp::from(0));
376 646
377### Startup Flow 647 // Step 1: Subscribe Layer 1 (announcements) with since
648 let layer1 = build_announcement_filter(Some(since));
649 let _ = conn.client.subscribe(layer1, None).await;
378 650
379```mermaid 651 // Step 2: Rebuild Layer 2+3 for confirmed items with since
380sequenceDiagram 652 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
381 participant Server
382 participant SyncManager
383 participant Relay
384
385 Server->>SyncManager: Start
386 SyncManager->>SyncManager: Wait warm-up delay
387 SyncManager->>SyncManager: Build batched filters
388
389 loop For each relay with stagger delay
390 SyncManager->>Relay: NEG-OPEN with filter batch 1
391 Relay-->>SyncManager: NEG-MSG with differences
392 SyncManager->>Relay: NEG-MSG response
393 Note over SyncManager,Relay: Reconciliation rounds
394 Relay-->>SyncManager: NEG-CLOSE or events
395 SyncManager->>SyncManager: Validate + store events
396
397 alt More batches
398 SyncManager->>SyncManager: Wait stagger delay
399 SyncManager->>Relay: NEG-OPEN with next batch
400 end
401 end
402 653
403 SyncManager->>SyncManager: Schedule daily catchup 654 // Step 3: Check for NEW items via compute_actions
404``` 655 self.recompute_actions_for_relay(relay_url).await;
656 }
657 }
405 658
406### Reconnection Catchup 659 /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1).
660 /// Used by:
661 /// - Quick reconnect: rebuild confirmed items with since filter
662 /// - Consolidation: close and rebuild with since filter
663 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
664 let confirmed = self.relay_sync_index.read().await;
665 let state = match confirmed.get(relay_url) {
666 Some(s) => s,
667 None => return,
668 };
407 669
408After connection reestablished: 670 // Build Layer 2+3 filters WITH since
671 let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since);
672 drop(confirmed);
409 673
410```rust 674 // Subscribe directly - no PendingBatch for catch-up (items already confirmed)
411async fn catchup_after_reconnect(&self, relay: &RelayUrl) { 675 let conn = match self.connections.get(relay_url) {
412 // Delay to avoid immediate disconnect for too many requests 676 Some(c) => c,
413 tokio::time::sleep(RECONNECT_CATCHUP_DELAY).await; 677 None => return,
678 };
679
680 for filter in filters {
681 let _ = conn.client.subscribe(filter, None).await;
682 }
683 }
684
685 /// Rerun compute_actions for a specific relay and process resulting AddFilters.
686 /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state.
687 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
688 let repo_index = self.repo_sync_index.read().await;
689 let targets = derive_relay_targets(&repo_index);
690 drop(repo_index);
691
692 // Filter to just this relay
693 let target = match targets.get(relay_url) {
694 Some(t) => t.clone(),
695 None => return, // No repos reference this relay anymore
696 };
414 697
415 // Only catch up on recent events (last 3 days) 698 let pending = self.pending_sync_index.read().await;
416 let since = Timestamp::now() - Duration::from_secs(3 * 24 * 60 * 60); 699 let confirmed = self.relay_sync_index.read().await;
417 700
418 let filters = self.build_filters_for_relay(relay) 701 let mut single_relay_targets = HashMap::new();
419 .into_iter() 702 single_relay_targets.insert(relay_url.to_string(), target);
420 .map(|f| f.since(since))
421 .collect();
422 703
423 self.run_negentropy(relay, filters).await; 704 let actions = compute_actions(&single_relay_targets, &pending, &confirmed);
705
706 drop(pending);
707 drop(confirmed);
708
709 // Process AddFilters
710 for action in actions {
711 self.handle_add_filters(action).await;
712 }
713 }
424} 714}
425``` 715```
426 716
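The decision tree documented on `handle_connect_or_reconnect` can be isolated as a pure function, which makes the 15-minute rule easy to test. The following is a minimal sketch, not the actual implementation: `SyncMode`, `choose_sync_mode` and `RECONNECT_BUFFER_SECS` are hypothetical names, timestamps are plain `u64` seconds rather than `Timestamp`, and it assumes `should_clear_state()` compares `disconnected_at` against the current time.

```rust
/// Buffer used for both the staleness check and the `since` offset (15 minutes).
/// Hypothetical constant; the design doc uses the literal 900.
const RECONNECT_BUFFER_SECS: u64 = 900;

#[derive(Debug, PartialEq, Eq)]
enum SyncMode {
    /// No `since` filter: request full history.
    Fresh,
    /// Catch-up: request events created at or after `since`.
    CatchUp { since: u64 },
}

/// Decide how to subscribe after a successful (re)connect.
/// All timestamps are Unix seconds.
fn choose_sync_mode(
    last_connected: Option<u64>,
    disconnected_at: Option<u64>,
    now: u64,
) -> SyncMode {
    let last = match last_connected {
        Some(ts) => ts,
        // Never connected before: initial connect, full history.
        None => return SyncMode::Fresh,
    };

    // Assumption: a relay counts as stale once it has been
    // disconnected for strictly more than the buffer.
    let stale = disconnected_at
        .map_or(false, |d| now.saturating_sub(d) > RECONNECT_BUFFER_SECS);

    if stale {
        SyncMode::Fresh
    } else {
        // saturating_sub avoids underflow for very small timestamps.
        SyncMode::CatchUp {
            since: last.saturating_sub(RECONNECT_BUFFER_SECS),
        }
    }
}
```

A disconnect of exactly 900 seconds is treated as a quick reconnect here; the doc comment only specifies ">15min" and "<15min", so the boundary is a choice of this sketch.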
427### Daily Catchup Schedule 717### 7. Daily Timer
428 718
429```rust 719```rust
430// Daily catchup runs at consistent time, staggered across relays 720impl SyncManager {
431async fn schedule_daily_catchup(&self) { 721 async fn run_daily_timer(&mut self) {
432 let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); 722 loop {
723 // Random 23-25 hours
724 let hours = 23.0 + rand::random::<f64>() * 2.0;
725 tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await;
726
727 let relay_urls: Vec<_> = self.relay_sync_index.read().await
728 .keys()
729 .cloned()
730 .collect();
731
732 for relay_url in relay_urls {
733 self.daily_sync(&relay_url).await;
734 }
735 }
736 }
433 737
434 loop { 738 /// Perform daily fresh sync for a relay
435 interval.tick().await; 739 async fn daily_sync(&mut self, relay_url: &str) {
740 tracing::info!("Daily sync triggered for {}", relay_url);
436 741
437 for (i, relay) in self.healthy_relays().enumerate() { 742 // Close all subscriptions
438 // Stagger: 5 minute delay between relays 743 if let Some(conn) = self.connections.get(relay_url) {
439 tokio::time::sleep(Duration::from_secs(i as u64 * 300)).await; 744 conn.client.unsubscribe_all().await;
745 }
440 746
441 // Batch filters to max 100 tagged events each 747 // Clear PendingSyncIndex
442 let batches = self.build_batched_filters(&relay, 100); 748 self.pending_sync_index.write().await.remove(relay_url);
443 749
444 for batch in batches { 750 // Clear confirmed state - triggers fresh sync
445 self.run_negentropy(&relay, batch).await; 751 {
446 tokio::time::sleep(Duration::from_secs(60)).await; // 1 min between batches 752 let mut index = self.relay_sync_index.write().await;
753 if let Some(state) = index.get_mut(relay_url) {
754 state.clear_sync_state();
447 } 755 }
448 } 756 }
757
758 // Recompute actions - generates AddFilters for all Layer 2+3 items (Layer 1 was also closed by unsubscribe_all and must be re-subscribed separately)
759 self.recompute_actions_for_relay(relay_url).await;
449 } 760 }
450} 761}
451``` 762```
452 763
453## Event Processing 764### 8. Consolidation (Threshold-Based, Triggered on Add)
454 765
455### Acceptance Policy 766Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active.
456 767
457All synced events go through our acceptance policy, reusing the same [`Nip34WritePolicy`](../../src/nostr/builder.rs:36) validation logic used for direct client submissions. 768```rust
769impl SyncManager {
770 /// Check filter count and consolidate if needed.
771 /// Called from handle_add_filters BEFORE adding new filters.
772 async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) {
773 let current_count = self.get_filter_count(relay_url).await;
458 774
459#### Design: Reusing admit_event() 775 if current_count + new_filter_count > 70 {
776 self.consolidate(relay_url).await;
777 }
778 }
460 779
461The [`WritePolicy::admit_event()`](../../src/nostr/builder.rs:256-269) trait method takes a `SocketAddr` parameter designed for client connections: 780 /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active.
781 /// Does NOT clear state - just reduces filter count.
782 async fn consolidate(&mut self, relay_url: &str) {
783 tracing::info!("Consolidating filters for {} (count > 70)", relay_url);
462 784
463```rust 785 // Wait for all pending batches to complete first
464// From nostr-relay-builder WritePolicy trait 786 self.wait_pending_complete(relay_url).await;
465fn admit_event<'a>( 787
466 &'a self, 788 // Close Layer 2+3 subscriptions only - Layer 1 remains active
467 event: &'a Event, 789 // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately
468 _addr: &'a SocketAddr, // Unused in our implementation 790 // For simplicity, we close all and re-add Layer 1
469) -> BoxedFuture<'a, PolicyResult>; 791 if let Some(conn) = self.connections.get(relay_url) {
792 conn.client.unsubscribe_all().await;
793 }
794
795 // Re-subscribe Layer 1 with since (maintains announcements stream)
796 let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900));
797 let Some(conn) = self.connections.get(relay_url) else { return }; // relay may have disconnected while waiting
798 let layer1 = build_announcement_filter(Some(since));
799 let _ = conn.client.subscribe(layer1, None).await;
800
801 // Rebuild Layer 2+3 only
802 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
803 }
804}
470``` 805```
471 806
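The note inside `consolidate` mentions tracking subscription IDs separately as an alternative to closing everything and re-adding Layer 1. A minimal sketch of that bookkeeping follows. `RelaySubscriptions`, its fields and `MAX_FILTERS_PER_RELAY` are hypothetical names, subscription IDs are plain strings, and each subscription is assumed to carry exactly one filter.

```rust
use std::collections::HashSet;

/// Consolidation threshold taken from the design (count + new > 70).
const MAX_FILTERS_PER_RELAY: usize = 70;

/// Per-relay subscription bookkeeping that keeps Layer 1 apart from Layer 2+3.
#[derive(Debug, Default)]
struct RelaySubscriptions {
    /// The single announcements subscription, if active.
    layer1: Option<String>,
    /// Item-level subscriptions created via AddFilters or catch-up.
    layer2_and_3: HashSet<String>,
}

impl RelaySubscriptions {
    /// Total number of active filters (one per subscription in this sketch).
    fn filter_count(&self) -> usize {
        usize::from(self.layer1.is_some()) + self.layer2_and_3.len()
    }

    /// Mirrors the check in maybe_consolidate: consolidate BEFORE adding.
    fn needs_consolidation(&self, new_filter_count: usize) -> bool {
        self.filter_count() + new_filter_count > MAX_FILTERS_PER_RELAY
    }

    /// Returns the Layer 2+3 IDs to close and forgets them; Layer 1 is untouched.
    fn drain_layer2_and_3(&mut self) -> Vec<String> {
        let mut ids: Vec<String> = self.layer2_and_3.drain().collect();
        ids.sort(); // deterministic order for logging and tests
        ids
    }
}
```

With this split, consolidation would close only the drained IDs and then rebuild Layer 2+3, so the announcement stream is never interrupted and no Layer 1 `since` catch-up is needed.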
472For synced events from remote relays, we pass a **synthetic localhost address** since: 807**Updated handle_add_filters to check consolidation:**
4731. The `_addr` parameter is currently unused in our [`Nip34WritePolicy`](../../src/nostr/builder.rs:259)
4742. All meaningful validation is done by the modular sub-policies (see below)
4753. This allows reusing 100% of the existing validation logic
476 808
477```rust 809```rust
478use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 810impl SyncManager {
479 811 async fn handle_add_filters(&mut self, action: AddFilters) {
480/// Synthetic address for synced events (not from a direct client connection) 812 let AddFilters { relay_url, repos, root_events, filters } = action;
481const SYNC_SOURCE_ADDR: SocketAddr = SocketAddr::new( 813
482 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 814 // Auto-spawn connection if needed (unchanged)
483 0 815 let state = self.relay_sync_index.read().await.get(&relay_url).cloned();
484); 816 match state {
485 817 None => {
486async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { 818 // New relay discovered - create entry and spawn connection
487 // Apply our Nip34WritePolicy using synthetic address 819 self.relay_sync_index.write().await.insert(
488 // The SocketAddr is unused - all validation is by the modular sub-policies 820 relay_url.clone(),
489 let result = self.acceptance_policy 821 RelayState {
490 .admit_event(&event, &SYNC_SOURCE_ADDR) 822 repos: HashSet::new(),
491 .await; 823 root_events: HashSet::new(),
492 824 is_bootstrap: false,
493 match result { 825 connection_status: ConnectionStatus::Connecting,
494 PolicyResult::Accept => { 826 last_connected: None,
495 self.database.save_event(&event).await?; 827 disconnected_at: None,
496 tracing::debug!( 828 connection: None,
497 "Accepted synced event {} from {}", 829 }
498 event.id.to_hex(), 830 );
499 source_relay 831 self.spawn_connection(&relay_url).await;
500 ); 832 return; // Subscriptions will happen on connection success
501 self.trigger_subscription_updates(&event).await; 833 }
834 Some(state) if state.connection_status != ConnectionStatus::Connected => {
835 return; // Not connected - subscriptions will happen on connection success
836 }
837 Some(_) => {
838 // Already connected - proceed
839 }
502 } 840 }
503 PolicyResult::Reject(reason) => { 841
504 tracing::debug!( 842 // CHECK CONSOLIDATION BEFORE ADDING
505 "Rejected synced event {} from {}: {}", 843 self.maybe_consolidate(&relay_url, filters.len()).await;
506 event.id.to_hex(), 844
507 source_relay, 845 // Subscribe and collect subscription IDs
508 reason 846 let Some(conn) = self.connections.get(&relay_url) else { return }; // relay may have disconnected during consolidation
509 ); 847 let mut sub_ids = HashSet::new();
848
849 for filter in filters {
850 match conn.client.subscribe(filter, None).await {
851 Ok(output) => {
852 for sub_id in output.val {
853 sub_ids.insert(sub_id);
854 }
855 }
856 Err(e) => {
857 tracing::warn!("Failed to subscribe: {}", e);
858 }
859 }
510 } 860 }
511 }
512 861
513 Ok(()) 862 // Create pending batch (unchanged)
863 let batch = PendingBatch {
864 batch_id: self.next_batch_id(),
865 items: PendingItems { repos, root_events },
866 outstanding_subs: sub_ids,
867 };
868
869 self.pending_sync_index.write().await
870 .entry(relay_url)
871 .or_default()
872 .push(batch);
873 }
514} 874}
515``` 875```
516 876
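`handle_add_filters` records the subscription IDs of each batch in `outstanding_subs`, and the state flow promotes pending items to confirmed state on EOSE. The EOSE handler itself is not shown in this part of the document, so the following is only a sketch of how that promotion could work. `on_eose` and `ConfirmedState` are hypothetical, `PendingItems` is flattened into the batch, and IDs are plain strings.

```rust
use std::collections::HashSet;

/// Simplified stand-in for PendingBatch with PendingItems flattened in.
#[derive(Debug, Default)]
struct PendingBatch {
    batch_id: u64,
    repos: HashSet<String>,
    root_events: HashSet<String>,
    outstanding_subs: HashSet<String>,
}

/// Simplified stand-in for the confirmed part of RelayState.
#[derive(Debug, Default)]
struct ConfirmedState {
    repos: HashSet<String>,
    root_events: HashSet<String>,
}

/// Handle EOSE for `sub_id`: a batch is promoted to confirmed state once its
/// last outstanding subscription has reported EOSE.
/// Returns the number of batches promoted.
fn on_eose(
    batches: &mut Vec<PendingBatch>,
    confirmed: &mut ConfirmedState,
    sub_id: &str,
) -> usize {
    let mut remaining = Vec::with_capacity(batches.len());
    let mut promoted = 0;

    for mut batch in batches.drain(..) {
        // Only batches that were actually waiting on this ID can complete;
        // a batch whose subscribes all failed stays pending.
        let was_waiting = batch.outstanding_subs.remove(sub_id);
        if was_waiting && batch.outstanding_subs.is_empty() {
            confirmed.repos.extend(batch.repos);
            confirmed.root_events.extend(batch.root_events);
            promoted += 1;
        } else {
            remaining.push(batch);
        }
    }

    *batches = remaining;
    promoted
}
```

One consequence worth noting: if every `subscribe` call in a batch fails, `outstanding_subs` starts out empty and the batch can never complete under this scheme, so a real implementation would likely skip creating the batch in that case.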
517#### Modular Sub-Policies 877---
518
519The [`Nip34WritePolicy`](../../src/nostr/builder.rs:36-42) delegates to specialized sub-policies in [`src/nostr/policy/`](../../src/nostr/policy/mod.rs:1-41):
520 878
521| Sub-Policy | Kinds | Responsibility | 879## Disconnect (Relay Removal) Handling
522|------------|-------|----------------|
523| [`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24-27) | 30617 | Validates service listing, maintainer exception, creates bare repos |
524| [`StatePolicy`](../../src/nostr/policy/state.rs:43-46) | 30618 | Validates state structure, aligns git refs with authorized state |
525| [`PrEventPolicy`](../../src/nostr/policy/pr_event.rs) | 1618, 1619 | Validates PR/PR Update events, manages refs/nostr/* |
526| [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25-29) | All others | Checks forward/backward references to accepted repos/events |
527 880
528All sub-policies share a common [`PolicyContext`](../../src/nostr/policy/mod.rs:22-27) containing: 881```rust
529- `domain`: Our service domain for validation 882impl SyncManager {
530- `database`: For querying existing events 883 /// Periodically check for relays that should be disconnected
531- `git_data_path`: For git operations 884 async fn check_disconnects(&mut self) {
885 let confirmed = self.relay_sync_index.read().await;
886 let relays_to_disconnect: Vec<_> = confirmed.iter()
887 .filter(|(_, state)| {
888 !state.is_bootstrap &&
889 state.repos.is_empty() &&
890 state.root_events.is_empty()
891 })
892 .map(|(url, _)| url.clone())
893 .collect();
894 drop(confirmed);
895
896 for relay_url in relays_to_disconnect {
897 self.disconnect_relay(&relay_url).await;
898 }
899 }
532 900
533#### Why Not Call Sub-Policies Directly? 901 async fn disconnect_relay(&mut self, relay_url: &str) {
902 tracing::info!("Disconnecting relay {} (no repos)", relay_url);
534 903
535While we could bypass `admit_event()` and call sub-policies directly: 904 self.relay_sync_index.write().await.remove(relay_url);
905 self.pending_sync_index.write().await.remove(relay_url);
536 906
537```rust 907 if let Some(conn) = self.connections.remove(relay_url) {
538// Alternative: Direct sub-policy calls (NOT recommended) 908 let _ = conn.client.disconnect().await;
539match event.kind.as_u16() { 909 }
540 30617 => self.announcement_policy.validate(&event).await, 910 }
541 30618 => self.state_policy.validate(&event),
542 1618 | 1619 => self.pr_event_policy.validate_nostr_ref(&event).await,
543 _ => self.related_event_policy.check_references(&event).await,
544} 911}
545``` 912```
546 913
547This is **not recommended** because: 914---
5481. Duplicates the kind-routing logic from [`admit_event()`](../../src/nostr/builder.rs:261-268)
5492. Misses important post-validation steps (e.g., `handle_announcement()` also calls `ensure_bare_repository()`)
5503. Creates maintenance burden when policy logic changes
551 915
552## Module Structure 916## State Flow Summary
553 917
554### New `src/sync/` Module 918```mermaid
919flowchart TB
920 subgraph Input
921 SS[SelfSubscriber]
922 OWN[Own Relay]
923 end
555 924
556``` 925 subgraph RepoSyncIndex - What We Want
557src/ 926 RSI[HashMap: Repo to Relays+Events]
558├── sync/ 927 end
559│ ├── mod.rs # Module exports
560│ ├── manager.rs # SyncManager - main coordinator
561│ ├── connection.rs # Per-relay connection handling
562│ ├── filter.rs # Filter building and batching
563│ ├── health.rs # RelayHealth tracking
564│ ├── negentropy.rs # NIP-77 reconciliation logic
565│ └── subscription.rs # Dynamic subscription management
566├── nostr/
567│ └── ... (existing)
568└── ...
569```
570 928
571### Integration with Main Binary 929 subgraph Derived Target
930 DT[derive_relay_targets fn]
931 TGT[Per-relay: repos + events we should sync]
932 end
572 933
573```rust 934 subgraph compute_actions - Decision Point
574// In main.rs 935 CA[Three-way diff: target - pending - confirmed]
575async fn main() -> Result<()> { 936 end
576 // ... existing setup ...
577 937
578 // Start sync manager as background task 938 subgraph PendingSyncIndex - In Flight
579 let sync_manager = SyncManager::new( 939 PSI[Vec PendingBatch per relay]
580 database.clone(), 940 end
581 config.domain.clone(),
582 );
583 941
584 tokio::spawn(async move { 942 subgraph RelaySyncIndex - Confirmed State
585 sync_manager.run().await 943 RLI[RelayState per relay]
586 }); 944 CONN[connection_status]
945 REPOS[repos + root_events]
946 TIMES[last_connected + disconnected_at]
947 end
587 948
588 // ... rest of server startup ... 949 SS -->|subscribe| OWN
589} 950 OWN -->|events| SS
951 SS -->|batch fires| RSI
952 RSI --> DT
953 DT --> TGT
954 TGT --> CA
955 PSI --> CA
956 RLI --> CA
957 CA -->|Layer 2+3 new items| AF[AddFilters]
958 AF -->|check filter count| CONSOL{count + new > 70?}
959 CONSOL -->|yes| CONSOLIDATE[consolidate]
960 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since]
961 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since]
962 CONSOL -->|no| SUB[subscribe]
963 AF -->|spawn if needed| CONN
964 SUB --> PSI
965 PSI -->|EOSE| REPOS
966
967 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
968 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
969
970 subgraph handle_connect_or_reconnect
971 HC --> FRESH_CHECK{is_fresh_sync?}
972 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
973 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
974 L1_FRESH --> RCA1[recompute_actions_for_relay]
975 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
976 L23_QUICK --> RCA2[recompute_actions_for_relay]
977 end
590``` 978```
591 979
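The flowchart describes `compute_actions` as a three-way diff: target minus pending minus confirmed. For a single relay and a single item type, that reduces to a set difference. The sketch below illustrates only that core step; `items_to_subscribe` is a hypothetical helper, items are plain strings, and the real `compute_actions` additionally groups results per relay and builds the Layer 2+3 filters.

```rust
use std::collections::HashSet;

/// Items that are wanted (target) but neither in flight (pending)
/// nor already synced (confirmed). Only these need new subscriptions.
fn items_to_subscribe(
    target: &HashSet<String>,
    pending: &HashSet<String>,
    confirmed: &HashSet<String>,
) -> HashSet<String> {
    target
        .iter()
        .filter(|item| !pending.contains(*item) && !confirmed.contains(*item))
        .cloned()
        .collect()
}
```

Because the result is empty whenever everything in the target is already pending or confirmed, rerunning the diff after every self-subscriber batch or reconnect does not create duplicate subscriptions. Clearing confirmed state (stale reconnect, daily sync) makes the whole target show up as new, which is how a blank state turns into a fresh sync.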
592## Metrics & Observability 980---
981
982## Key Design Decisions
983
984| Decision | Choice | Rationale |
985| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- |
986| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect |
987| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect |
988| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribed on connect, never driven by compute_actions; re-added after unsubscribe_all during consolidation |
989| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 |
990| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up |
991| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events |
992| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up |
993| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up |
994| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation |
995| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters |
996| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream |
997| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect |
998| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
999| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
1000| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
1001| Connection spawning | Via AddFilters handler | Single path for new relay discovery |
1002| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates |
593 1003
594All sync metrics are exposed via Prometheus at `/metrics`. For <100 relays, per-relay labels are acceptable cardinality. 1004---
595 1005
596### Prometheus Metrics 1006## Module Structure
597 1007
598```rust 1008```
599/// Sync module metrics registered with the global Prometheus registry 1009src/sync/
600pub struct SyncMetrics { 1010├── mod.rs # SyncManager, main loop
601 // === Connection Metrics (per relay) === 1011├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types
602 /// Active outbound connections: ngit_sync_relay_connected{relay="wss://..."} 1012├── actions.rs # AddFilters struct, compute_actions, build_filters
603 relay_connected: IntGaugeVec, // labels: [relay] 1013├── self_subscriber.rs # SelfSubscriber, batching logic
1014├── relay_connection.rs # Per-relay WebSocket connection
1015├── consolidation.rs # Consolidation logic, daily timer
1016├── health.rs # Health tracking (reuse from v2)
1017└── metrics.rs # Prometheus metrics (reuse from v2)
1018```
604 1019
605 /// Connection attempts: ngit_sync_connection_attempts_total{relay="wss://...", result="success|failure"} 1020---
606 connection_attempts: CounterVec, // labels: [relay, result]
607 1021
608 // === Relay Health Status === 1022## Comparison: v3 vs v4
609 /// Current status: ngit_sync_relay_status{relay="wss://...", status="healthy|backoff|dead"}
610 relay_status: IntGaugeVec, // labels: [relay, status]
611 1023
612 /// Consecutive failures: ngit_sync_relay_failures{relay="wss://..."} 1024| Aspect | v3 | v4 |
613 relay_failures: IntGaugeVec, // labels: [relay] 1025| ------------------------ | ----------------------------------------- | --------------------------------------------- |
1026| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect |
1027| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included |
1028| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 |
1029| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_layer2_and_layer3 method |
1030| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters |
1031| Since filter application | Applied in handle_reconnect | Filter builders take an optional since |
1032| PSI clearing | On disconnect | On disconnect (confirmed) |
1033| Daily timer | Consolidation-style | Fresh sync (different from consolidation) |
614 1034
615 // === Event Source Tracking === 1035---
616 /// Events received by source: ngit_sync_events_total{source="direct|live_sync|catchup|daily_catchup"}
617 events_total: CounterVec, // labels: [source]
618 1036
619 /// Sync gap events (should have been live synced): ngit_sync_gap_events_total{relay="wss://..."} 1037## Self-Subscriber Flow
620 sync_gap_events: CounterVec, // labels: [relay]
621 1038
622 // === Aggregate Metrics === 1039The SelfSubscriber connects to our own relay and maintains a subscription there to discover repos and events. It batches incoming events and triggers compute_actions once per batch.
623 /// Total relays being tracked
624 relays_tracked_total: IntGauge,
625 1040
626 /// Relays currently connected 1041### State Tracking
627 relays_connected_total: IntGauge,
628 1042
629 /// Relays in dead state 1043```rust
630 relays_dead_total: IntGauge, 1044pub struct SelfSubscriber {
1045 own_relay_url: String,
1046 relay_domain: String,
1047 repo_sync_index: RepoSyncIndex,
1048 pending_sync_index: PendingSyncIndex,
1049 relay_sync_index: RelaySyncIndex,
1050 action_tx: mpsc::Sender<AddFilters>,
1051 /// Timestamp of last successful connection - used for since filter on reconnection
1052 last_connected: Option<Timestamp>,
1053 /// The active client connection
1054 client: Option<Client>,
631} 1055}
632``` 1056```
633 1057
634### Metric Definitions 1058### On Startup / Reconnect (Unified)
635
636| Metric | Type | Labels | Description |
637| ------------------------------------- | ------- | ------------- | ------------------------------------------------------ |
638| `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not |
639| `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes |
640| `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise |
641| `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count |
642| `ngit_sync_events_total` | Counter | source | Events received by source type |
643| `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup that should have been live |
644| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered from announcements |
645| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count |
646| `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead |
647 1059
648**Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. The `ngit_sync_gap_events_total` metric tracks this per relay. 1060Both initial startup and reconnection use the same `connect_and_subscribe` method:
649
650### Observability Integration
651 1061
652```rust 1062```rust
653impl SyncManager { 1063impl SelfSubscriber {
654 fn record_event_received(&self, event: &Event, source: EventSource) { 1064 async fn run(mut self) {
655 match source { 1065 loop {
656 EventSource::DirectSubmission => { 1066 // Connect or reconnect
657 self.metrics.events_total.with_label_values(&["direct"]).inc(); 1067 if let Err(e) = self.connect_and_subscribe().await {
1068 tracing::warn!("Connection failed: {}, will retry", e);
1069 tokio::time::sleep(Duration::from_secs(5)).await;
1070 continue;
658 } 1071 }
659 EventSource::LiveSync(relay) => {
660 self.metrics.events_total.with_label_values(&["live_sync"]).inc();
661 }
662 EventSource::Catchup(relay) => {
663 // This is a sync gap - we should have gotten it via live sync
664 self.metrics.events_total.with_label_values(&["catchup"]).inc();
665 self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc();
666 tracing::warn!(
667 relay = %relay,
668 event_id = %event.id.to_hex(),
669 "Sync gap detected: event found during catchup"
670 );
671 }
672 EventSource::DailyCatchup(relay) => {
673 // Sustained sync gap - missed by both live sync and initial catchup
674 self.metrics.events_total.with_label_values(&["daily_catchup"]).inc();
675 self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc();
676 tracing::error!(
677 relay = %relay,
678 event_id = %event.id.to_hex(),
679 "Sustained sync gap: event found during daily catchup"
680 );
681 }
682 }
683 }
684 1072
685 fn record_connection_attempt(&self, relay: &RelayUrl, success: bool) { 1073 // Run event loop until disconnection
686 let result = if success { "success" } else { "failure" }; 1074 self.event_loop().await;
687 self.metrics.connection_attempts
688 .with_label_values(&[relay.as_str(), result])
689 .inc();
690 }
691 1075
692 fn update_relay_status(&self, relay: &RelayUrl, status: &RelayStatus) { 1076 // Loop will retry connection
693 // Reset all status labels for this relay
694 for s in ["healthy", "backoff", "dead"] {
695 self.metrics.relay_status
696 .with_label_values(&[relay.as_str(), s])
697 .set(0);
698 } 1077 }
699 // Set current status
700 let status_label = match status {
701 RelayStatus::Healthy => "healthy",
702 RelayStatus::Backoff { .. } => "backoff",
703 RelayStatus::Dead => "dead",
704 };
705 self.metrics.relay_status
706 .with_label_values(&[relay.as_str(), status_label])
707 .set(1);
708 } 1078 }
709}
710```
711
712### Example Grafana Queries
713 1079
714```promql 1080 async fn connect_and_subscribe(&mut self) -> Result<(), Error> {
715# Relay health overview - count by status 1081 let client = Client::new(Keys::generate());
716sum by (status) (ngit_sync_relay_status == 1) 1082 client.add_relay(&self.own_relay_url).await?;
717 1083 client.connect().await;
718# Connection success rate over last hour 1084
719sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) 1085 // Build filter - add since only on reconnect
720/ sum(rate(ngit_sync_connection_attempts_total[1h])) 1086 let filter = Filter::new().kinds([
721 1087 Kind::Custom(30617), // Repository announcements
722# Sync gap detection - events that should have been live synced 1088 Kind::GitPatch, // 1617
723sum(rate(ngit_sync_gap_events_total[1h])) by (relay) 1089 Kind::Custom(1618), // PRs
724 1090 Kind::Custom(1619), // PR updates
725# Live sync effectiveness (lower is better - fewer gaps) 1091 Kind::GitIssue, // 1621
726sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) 1092 ]);
727/ sum(rate(ngit_sync_events_total[1h])) 1093
1094 let filter = if let Some(ts) = self.last_connected {
1095 // Reconnection: use since filter
1096 let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer
1097 filter.since(since)
1098 } else {
1099 // Initial connect: no since filter - get full history
1100 filter
1101 };
728 1102
729# Relays with high failure counts (potential issues) 1103 // Update last_connected AFTER computing since
730topk(10, ngit_sync_relay_failures) 1104 self.last_connected = Some(Timestamp::now());
731 1105
732# Alert: relay stuck in dead state 1106 client.subscribe(filter, None).await?;
733ngit_sync_relay_status{status="dead"} == 1 1107 self.client = Some(client);
1108 Ok(())
1109 }
1110}
734``` 1111```
735 1112
736### Log Levels for Sync Events 1113### Event Loop with Batching
737 1114
738| Event | Level | Context | 1115```rust
739| ----------------------- | ----- | ----------------------------- | 1116impl SelfSubscriber {
740| Event via live sync | DEBUG | Normal operation | 1117 async fn event_loop(&mut self) {
741| Event via catchup | WARN | Sync gap detected | 1118 let client = self.client.as_ref().unwrap();
742| Event via daily catchup | ERROR | Sustained gap | 1119 let mut pending_events: Vec<Event> = Vec::new();
743| Connection established | INFO | Relay URL | 1120 let mut batch_timer: Option<Instant> = None;
744| Connection failed | WARN | Relay URL, attempt #, backoff | 1121 let batch_window = Duration::from_secs(5);
745| Relay marked dead | ERROR | Relay URL, failure duration | 1122
746| Peer missing events | WARN | Relay URL, repo, count | 1123 loop {
1124 let timeout = batch_timer
1125 .map(|t| batch_window.saturating_sub(t.elapsed()))
1126 .unwrap_or(Duration::from_secs(60));
1127
1128 tokio::select! {
1129 notification = client.notifications().recv() => {
1130 match notification {
1131 Ok(RelayPoolNotification::Event { event, .. }) => {
1132 pending_events.push(*event);
1133
1134 // Start timer on first event - does NOT reset
1135 if batch_timer.is_none() {
1136 batch_timer = Some(Instant::now());
1137 }
1138 }
1139 Ok(RelayPoolNotification::Shutdown) => {
1140 // Connection lost
1141 break;
1142 }
1143 _ => {}
1144 }
1145 }
1146 _ = tokio::time::sleep(timeout), if batch_timer.is_some() => {
1147 // Batch window elapsed
1148 self.process_batch(pending_events.drain(..).collect()).await;
1149 batch_timer = None;
1150 }
1151 }
1152 }
1153 }
747 1154
748## Configuration 1155 async fn process_batch(&self, events: Vec<Event>) {
1156 // 1. Update RepoSyncIndex
1157 for event in events {
1158 match event.kind.as_u16() {
1159 30617 => self.handle_announcement(&event).await,
1160 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await,
1161 _ => {}
1162 }
1163 }
749 1164
750```rust 1165 // 2. Derive targets and compute actions
751pub struct SyncConfig { 1166 let repo_index = self.repo_sync_index.read().await;
752 /// Warm-up delay before starting initial catchup 1167 let targets = derive_relay_targets(&repo_index);
753 pub startup_delay: Duration, // Default: 30s
754 1168
755 /// Delay between filter batches during catchup 1169 let pending = self.pending_sync_index.read().await;
756 pub batch_delay: Duration, // Default: 60s 1170 let confirmed = self.relay_sync_index.read().await;
757 1171
758 /// Delay after reconnect before catchup 1172 let actions = compute_actions(&targets, &pending, &confirmed);
759 pub reconnect_delay: Duration, // Default: 10s
760 1173
761 /// Maximum events in last N days for reconnect catchup 1174 drop(repo_index);
762 pub reconnect_lookback_days: u32, // Default: 3 1175 drop(pending);
1176 drop(confirmed);
763 1177
764 /// Maximum tagged event IDs per filter 1178 // 3. Send actions to SyncManager
765 pub max_tags_per_filter: usize, // Default: 100 1179 for action in actions {
1180 let _ = self.action_tx.send(action).await;
1181 }
1182 }
766 1183
767 /// Consolidate subscriptions when count exceeds 1184 async fn handle_announcement(&self, event: &Event) {
768 pub max_subscriptions: usize, // Default: 150 1185 // Extract repo_ref from event - 30617:pubkey:identifier
1186 let d_tag = event.tags.iter()
1187 .find_map(|tag| {
1188 if tag.kind() == TagKind::D {
1189 tag.content().map(|s| s.to_string())
1190 } else {
1191 None
1192 }
1193 })
1194 .unwrap_or_default();
1195
1196 let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag);
1197
1198 // Extract relay URLs from 'r' tags
1199 let relays: HashSet<String> = event.tags.iter()
1200 .filter_map(|tag| {
1201 if tag.kind() == TagKind::Relay {
1202 tag.content().map(|s| s.to_string())
1203 } else {
1204 None
1205 }
1206 })
1207 .collect();
1208
1209 // Update RepoSyncIndex
1210 let mut index = self.repo_sync_index.write().await;
1211 let needs = index.entry(repo_ref).or_default();
1212 needs.relays = relays;
1213 }
769 1214
770 /// Backoff configuration 1215 async fn handle_root_event(&self, event: &Event) {
771 pub max_backoff: Duration, // Default: 1h 1216 // Extract repo_ref from 'a' tag
772 pub dead_threshold: Duration, // Default: 24h 1217 let repo_ref = event.tags.iter()
773 pub dead_retry_interval: Duration, // Default: 24h 1218 .find_map(|tag| {
1219 if tag.kind() == TagKind::A {
1220 tag.content().map(|s| s.to_string())
1221 } else {
1222 None
1223 }
1224 });
1225
1226 if let Some(repo_ref) = repo_ref {
1227 let mut index = self.repo_sync_index.write().await;
1228 let needs = index.entry(repo_ref).or_default();
1229 needs.root_events.insert(event.id);
1230 }
1231 }
774} 1232}
775``` 1233```
776 1234
777## Summary
778
779| Component | Responsibility |
780| ---------------------- | -------------------------------------------------------------- |
781| **SyncManager** | Orchestrates connections, triggers catchup, processes events |
782| **FilterService** | Builds unified filters from database state |
783| **RelayHealthTracker** | Manages backoff, dead relay detection (in-memory + Prometheus) |
784| **ConnectionState** | Per-relay WebSocket + subscription management |
785| **SyncMetrics** | Prometheus metrics for operator visibility |
786
787### Key Design Decisions
788
7891. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism
7902. **Exclude ourselves** from relay list to prevent loops
7913. **One connection per relay** with combined filters for efficiency
7924. **In-memory health state** with Prometheus metrics for visibility (no database persistence needed for <100 relays)
7935. **Graceful degradation on restart** - conservative initial backoff with jitter avoids thundering herd
7946. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up
7957. **Client-side filtering** for 30617/30618, server-side for Layer 2/3
7968. **Dynamic subscription addition** with periodic consolidation
7979. **Custom acceptance policy** excluding rate limiting defaults
79810. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps, tracked in Prometheus
799
800--- 1235---
801 1236
802## Implementation Notes (Phase 6) 1237## Implementation Notes
803
804This section documents the final implementation as of Phase 6 (Observability & Production Readiness).
805 1238
806### What Was Actually Built 1239This section documents the actual implementation details as of December 2024 (Phases 1-10 complete).
807 1240
808The implementation closely follows the design document with the following completed components: 1241### Architectural Decisions During Implementation
809 1242
810#### Phase 1: Basic Sync (commit b167f1b) 1243**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup:
811- [`SyncManager`](../../src/sync/manager.rs) - Main coordinator for proactive sync
812- Bootstrap relay sync via `NGIT_SYNC_BOOTSTRAP_RELAY_URL` configuration
813- Dynamic relay discovery from repository announcements that list our service
814- Event validation through existing [`Nip34WritePolicy`](../../src/nostr/builder.rs)
815 1244
816#### Phase 2: Three-Layer Filters (commit bf558b0) 1245```rust
817- [`FilterService`](../../src/sync/filter.rs) - Builds three-layer filter strategy 1246// 7. Wrap self in Arc<Mutex> for sharing with timer task
818- Layer 1: All kind 30617+30618 (announcements) 1247let sync_manager = Arc::new(Mutex::new(self));
819- Layer 2: A/a tag filters for repository events 1248```
820- Layer 3: E/e tag filters for related events (PRs, Issues)
821- Multi-relay discovery from stored announcements
822
823#### Phase 3: Health Tracking (commit f639ecf)
824- [`RelayHealthTracker`](../../src/sync/health.rs) - DashMap-based health tracking
825- Three states: Healthy → Degraded → Dead
826- Exponential backoff: 5s → 10s → 20s → ... → max (default 1h)
827- Dead relay detection after 24h continuous failures
828- Startup jitter (0-10s) to prevent thundering herd
829 1249
830#### Phase 4: Dynamic Subscriptions (commit a19ff57) 1250This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber.
831- [`SubscriptionManager`](../../src/sync/subscription.rs) - Per-connection subscription tracking
832- Dynamic Layer 2 subscriptions when new announcements arrive
833- Dynamic Layer 3 subscriptions when new PRs/Issues arrive
834- Filter consolidation at threshold (150 filters)
835 1251
836#### Phase 5: Catchup & Gap Detection (commit 950c2e4) 1252**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses:
837- [`NegentropyService`](../../src/sync/negentropy.rs) - Gap-filling catchup operations
838- Startup catchup (configurable delay)
839- Reconnection catchup (limited lookback)
840- Daily catchup (not yet implemented - placeholder)
841 1253
842#### Phase 6: Observability (this phase) 1254- `DashMap` for thread-safe concurrent access without external locking
843- [`SyncMetrics`](../../src/sync/metrics.rs) - Full Prometheus integration 1255- Three states: `Healthy`, `Degraded`, `Dead`
844- Grafana dashboard panels for sync monitoring 1256- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff
845- Documentation updates 1257- Dead threshold: 24 hours of continuous failures
1258- Dead relay retry: Once per 24 hours
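
The backoff rule in the health module notes (`base * 2^(failures-1)`, capped at `max_backoff`) can be sketched with the standard library alone. This is a minimal illustration, not the code in `health.rs`: the function name `backoff_delay`, the zero delay for zero failures, and the exponent clamp are assumptions made for the example.

```rust
use std::time::Duration;

/// Delay before the next connection attempt:
/// base * 2^(failures - 1), capped at `max_backoff`.
/// Hypothetical helper; the name and signature are assumptions.
fn backoff_delay(base: Duration, max_backoff: Duration, failures: u32) -> Duration {
    // Assumption: a relay with no recorded failures needs no delay.
    if failures == 0 {
        return Duration::ZERO;
    }
    // Clamp the exponent so the u32 multiplier cannot overflow.
    let exponent = (failures - 1).min(31);
    let multiplier = 1u32 << exponent;
    // Fall back to the cap if the multiplication itself overflows.
    base.checked_mul(multiplier)
        .unwrap_or(max_backoff)
        .min(max_backoff)
}
```

With the 5 second base from `BASE_BACKOFF_SECS` and a 1 hour cap, consecutive failures would give 5s, 10s, 20s and so on until the cap is reached.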
846 1259
847### Differences from Original Design 1260### Implementation Constants
848 1261
8491. **Negentropy (NIP-77)**: Simplified gap-filling was used instead of full NIP-77 negentropy reconciliation, as nostr-sdk 0.44 lacks built-in negentropy support. The current implementation uses timestamp-based catchup queries. 1262| Constant | Value | Purpose |
1263| --------------------------------- | ---------- | ------------------------------------------------ |
1264| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation |
1265| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation |
1266| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync |
1267| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect |
1268| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead |
1269| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff |
850 1270
8512. **Filter Consolidation Threshold**: Set at 150 filters (as designed) based on typical relay filter limits. 1271### Daily Timer Randomization
852 1272
8533. **Health Tracking**: Implemented exactly as designed - in-memory only (not persisted to database), which is acceptable for production as health state rebuilds quickly on restart. 1273The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running:
854 1274
8554. **Metric Label Strategy**: Used simpler numeric encoding for health status (1=healthy, 2=degraded, 3=dead) instead of multiple label values per relay, reducing cardinality. 1275```rust
1276let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0;
1277```
856 1278
8575. **Event Source Tracking**: Implemented four source types (`live`, `startup`, `reconnect`, `daily`) instead of the original (`direct`, `live_sync`, `catchup`, `daily_catchup`). 1279### Bootstrap Relay Protection
858 1280
859### Three-Layer Filter Strategy (As Implemented) 1281Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out:
860 1282
1283```rust
1284.filter(|(_, state)| {
1285 !state.is_bootstrap &&
1286 state.repos.is_empty() &&
1287 state.root_events.is_empty()
1288})
861``` 1289```
862Layer 1: Discovery Layer
863├── Query: kinds [30617, 30618] (announcements)
864├── Applied: At startup and during sync
865└── Purpose: Discover all repositories across network
866
867Layer 2: Repository Events
868├── Query: Events with A/a tags pointing to tracked repos
869├── Format: A tag = "30617:<pubkey>:<identifier>"
870├── Triggered: When new announcement is accepted
871└── Purpose: Get PRs, issues, patches for repositories
872
873Layer 3: Related Events
874├── Query: Events with E/e tags pointing to tracked PRs/Issues
875├── Triggered: When new PR/Issue is accepted
876└── Purpose: Get comments, reviews, status updates
877```
878
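
The bootstrap protection predicate shown for `check_disconnects()` can be exercised in isolation. The sketch below is illustrative only: `RelayState` is reduced to the three fields the predicate reads, string keys stand in for the real relay URL and event ID types, and `relays_to_disconnect` is a hypothetical name.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, reduced stand-in for the per-relay state.
#[derive(Default)]
struct RelayState {
    is_bootstrap: bool,
    repos: HashSet<String>,
    root_events: HashSet<String>,
}

/// Returns the URLs of relays that have nothing left to sync and are not
/// bootstrap relays, i.e. the ones the cleanup pass may disconnect.
fn relays_to_disconnect(relays: &HashMap<String, RelayState>) -> Vec<String> {
    let mut urls: Vec<String> = relays
        .iter()
        .filter(|(_, state)| {
            !state.is_bootstrap && state.repos.is_empty() && state.root_events.is_empty()
        })
        .map(|(url, _)| url.clone())
        .collect();
    // HashMap iteration order is unspecified, so sort for a stable result.
    urls.sort();
    urls
}
```

A bootstrap relay with empty `repos` and `root_events` is still excluded, which is the property the filter exists to guarantee.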
879### Prometheus Metrics (As Implemented)
880 1290
881| Metric | Type | Labels | Description | 1291### Graceful Shutdown
882|--------|------|--------|-------------|
883| `ngit_sync_relay_connected` | Gauge | relay | Connection status (1/0) |
884| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome |
885| `ngit_sync_relay_status` | Gauge | relay | Health state (1/2/3) |
886| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures |
887| `ngit_sync_events_total` | Counter | source | Events by source type |
888| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled |
889| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered |
890| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected |
891| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count |
892 1292
893### Configuration Options (As Implemented) 1293Shutdown uses a tokio broadcast channel for coordinated termination:
894 1294
895All configuration via environment variables or CLI flags: 1295```rust
1296let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
1297```
896 1298
897| Option | Type | Default | Description | 1299Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop.
898|--------|------|---------|-------------|
899| `NGIT_SYNC_BOOTSTRAP_RELAY_URL` | String | None | Bootstrap relay URL for initial sync |
900| `NGIT_SYNC_MAX_BACKOFF_SECS` | u64 | 3600 | Max backoff delay (seconds) |
901| `NGIT_SYNC_STARTUP_DELAY_SECS` | u64 | 30 | Catchup delay after startup |
902| `NGIT_SYNC_RECONNECT_DELAY_SECS` | u64 | 10 | Catchup delay after reconnect |
903| `NGIT_SYNC_RECONNECT_LOOKBACK_DAYS` | u64 | 3 | Days to look back on reconnect |
904 1300
905**Note:** Additional relays are automatically discovered from repository announcements (kind 30617) that list our service domain. The bootstrap relay provides an initial sync source but is not required - sync will discover relays from stored announcements. 1301### Actual Module Structure
906 1302
907### Module Structure (As Implemented) 1303The implemented module structure differs from the original spec:
908 1304
909``` 1305```
910src/sync/ 1306src/sync/
911├── mod.rs # Module exports, constants 1307├── mod.rs # SyncManager, main loop, index types, metrics
912├── manager.rs # SyncManager - orchestrates sync 1308├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters
913├── connection.rs # SyncConnection - per-relay WebSocket 1309├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters
914├── filter.rs # FilterService - three-layer filters 1310├── health.rs # RelayHealthTracker, HealthState, exponential backoff
915├── health.rs # RelayHealthTracker - health states 1311├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling
916├── metrics.rs # SyncMetrics - Prometheus integration 1312└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic
917├── negentropy.rs # NegentropyService - gap-filling
918└── subscription.rs # SubscriptionManager - dynamic subs
919``` 1313```
920 1314
921### Production Readiness Checklist 1315Key differences from spec:
1316
1317- No separate `state.rs` - types are defined in `mod.rs`
1318- No separate `actions.rs` - moved to `algorithms.rs`
1319- No separate `consolidation.rs` - consolidation logic in `mod.rs`
1320- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs`
1321
1322### Deviations from Original v4 Spec
1323
13241. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`.
1325
13262. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct.
1327
13283. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches.
922 1329
923- [x] All metrics exposed at `/metrics` endpoint 13304. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API.
924- [x] Health state tracking with configurable backoff
925- [x] Dead relay detection and minimal retry
926- [x] Startup jitter to prevent thundering herd
927- [x] Grafana dashboard with sync panels
928- [x] Configuration documented
929- [x] Integration tests passing
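
The seconds accessor in deviation 4 matters mainly for the reconnect `since` computation in `connect_and_subscribe`. The arithmetic can be sketched on plain `u64` Unix seconds without depending on any particular nostr-sdk version. The names `RECONNECT_BUFFER_SECS` and `reconnect_since` are assumptions for this example.

```rust
/// Buffer subtracted from the last connection time on reconnect (15 minutes).
/// Hypothetical constant name; the 900 second value comes from the design code.
const RECONNECT_BUFFER_SECS: u64 = 900;

/// Returns the `since` bound in Unix seconds for a reconnect, or `None` on the
/// initial connect, where no `since` filter is applied and full history is fetched.
fn reconnect_since(last_connected_secs: Option<u64>) -> Option<u64> {
    // saturating_sub avoids underflow for timestamps smaller than the buffer.
    last_connected_secs.map(|ts| ts.saturating_sub(RECONNECT_BUFFER_SECS))
}
```

The caller would convert the returned seconds back into the SDK's timestamp type before attaching it to the filter.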