upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/grasp-02-proactive-sync.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync.md')
-rw-r--r--docs/explanation/grasp-02-proactive-sync.md565
1 files changed, 565 insertions, 0 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
new file mode 100644
index 0000000..250aece
--- /dev/null
+++ b/docs/explanation/grasp-02-proactive-sync.md
@@ -0,0 +1,565 @@
1# GRASP-02: Proactive Sync - Design Document
2
3## Overview
4
5GRASP-02 Proactive Sync enables ngit-grasp to maintain live WebSocket connections to other relays listed in repository announcement events, synchronizing NIP-34 related events using both **live sync** (real-time subscriptions) and **negentropy catchup** (NIP-77 set reconciliation).
6
7This document covers **event syncing only**. Git data syncing is out of scope for this phase.
8
9## Goals
10
111. **Data Availability**: Ensure we have all relevant events for repositories we host
122. **Resilience**: Handle relay failures gracefully with backoff and health tracking
133. **Efficiency**: Minimize connections and bandwidth through filter consolidation
144. **Consistency**: Use unified filters for both live sync and negentropy catchup
15
16## Architecture Overview
17
18```mermaid
19flowchart TB
20 subgraph ngit-grasp
21 SM[SyncManager]
22 FS[FilterService]
23 RH[RelayHealthTracker]
24 DB[(Database)]
25 AP[AcceptancePolicy]
26 end
27
28 subgraph External Relays
29 R1[relay.example.com]
30 R2[other-grasp.io]
31 R3[nostr.land]
32 end
33
34 SM -->|builds filters| FS
35 SM -->|tracks health| RH
36 SM -->|stores events| DB
37 SM -->|validates| AP
38
39 SM <-->|WebSocket + NEG| R1
40 SM <-->|WebSocket + NEG| R2
41 SM <-->|WebSocket + NEG| R3
42
43 RH -->|persists state| DB
44```
45
46## Connection Management
47
48### Relay Discovery
49
50Relays to connect to are discovered from **all stored repository announcements**:
51
52```rust
53// Pseudocode for relay discovery
54fn discover_relays(database: &Database) -> HashSet<RelayUrl> {
55 let announcements = database.query(Filter::new().kind(30617));
56 let mut relays = HashSet::new();
57
58 for announcement in announcements {
59 for relay_url in announcement.relays_tags() {
60 if relay_url != our_domain { // Exclude ourselves
61 relays.insert(relay_url);
62 }
63 }
64 }
65 relays
66}
67```
68
69### Connection Lifecycle
70
71```mermaid
72stateDiagram-v2
73 [*] --> Connecting: startup/new relay
74 Connecting --> Connected: success
75 Connecting --> Backoff: failure
76 Connected --> Disconnected: connection lost
77 Disconnected --> Backoff: reconnect failed
78 Backoff --> Connecting: backoff timer expires
79 Backoff --> Dead: 24h continuous failures
80 Dead --> Connecting: daily retry timer
81 Connected --> Updating: filter change
82 Updating --> Connected: complete
83```
84
85### Health Tracking & Backoff
86
87| State | Behavior |
88| ----------- | --------------------------------------------------- |
89| **Healthy** | Normal operation, immediate reconnect on disconnect |
90| **Backoff** | Exponential backoff: 1s → 2s → 4s → ... → 1h max |
91| **Dead** | 24h of continuous failures, retry once per day |
92
93Health state is **persisted to database** to survive restarts:
94
95```rust
96struct RelayHealth {
97 url: RelayUrl,
98 status: RelayStatus, // Healthy, Backoff, Dead
99 consecutive_failures: u32,
100 last_failure_at: Option<Timestamp>,
101 last_success_at: Option<Timestamp>,
102 next_retry_at: Timestamp,
103}
104
105enum RelayStatus {
106 Healthy,
107 Backoff { attempt: u32 }, // backoff = min(2^attempt, 3600) seconds
108 Dead, // retry in 24h
109}
110```
111
112## Filter Strategy
113
114### Unified Filters for Live Sync and Negentropy
115
116The same filter logic is used for both live subscriptions and negentropy reconciliation:
117
118```mermaid
119flowchart LR
120 subgraph Filter Layers
121 F1[Layer 1: All 30617+30618]
122 F2[Layer 2: Events tagging repos via A/a/q]
123 F3[Layer 3: Events tagging PRs/Issues via E/e/q]
124 end
125
126 F1 -->|client-side| AP[Acceptance Policy]
127 F2 -->|server-side| Relay
128 F3 -->|server-side| Relay
129```
130
131### Layer 1: Repository Announcements & States
132
133Get ALL kind 30617 and 30618 events with unified `since` timestamp, then filter client-side through acceptance policy:
134
135```rust
136// Use same since filter as other layers for consistency
137let layer1_filter = Filter::new()
138 .kinds([Kind::from(30617), Kind::from(30618)])
139 .since(since_timestamp); // Unified with Layer 2/3
140```
141
142**Client-side validation**: Only store events that pass our [`Nip34WritePolicy`](src/nostr/builder.rs:51).
143
144### Layer 2: Events Tagging Repositories
145
146For repo announcements **that list BOTH this relay AND our service**:
147
148```rust
149// Build addressable references: 30617:<pubkey>:<identifier>
150let repo_refs: Vec<String> = announcements
151 .iter()
152 .filter(|a| a.relays.contains(&this_relay) && a.lists_service(&our_domain))
153 .map(|a| format!("30617:{}:{}", a.pubkey.to_hex(), a.identifier))
154 .collect();
155
156let layer2_filter = Filter::new()
157 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_refs.clone())
158 .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo_refs));
159```
160
161### Layer 3: Events Tagging Issues/PRs/Patches
162
163For events that reference PRs, Patches, or Issues from repos we track:
164
165```rust
166// Collect event IDs of PRs, Patches, Issues we've stored
167let tagged_event_ids: Vec<EventId> = database
168 .query(Filter::new().kinds([1618, 1619, 1621, 1622, 1630])) // PR, PR Update, Issue, Patch, etc.
169 .iter()
170 .filter(|e| references_tracked_repo(e, &announcements))
171 .map(|e| e.id)
172 .collect();
173
174let layer3_filter = Filter::new()
175 .custom_tag(SingleLetterTag::lowercase(Alphabet::E), tagged_event_ids.clone())
176 .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), tagged_event_ids));
177```
178
179### Filter Size Management
180
181When the tag list exceeds a threshold, split into batches:
182
183```rust
184const MAX_TAGS_PER_FILTER: usize = 100;
185
186fn build_filters(tag_values: Vec<String>) -> Vec<Filter> {
187 tag_values
188 .chunks(MAX_TAGS_PER_FILTER)
189 .map(|chunk| Filter::new().custom_tag(tag, chunk.to_vec()))
190 .collect()
191}
192```
193
194**Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch.
195
196## Subscription Updates
197
198### Dynamic Subscription Management
199
200When new events arrive that affect our filter criteria:
201
202```mermaid
203sequenceDiagram
204 participant LocalRelay
205 participant SyncManager
206 participant RemoteRelay
207
208 LocalRelay->>SyncManager: New PR event accepted
209 SyncManager->>SyncManager: Extract event ID
210 SyncManager->>SyncManager: Build new filter for E/e/q tags
211 SyncManager->>RemoteRelay: REQ with new filter
212 RemoteRelay-->>SyncManager: Events matching new filter
213```
214
215**Events that trigger subscription updates**:
216
217- New repository announcement accepted (adds to Layer 2)
218- New PR/Issue/Patch accepted (adds to Layer 3)
219
220### When to Consolidate
221
222Track subscription count per connection:
223
224```rust
225struct ConnectionState {
226 relay_url: RelayUrl,
227 subscriptions: Vec<SubscriptionId>,
228 total_filter_count: usize,
229}
230
231impl ConnectionState {
232 fn should_consolidate(&self) -> bool {
233 self.total_filter_count > 150
234 }
235
236 async fn consolidate(&mut self) {
237 // Close all subscriptions
238 // Rebuild from scratch with current database state
239 }
240}
241```
242
243## Negentropy Catchup
244
245### NIP-77 Reconciliation Protocol
246
247Negentropy enables efficient set reconciliation - discovering which events we're missing without transferring full event lists.
248
249### Timing
250
251| Trigger | Behavior |
252| ------------------- | -------------------------------------------------------------------- |
253| **Initial startup** | Warm-up delay, staggered if many filters, initializes daily schedule |
254| **After reconnect** | Delay to avoid rate limiting, limited to events from last 3 days |
255| **Daily** | Staggered batches, max 100 tagged events per filter |
256
257### Startup Flow
258
259```mermaid
260sequenceDiagram
261 participant Server
262 participant SyncManager
263 participant Relay
264
265 Server->>SyncManager: Start
266 SyncManager->>SyncManager: Wait warm-up delay
267 SyncManager->>SyncManager: Build batched filters
268
269 loop For each relay with stagger delay
270 SyncManager->>Relay: NEG-OPEN with filter batch 1
271 Relay-->>SyncManager: NEG-MSG with differences
272 SyncManager->>Relay: NEG-MSG response
273 Note over SyncManager,Relay: Reconciliation rounds
274 Relay-->>SyncManager: NEG-CLOSE or events
275 SyncManager->>SyncManager: Validate + store events
276
277 alt More batches
278 SyncManager->>SyncManager: Wait stagger delay
279 SyncManager->>Relay: NEG-OPEN with next batch
280 end
281 end
282
283 SyncManager->>SyncManager: Schedule daily catchup
284```
285
286### Reconnection Catchup
287
288After connection reestablished:
289
290```rust
291async fn catchup_after_reconnect(&self, relay: &RelayUrl) {
292 // Delay to avoid immediate disconnect for too many requests
293 tokio::time::sleep(RECONNECT_CATCHUP_DELAY).await;
294
295 // Only catch up on recent events (last 3 days)
296 let since = Timestamp::now() - Duration::from_secs(3 * 24 * 60 * 60);
297
298 let filters = self.build_filters_for_relay(relay)
299 .into_iter()
300 .map(|f| f.since(since))
301 .collect();
302
303 self.run_negentropy(relay, filters).await;
304}
305```
306
307### Daily Catchup Schedule
308
309```rust
310// Daily catchup runs at consistent time, staggered across relays
311async fn schedule_daily_catchup(&self) {
312 let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60));
313
314 loop {
315 interval.tick().await;
316
317 for (i, relay) in self.healthy_relays().enumerate() {
318 // Stagger: 5 minute delay between relays
319 tokio::time::sleep(Duration::from_secs(i as u64 * 300)).await;
320
321 // Batch filters to max 100 tagged events each
322 let batches = self.build_batched_filters(&relay, 100);
323
324 for batch in batches {
325 self.run_negentropy(&relay, batch).await;
326 tokio::time::sleep(Duration::from_secs(60)).await; // 1 min between batches
327 }
328 }
329 }
330}
331```
332
333## Event Processing
334
335### Acceptance Policy
336
337All synced events go through our acceptance policy (same as direct submissions), but **excluding nostr-sdk defaults** like rate limiting:
338
339```rust
340async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> {
341 // Skip nostr-sdk's built-in rate limiting - we trust our filter strategy
342 // and don't want to reject valid events just because they arrived quickly
343
344 // Apply our custom Nip34WritePolicy
345 let result = self.acceptance_policy
346 .admit_event(&event, source_relay)
347 .await;
348
349 match result {
350 PolicyResult::Accept => {
351 self.database.save_event(&event).await?;
352 self.trigger_subscription_updates(&event).await;
353 }
354 PolicyResult::Reject(reason) => {
355 tracing::debug!("Rejected synced event {}: {}", event.id.to_hex(), reason);
356 }
357 }
358
359 Ok(())
360}
361```
362
363## Module Structure
364
365### New `src/sync/` Module
366
367```
368src/
369├── sync/
370│ ├── mod.rs # Module exports
371│ ├── manager.rs # SyncManager - main coordinator
372│ ├── connection.rs # Per-relay connection handling
373│ ├── filter.rs # Filter building and batching
374│ ├── health.rs # RelayHealth tracking
375│ ├── negentropy.rs # NIP-77 reconciliation logic
376│ └── subscription.rs # Dynamic subscription management
377├── nostr/
378│ └── ... (existing)
379└── ...
380```
381
382### Integration with Main Binary
383
384```rust
385// In main.rs
386async fn main() -> Result<()> {
387 // ... existing setup ...
388
389 // Start sync manager as background task
390 let sync_manager = SyncManager::new(
391 database.clone(),
392 config.domain.clone(),
393 );
394
395 tokio::spawn(async move {
396 sync_manager.run().await
397 });
398
399 // ... rest of server startup ...
400}
401```
402
403## Metrics & Observability
404
405### Event Source Tracking
406
407Track provenance of every event to measure live sync effectiveness:
408
409```rust
410enum EventSource {
411 DirectSubmission, // Sent directly to our relay by a user
412 LiveSync(RelayUrl), // Received via live subscription
413 Catchup(RelayUrl), // Discovered during negentropy catchup
414 DailyCatchup(RelayUrl), // Found during daily reconciliation
415}
416
417struct SyncMetrics {
418 /// Events by source
419 events_from_direct: Counter,
420 events_from_live_sync: Counter,
421 events_from_catchup: Counter, // Indicates live sync failure
422 events_from_daily_catchup: Counter, // Indicates sustained sync gap
423
424 /// Catchup gap tracking - events found that should have been live synced
425 catchup_gap_total: Counter,
426 catchup_gap_by_relay: HashMap<RelayUrl, Counter>,
427}
428```
429
430**Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time.
431
432### Peer Reliability Tracking
433
434Track relay health and data completeness:
435
436```rust
437struct PeerReliability {
438 relay_url: RelayUrl,
439
440 // Connection metrics
441 connection_attempts: u64,
442 connection_failures: u64,
443 total_uptime_seconds: u64,
444 total_downtime_seconds: u64,
445
446 // Event coverage metrics (per repo we track)
447 repos_tracked: HashSet<RepoRef>,
448 missing_events_detected: HashMap<RepoRef, u64>, // Events we have that they dont
449 events_received_from: u64,
450
451 // Calculated scores
452 uptime_percentage: f64, // uptime / (uptime + downtime)
453 event_coverage_score: f64, // ratio of events we have vs what we expect from them
454}
455```
456
457### Observability Integration
458
459```rust
460// Prometheus-style metrics
461impl SyncManager {
462 fn record_event_received(&self, event: &Event, source: EventSource) {
463 match source {
464 EventSource::DirectSubmission => {
465 self.metrics.events_from_direct.inc();
466 }
467 EventSource::LiveSync(relay) => {
468 self.metrics.events_from_live_sync.inc();
469 self.peer_metrics.get(&relay).events_received_from += 1;
470 }
471 EventSource::Catchup(relay) => {
472 // This is a sync gap - we should have gotten it via live sync
473 self.metrics.events_from_catchup.inc();
474 self.metrics.catchup_gap_total.inc();
475 self.metrics.catchup_gap_by_relay.entry(relay.clone()).or_default().inc();
476 tracing::warn!(
477 relay = %relay,
478 event_id = %event.id.to_hex(),
479 "Sync gap detected: event found during catchup"
480 );
481 }
482 EventSource::DailyCatchup(relay) => {
483 // Sustained sync gap - missed by both live sync and initial catchup
484 self.metrics.events_from_daily_catchup.inc();
485 tracing::error!(
486 relay = %relay,
487 event_id = %event.id.to_hex(),
488 "Sustained sync gap: event found during daily catchup"
489 );
490 }
491 }
492 }
493
494 fn record_connection_attempt(&self, relay: &RelayUrl, success: bool) {
495 let peer = self.peer_metrics.entry(relay.clone()).or_default();
496 peer.connection_attempts += 1;
497 if !success {
498 peer.connection_failures += 1;
499 }
500 }
501}
502```
503
504### Log Levels for Sync Events
505
506| Event | Level | Context |
507| ----------------------- | ----- | ----------------------------- |
508| Event via live sync | DEBUG | Normal operation |
509| Event via catchup | WARN | Sync gap detected |
510| Event via daily catchup | ERROR | Sustained gap |
511| Connection established | INFO | Relay URL |
512| Connection failed | WARN | Relay URL, attempt #, backoff |
513| Relay marked dead | ERROR | Relay URL, failure duration |
514| Peer missing events | WARN | Relay URL, repo, count |
515
516## Configuration
517
518```rust
519pub struct SyncConfig {
520 /// Warm-up delay before starting initial catchup
521 pub startup_delay: Duration, // Default: 30s
522
523 /// Delay between filter batches during catchup
524 pub batch_delay: Duration, // Default: 60s
525
526 /// Delay after reconnect before catchup
527 pub reconnect_delay: Duration, // Default: 10s
528
529 /// Maximum events in last N days for reconnect catchup
530 pub reconnect_lookback_days: u32, // Default: 3
531
532 /// Maximum tagged event IDs per filter
533 pub max_tags_per_filter: usize, // Default: 100
534
535 /// Consolidate subscriptions when count exceeds
536 pub max_subscriptions: usize, // Default: 150
537
538 /// Backoff configuration
539 pub max_backoff: Duration, // Default: 1h
540 pub dead_threshold: Duration, // Default: 24h
541 pub dead_retry_interval: Duration, // Default: 24h
542}
543```
544
545## Summary
546
547| Component | Responsibility |
548| ---------------------- | ------------------------------------------------------------ |
549| **SyncManager** | Orchestrates connections, triggers catchup, processes events |
550| **FilterService** | Builds unified filters from database state |
551| **RelayHealthTracker** | Manages backoff, dead relay detection, persistence |
552| **ConnectionState** | Per-relay WebSocket + subscription management |
553
554### Key Design Decisions
555
5561. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism
5572. **Exclude ourselves** from relay list to prevent loops
5583. **One connection per relay** with combined filters for efficiency
5594. **Persisted health state** survives restarts
5605. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up
5616. **Client-side filtering** for 30617/30618, server-side for Layer 2/3
5627. **Dynamic subscription addition** with periodic consolidation
5638. **Custom acceptance policy** excluding rate limiting defaults
5649. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps
56510. **Peer reliability tracking** - monitor uptime and event coverage per relay