From 18ad93f8d0b8ce172c9c227385a21af66a507950 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 14:32:01 +0000 Subject: docs: remove old grasp-02 design doc versions --- docs/explanation/grasp-02-proactive-sync-v2.md | 785 ---------- docs/explanation/grasp-02-proactive-sync-v3.md | 871 ------------ docs/explanation/grasp-02-proactive-sync-v4.md | 1330 ----------------- docs/explanation/grasp-02-proactive-sync.md | 1811 +++++++++++++++--------- 4 files changed, 1106 insertions(+), 3691 deletions(-) delete mode 100644 docs/explanation/grasp-02-proactive-sync-v2.md delete mode 100644 docs/explanation/grasp-02-proactive-sync-v3.md delete mode 100644 docs/explanation/grasp-02-proactive-sync-v4.md (limited to 'docs') diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md deleted file mode 100644 index 311e93c..0000000 --- a/docs/explanation/grasp-02-proactive-sync-v2.md +++ /dev/null @@ -1,785 +0,0 @@ -# GRASP-02: Proactive Sync v2 - Simplified Design - -## Overview - -This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription. - -## Goals (Same as v1) - -1. **Data Availability**: Ensure we have all relevant events for repositories we host -2. **Resilience**: Handle relay failures gracefully with backoff and health tracking -3. **Efficiency**: Minimize connections and bandwidth through filter consolidation -4. **Consistency**: Use unified filters for both live sync and catchup - -## Scale Targets & Upper Bounds - -This design targets the following scale: - -| Metric | Target | Notes | -| ----------------------------- | -------- | ----------------------------------------- | -| **Repositories** | 1,000 | Repos we host/track | -| **Root events per repo** | 50 (avg) | PRs, Issues, Patches per repo | -| **Total relays in ecosystem** | 100 | Unique relays across all repos | -| **Relays per repo** | 5 (avg) | Relays listed in each repo's announcement | -| **Total root events** | ~50,000 | 1,000 repos × 50 events | -| **Sync connections** | ~50-100 | Based on relay overlap | - -**Memory Estimate (in-memory HashMaps):** - -- `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB -- `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB -- **Total in-memory state**: ~10 MB (well within acceptable limits) - -**Upper Bounds (redesign triggers):** - -- **10,000+ repos**: Consider database-backed state instead of in-memory HashMaps -- **500+ sync relays**: Consider connection pooling or relay prioritization -- **500+ root events per repo**: Consider per-repo pagination in Layer 3 filters -- **Sustained >100 events/second**: Consider write batching to database - -Beyond these limits, the in-memory HashMap model may need to evolve to a database-backed approach with lazy loading. - -## Core Data Structures - -The entire sync filter state is captured in two HashMaps, initialized from database queries at startup: - -```rust -/// Repository root events we're following -/// Key: repo addressable reference (e.g., "30617:pubkey:identifier") -/// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo -/// -/// Note: May include a few extra repo refs that aren't in sync_relays. -/// This is acceptable - we won't query other relays for them. -type FollowingRepoRootEvents = Arc>>>; - -/// Relays we sync from, including their repos and events -/// Key: relay URL -/// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service -/// -/// Note: Bootstrap relay (if configured) is always present and excluded from removal logic. -type SyncRelays = Arc>>>>; -``` - -## Architecture Overview - -```mermaid -flowchart TB - subgraph Startup - DB[(Database)] - Q1[Query kinds 1617/1618/1619/1621] - Q2[Query kind 30617] - DB --> Q1 --> FRRE[following_repo_root_events] - DB --> Q2 --> SR[sync_relays] - BR[Bootstrap Relay] --> SR - end - - subgraph SyncManager - SS[Self-Subscriber] - FRRE --> SS - SR --> SS - end - - subgraph SyncRelays - R1[Relay Connection 1] - R2[Relay Connection 2] - RN[Relay Connection N] - end - - SS -->|spawn/update| R1 - SS -->|spawn/update| R2 - SS -->|spawn/update| RN - - R1 -->|events| AP[Acceptance Policy] - R2 -->|events| AP - RN -->|events| AP - AP -->|store| DB -``` - -## Module Structure - -The sync module is organized following the pattern used by `src/http/mod.rs` and `src/metrics/mod.rs` where the primary struct lives in `mod.rs`: - -``` -src/sync/ -├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays) -├── self_subscriber.rs # SelfSubscriber struct and batching logic -├── relay_connection.rs # Per-relay WebSocket connection management -├── health.rs # RelayHealthTracker for backoff and dead relay detection -└── metrics.rs # SyncMetrics for Prometheus integration -``` - -**Rationale:** The state type aliases (`FollowingRepoRootEvents`, `SyncRelays`) are simple `Arc>>` wrappers owned by SyncManager. Rather than creating a separate `state.rs` for two type aliases, they are colocated with SyncManager in `mod.rs` to reduce file count while maintaining clarity. - -## Design Decision: No Jitter - -We considered adding jitter to prevent thundering herd scenarios when: - -- Multiple relay connections initialize simultaneously -- Batched updates affect multiple relays -- Filter consolidation triggers across connections - -**Decision: No jitter implemented.** - -**Rationale:** - -- Our GRASP server should handle the load of simultaneous operations -- Jitter would lead to more orphan filters (filters added one at a time rather than atomically) -- Jitter creates inefficiency - partial subscriptions miss events during the stagger window -- The batching window (5s) already provides natural smoothing without the downsides - -## Health Tracking & Backoff - -```rust -/// Health state machine for relay connections -enum HealthState { - Healthy, // Connected and working - Backoff(u32), // Failed, attempt count for exponential backoff - Dead, // 24h+ continuous failures -} - -impl RelayHealthTracker { - /// Backoff durations: - /// - Attempt 1: 5s - /// - Attempt 2: 10s - /// - Attempt 3: 20s - /// - Attempt 4: 40s - /// - ... exponential up to 1h max - /// - After reaching 1h, continue hourly until 24h total failure time - /// - After 24h: marked Dead, retry once per 24h - fn get_backoff(&self, relay_url: &str) -> Duration; -} -``` - -| State | Retry Behavior | -| ----------- | -------------------------------------------- | -| **Healthy** | Immediate reconnect on disconnect | -| **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) | -| **Hourly** | Once per hour after hitting 1h cap | -| **Dead** | After 24h total failures, retry once per 24h | - -## Startup Initialization - -At startup, two database queries initialize the sync state: - -```rust -impl SyncManager { - async fn initialize_from_database(&mut self) -> Result<()> { - // Initialize bootstrap relay if configured (never removed) - if let Some(bootstrap_url) = &self.config.bootstrap_relay_url { - self.sync_relays.write().await.insert( - bootstrap_url.clone(), - HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only) - ); - } - - // Query 1: Build following_repo_root_events - // Find all 1617/1618/1619/1621 events and extract their repo references - let root_events = self.database - .query(Filter::new().kinds([ - Kind::GitPatch, // 1617 - Kind::Custom(1618), // PRs - Kind::Custom(1619), // PR updates - Kind::Custom(1621), // Issues - ])) - .await?; - - for event in root_events { - // An event may have multiple 'a' tags pointing to different repos - let repo_refs = self.extract_all_repo_refs(&event); - for repo_ref in repo_refs { - self.following_repo_root_events - .write().await - .entry(repo_ref) - .or_default() - .insert(event.id); - } - } - - // Query 2: Build sync_relays from kind 30617 announcements - let announcements = self.database - .query(Filter::new().kind(Kind::Custom(30617))) - .await?; - - for event in announcements { - let repo_ref = self.build_repo_ref(&event); - let relay_urls = self.extract_relay_urls(&event); - - // Only track repos that list BOTH a remote relay AND our service - if self.lists_our_service(&event) { - for relay_url in relay_urls { - if !self.is_own_relay(&relay_url) { - // Get events for this repo from following_repo_root_events - let events = self.following_repo_root_events - .read().await - .get(&repo_ref) - .cloned() - .unwrap_or_default(); - - self.sync_relays - .write().await - .entry(relay_url) - .or_default() - .insert(repo_ref.clone(), events); - } - } - } - } - - Ok(()) - } - - /// Extract ALL repo refs from an event (it may tag multiple repos) - fn extract_all_repo_refs(&self, event: &Event) -> Vec { - event.tags.iter() - .filter_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "a" { - // Validate it's a 30617 reference - if tag_vec[1].starts_with("30617:") { - Some(tag_vec[1].clone()) - } else { - None - } - } else { - None - } - }) - .collect() - } -} -``` - -## Self-Subscriber: Event-Driven Updates - -A single self-subscriber watches for new events from **our own relay** and updates the HashMaps. - -**Important:** The self-subscriber does NOT subscribe to kind 30618 as this would never lead to refreshing the sync filters. Those events are synced from remote relays only (via Layer 1 filter on sync relay connections). - -### Batching Strategy - -The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires. - -**Important:** Once the batch timer starts, it does NOT reset when additional events arrive. The batch will fire exactly 5 seconds after the first event, regardless of how many subsequent events are queued. This ensures predictable latency and prevents indefinite batching during high-activity periods. - -```rust -impl SelfSubscriber { - async fn run(&self) { - // Subscribe to our own relay for relevant kinds - // Note: 30618 NOT included - synced from remote relays only - let filter = Filter::new() - .kinds([ - Kind::Custom(30617), // Repository announcements - Kind::GitPatch, // 1617 Patches - Kind::Custom(1618), // PRs - Kind::Custom(1619), // PR updates - Kind::Custom(1621), // Issues - ]); - - let mut pending_updates: Vec = Vec::new(); - let mut batch_deadline: Option = None; - - loop { - let timeout = batch_deadline - .map(|d| d.saturating_duration_since(Instant::now())) - .unwrap_or(Duration::MAX); - - tokio::select! { - Some(event) = self.event_receiver.recv() => { - pending_updates.push(self.classify_update(&event)); - - // Start batch timer on first event - if batch_deadline.is_none() { - batch_deadline = Some(Instant::now() + Duration::from_secs(5)); - } - } - _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => { - // Batch window elapsed - apply all pending updates - self.apply_batched_updates(pending_updates.drain(..).collect()).await; - batch_deadline = None; - } - } - } - } - - fn classify_update(&self, event: &Event) -> PendingUpdate { - match event.kind.as_u16() { - 30617 => PendingUpdate::NewAnnouncement(event.clone()), - 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()), - _ => PendingUpdate::None, - } - } -} -``` - -### Applying Batched Updates - -When the batch window closes, we process all pending updates together: - -```rust -/// Batched updates grouped by relay -struct RelayUpdateBatch { - /// New repo refs to subscribe to (Layer 2) - new_repo_refs: HashSet, - /// New event IDs to subscribe to (Layer 3) - new_event_ids: HashSet, - /// Whether this is a newly discovered relay - is_new_relay: bool, -} - -impl SelfSubscriber { - async fn apply_batched_updates(&mut self, updates: Vec) { - // Step 1: Process all updates and update HashMaps - // Build batched actions per relay - let mut relay_batches: HashMap = HashMap::new(); - - for update in updates { - match update { - PendingUpdate::NewAnnouncement(event) => { - self.process_announcement(&event, &mut relay_batches).await; - } - PendingUpdate::NewRootEvent(event) => { - self.process_root_event(&event, &mut relay_batches).await; - } - PendingUpdate::None => {} - } - } - - // Step 2: Apply batched updates to each relay - for (relay_url, batch) in relay_batches { - self.apply_batch_to_relay(&relay_url, batch).await; - } - - // Step 3: Check for relay removal (repos removed from announcements) - self.check_relay_removal().await; - } - - async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) { - if batch.is_new_relay { - // Spawn new relay connection with full filters - self.spawn_sync_relay(relay_url.to_string()).await; - return; - } - - // Build incremental filters for new content (NO since - get historical) - let incremental_filters = self.build_incremental_filters(&batch); - - if incremental_filters.is_empty() { - return; - } - - // Check if we need to consolidate - let current_filter_count = self.get_filter_count_for_relay(relay_url).await; - let new_filter_count = current_filter_count + incremental_filters.len(); - - // Note: 70 is a conservative threshold that may need tuning based on - // production observations. It was chosen to trigger consolidation earlier - // than v1's 150, but optimal value depends on relay behavior. - if new_filter_count > 70 { - // Consolidate: add incremental filters first (no since), wait for EOSE, - // then close all and resubscribe with consolidated filters (with since) - self.consolidate_relay_subscription(relay_url, incremental_filters).await; - } else { - // Just add incremental filters (no since - to get historical events) - self.send_filters_to_relay(relay_url, incremental_filters).await; - } - } - - fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec { - let mut filters = Vec::new(); - - // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags) - if !batch.new_repo_refs.is_empty() { - let refs: Vec = batch.new_repo_refs.iter().cloned().collect(); - for chunk in refs.chunks(100) { - // All kinds with lowercase 'a' tag - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) - ); - // All kinds with uppercase 'A' tag - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) - ); - // All kinds with 'q' tag (quote) - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) - ); - } - } - - // Layer 3: New event IDs - if !batch.new_event_ids.is_empty() { - let ids: Vec = batch.new_event_ids.iter() - .map(|id| id.to_hex()) - .collect(); - for chunk in ids.chunks(100) { - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) - ); - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) - ); - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) - ); - } - } - - filters - } -} -``` - -### Consolidation Strategy - -When consolidating, we need a two-phase approach: - -1. First, subscribe with incremental filters (no `since`) to get any historical events we missed -2. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`) - -```rust -async fn consolidate_relay_subscription( - &mut self, - relay_url: &str, - incremental_filters: Vec, -) { - // Phase 1: Add incremental filters WITHOUT since to catch up on new content - // These filters are for new repo_refs / event_ids we just discovered - let phase1_sub_id = self.send_filters_to_relay_and_wait_eose( - relay_url, - incremental_filters - ).await; - - // Phase 2: After EOSE, consolidate everything - // Close ALL existing subscriptions for this relay - self.close_all_subscriptions(relay_url).await; - - // Build fresh consolidated filters using current HashMap state - let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await; - - // Resubscribe with since = now - 15 minutes - let since = Timestamp::now() - 900; - let filters_with_since: Vec = consolidated_filters - .into_iter() - .map(|f| f.since(since)) - .collect(); - - self.send_filters_to_relay(relay_url, filters_with_since).await; -} -``` - -## Daily Full Catchup - -To capture events that may have taken longer than 15 minutes to propagate through the nostr network, we perform a daily full catchup: - -```rust -impl SyncManager { - /// Runs approximately every 24 hours per relay connection - async fn daily_catchup(&mut self, relay_url: &str) { - // Close all current subscriptions for this relay - self.close_all_subscriptions(relay_url).await; - - // Rebuild fresh filters from current HashMap state - let filters = self.build_three_layer_filters_for_relay(relay_url).await; - - // Subscribe WITHOUT since filter to get full historical sync - for filter in filters { - self.subscribe_to_relay(relay_url, filter).await; - } - - // After EOSE, switch back to live mode with since filter - self.wait_for_eose(relay_url).await; - - // Re-add since filter for ongoing live sync - let since = Timestamp::now() - 900; // 15 minutes ago - self.resubscribe_with_since(relay_url, since).await; - } -} -``` - -**Rationale:** The 15-minute reconnection window is standard for nostr event propagation, but some events may take longer. Rather than increasing the window (which would cause more duplicate processing), we do a daily full catchup to ensure nothing is missed. This adds minimal complexity while providing comprehensive coverage. - -## Sync Relay Connections - -Each sync relay connection uses the three-layer filter strategy: - -```rust -impl SyncRelayConnection { - async fn start(&mut self) { - loop { - match self.connect_and_subscribe().await { - Ok(()) => { - // Record successful connection - self.last_successful_connection = Instant::now(); - self.health_tracker.record_success(&self.url); - - // Run event loop until disconnect - self.run_event_loop().await; - } - Err(e) => { - self.health_tracker.record_failure(&self.url); - } - } - - // Reconnect with backoff and since filter - let backoff = self.health_tracker.get_backoff(&self.url); - tokio::time::sleep(backoff).await; - - // On reconnect, use since = last_successful - 15 minutes - self.reconnect_since = Some( - Timestamp::from(self.last_successful_connection - Duration::from_secs(900)) - ); - } - } - - async fn connect_and_subscribe(&mut self) -> Result<()> { - self.client.connect().await?; - - let filters = self.build_three_layer_filters().await; - - // Apply since filter if reconnecting - let filters = if let Some(since) = self.reconnect_since { - filters.into_iter().map(|f| f.since(since)).collect() - } else { - filters - }; - - for filter in filters { - self.client.subscribe(filter, None).await?; - } - - Ok(()) - } -} -``` - -## Three-Layer Filter Strategy - -```rust -impl SyncRelayConnection { - async fn build_three_layer_filters(&self) -> Vec { - let mut filters = Vec::new(); - - // Get repos for this relay - let repos = self.sync_relays.read().await - .get(&self.url) - .cloned() - .unwrap_or_default(); - - // Layer 1: Announcements (kinds 30617 + 30618) - // Note: 30618 is ONLY synced from remote relays, not self-subscribed - // Always included even if relay has no repos (bootstrap relay case) - filters.push( - Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) - ); - - // Layer 2: Events tagging repos with 'a' tags (ALL kinds) - // Batched per 100 repo refs - let repo_refs: Vec = repos.keys().cloned().collect(); - for chunk in repo_refs.chunks(100) { - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) - ); - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) - ); - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) - ); - } - - // Layer 3: Events tagging root events (batch per 100 event IDs) - let all_event_ids: HashSet = repos.values() - .flat_map(|ids| ids.iter().cloned()) - .collect(); - - let event_id_strs: Vec = all_event_ids - .iter() - .map(|id| id.to_hex()) - .collect(); - - for chunk in event_id_strs.chunks(100) { - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) - ); - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) - ); - filters.push( - Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) - ); - } - - filters - } -} -``` - -## Relay Removal - -```rust -async fn check_relay_removal(&mut self) { - let relays_to_check: Vec = self.sync_relays.read().await - .keys() - .cloned() - .collect(); - - for relay_url in relays_to_check { - // Never remove bootstrap relay - if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() { - continue; - } - - // Check if relay has any repos left - let should_remove = { - let sync_relays = self.sync_relays.read().await; - sync_relays.get(&relay_url) - .map(|repos| repos.is_empty()) - .unwrap_or(true) - }; - - if should_remove { - // Remove from HashMap - self.sync_relays.write().await.remove(&relay_url); - - // Close connection - self.close_relay_connection(&relay_url).await; - } - } -} -``` - -## Prometheus Metrics (Same as v1) - -| Metric | Type | Labels | Description | -| ------------------------------------- | ------- | ------------- | ----------------------- | -| `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 | -| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome | -| `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 | -| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures | -| `ngit_sync_events_total` | Counter | source | Events by source type | -| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled | -| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | -| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected | -| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count | - -## Module Structure (Simplified) - -``` -src/sync/ -├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays) -├── self_subscriber.rs # SelfSubscriber + batching logic -├── relay_connection.rs # Per-relay WebSocket + filters -├── health.rs # RelayHealthTracker (reuse from v1) -└── metrics.rs # SyncMetrics (reuse from v1) -``` - -> **Note:** SyncManager and state type aliases are colocated in `mod.rs` following the pattern of `src/http/mod.rs` (HttpService) and `src/metrics/mod.rs` (Metrics). See the earlier "Module Structure" section for rationale. - -## Comparison: v1 vs v2 - -| Aspect | v1 (Current) | v2 (Simplified) | -| ------------------- | ------------------------------------------------------------------ | --------------------------------------------- | -| **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB | -| **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe | -| **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) | -| **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters | -| **Batching** | Per 100 tags | Per 100 tags | -| **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min | -| **Jitter** | Startup jitter | None (see design decision) | -| **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed | -| **1621 (Issues)** | Not included | Included with 1617/1618/1619 | -| **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags | -| **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily | - -## Key Design Decisions - -1. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database -2. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them -3. **Batched Filter Updates**: 5-second window that starts on first event (timer does NOT reset on subsequent events) -4. **Uniform Reconnection**: Always use `since = last_successful - 15min` -5. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns -6. **Bootstrap Relay Protected**: Never removed from sync_relays - ensures at least one sync connection exists even when no repositories currently list our service (cold start / recovery scenario) -7. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed -8. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation (conservative value that may need tuning based on production observation) -9. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters -10. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds -11. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since -12. **Multiple Repo Refs**: Handle events that tag multiple repos correctly -13. **Daily Full Catchup**: Periodic sync restart without `since` filter (~24h) to catch slow-propagating events -14. **Dual DB Queries at Startup**: Separate queries for root events and announcements. Could be combined into a single query, but some relays cache by kind which may make separate queries more efficient. Trade-off deferred for future optimization. - ---- - -## Detailed Flow Diagram - -```mermaid -sequenceDiagram - participant DB as Database - participant SM as SyncManager - participant SS as Self-Subscriber - participant RC as RelayConnection - - Note over SM: Startup - SM->>SM: Add bootstrap relay to sync_relays - SM->>DB: Query kinds 1617/1618/1619/1621 - DB-->>SM: Root events - SM->>SM: Build following_repo_root_events - SM->>SM: Handle multi-repo events - - SM->>DB: Query kind 30617 - DB-->>SM: Announcements - SM->>SM: Build sync_relays - - SM->>RC: Spawn connections for each relay - RC->>RC: Build 3-layer filters from sync_relays - RC->>RC: Connect and subscribe - - Note over SS: Event-Driven Updates - SS->>SS: Subscribe to 30617/1617/1618/1619/1621 - SS->>SS: Receive event - start 5s batch timer - SS->>SS: Collect more events in batch window - SS->>SS: Batch window closes - SS->>SM: Apply batched updates to HashMaps - - alt New Relay Discovered - SM->>RC: Spawn new connection - else New Content for Existing Relay - alt Under 70 filter limit - SM->>RC: Add incremental filter - no since - else Over 70 filter limit - SM->>RC: Add incremental filters - no since - RC-->>SM: EOSE received - SM->>RC: Close all subscriptions - SM->>RC: Resubscribe consolidated - with since - end - else Relay Has No More Repos - alt Is Bootstrap Relay - SM->>SM: Keep connection - Layer 1 only - else Not Bootstrap Relay - SM->>RC: Close connection - end - end - - Note over RC: Connection Lifecycle - RC->>RC: Process incoming events - RC->>DB: Store via acceptance policy - - RC->>RC: Connection drops - RC->>RC: Wait backoff - 5s to 1h exponential - RC->>RC: Reconnect with since = last_success - 15min - - Note over RC: If failures continue 24h - RC->>RC: Mark dead - retry once per 24h -``` diff --git a/docs/explanation/grasp-02-proactive-sync-v3.md b/docs/explanation/grasp-02-proactive-sync-v3.md deleted file mode 100644 index 30b3102..0000000 --- a/docs/explanation/grasp-02-proactive-sync-v3.md +++ /dev/null @@ -1,871 +0,0 @@ -# GRASP-02: Proactive Sync v3 - Event-Driven Design - -## Overview - -This document presents v3 of the proactive sync design. Key principles: - -1. **Self-subscription as the only mechanism** - No database initialization at startup -2. **Batch-based pending tracking** - Each batch confirms independently -3. **Single action type** - AddFilters only, auto-spawn connections -4. **Three-way state model** - RepoSyncIndex (want) → PendingSyncIndex (in-flight) → RelaySyncIndex (confirmed) - ---- - -## Data Model - -### RepoSyncIndex (Source of Truth) - -```rust -/// What we WANT to sync - derived from events received via self-subscription. -/// Updated immediately when self-subscriber batch fires. -/// Key: repo addressable ref ("30617:pubkey:identifier") -pub type RepoSyncIndex = Arc>>; - -#[derive(Debug, Clone, Default)] -pub struct RepoSyncNeeds { - /// Relay URLs listed in this repo's 30617 announcement - pub relays: HashSet, - /// Root event IDs (1617/1618/1619/1621) that reference this repo - pub root_events: HashSet, -} -``` - -### RelaySyncIndex (Confirmed State + Connection) - -```rust -/// What we've CONFIRMED syncing - includes connection state for integrated lifecycle. -/// Key: relay URL -pub type RelaySyncIndex = Arc>>; - -/// Connection status for a relay -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConnectionStatus { - /// Not currently connected - Disconnected, - /// Connection attempt in progress - Connecting, - /// Successfully connected and subscribed - Connected, -} - -/// Complete state for a single relay - combines sync needs with connection lifecycle -#[derive(Debug)] -pub struct RelayState { - /// Repos we've confirmed syncing from this relay - pub repos: HashSet, - /// Root events we've confirmed tracking - pub root_events: HashSet, - /// If true, never disconnect this relay - pub is_bootstrap: bool, - /// Current connection status - pub connection_status: ConnectionStatus, - /// When we last successfully connected (for since filter on reconnect) - pub last_connected: Option, - /// When we disconnected (for 15-minute state retention rule) - pub disconnected_at: Option, - /// The active connection (None if disconnected) - pub connection: Option, -} - -impl RelayState { - /// Check if state should be cleared based on 15-minute rule - pub fn should_clear_state(&self) -> bool { - match self.disconnected_at { - Some(disconnected) => { - let now = Timestamp::now(); - now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes - } - None => false, // Still connected or never connected - } - } - - /// Clear repos and root_events (called when reconnect takes > 15 minutes) - pub fn clear_sync_state(&mut self) { - self.repos.clear(); - self.root_events.clear(); - } -} -``` - -### PendingSyncIndex (In-Flight Batches) - -```rust -/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. -/// Each batch has its own ID and can confirm independently. -/// Key: relay URL -pub type PendingSyncIndex = Arc>>>; - -#[derive(Debug, Clone)] -pub struct PendingBatch { - /// Unique ID for this batch (for debugging/logging) - pub batch_id: u64, - /// The items this batch is syncing - pub items: PendingItems, - /// Subscription IDs that must ALL receive EOSE before confirming - pub outstanding_subs: HashSet, -} - -#[derive(Debug, Clone, Default)] -pub struct PendingItems { - pub repos: HashSet, - pub root_events: HashSet, -} -``` - ---- - -## State Flow - -```mermaid -flowchart TB - subgraph Input - SS[SelfSubscriber] - OWN[Own Relay] - end - - subgraph RepoSyncIndex - Want - RSI[HashMap of Repo to Relays+Events] - end - - subgraph Derived Target - DT[derive_relay_targets fn] - TGT[Per-relay: repos + events we should sync] - end - - subgraph PendingSyncIndex - In Flight - PSI[Vec of PendingBatch per relay] - end - - subgraph RelaySyncIndex - State + Connection - RLI[RelayState per relay] - CONN[connection: Option of RelayConnection] - STATUS[connection_status: Connected/Disconnected/Connecting] - REPOS[repos + root_events] - end - - SS -->|subscribe| OWN - OWN -->|events| SS - SS -->|batch fires| RSI - RSI --> DT - DT --> TGT - TGT -->|diff: target - pending - confirmed| DIFF[Compute new items] - PSI --> DIFF - RLI --> DIFF - DIFF -->|skip if disconnected| CHECK{Connected?} - CHECK -->|yes| AF[AddFilters] - CHECK -->|no| QUEUE[Queued in RelayState.repos] - AF -->|subscribe| CONN - AF -->|create batch| PSI - CONN -->|EOSE| PSI - PSI -->|batch complete| REPOS - CONN -->|disconnect event| DISC[Mark Disconnected + set disconnected_at] - DISC -->|reconnect| RECONN[On Reconnect] - RECONN -->|check 15min rule| RULE{disconnected > 15min?} - RULE -->|yes| CLEAR[Clear repos/root_events] - RULE -->|no| RETAIN[Keep retained state] - CLEAR --> REGEN[Regenerate AddFilters from RepoSyncIndex] - RETAIN --> RESUB[Resubscribe with since filter] -``` - -### Connection Lifecycle Integration - -The `RelayState` struct now owns both the connection and sync state: - -```rust -// On disconnect (detected via RelayPoolNotification::Shutdown or handle_notifications returning) -fn handle_disconnect(&mut self, relay_url: &str) { - if let Some(state) = self.relay_sync_index.write().await.get_mut(relay_url) { - state.connection_status = ConnectionStatus::Disconnected; - state.disconnected_at = Some(Timestamp::now()); - state.connection = None; - - // Clear any pending batches for this relay - self.pending_sync_index.write().await.remove(relay_url); - } -} - -// On reconnect -async fn handle_reconnect(&mut self, relay_url: &str) -> Result<(), Error> { - let mut index = self.relay_sync_index.write().await; - let state = index.get_mut(relay_url).ok_or("Relay not in index")?; - - // Apply 15-minute state retention rule - if state.should_clear_state() { - tracing::info!("Reconnect after >15min for {}, clearing state", relay_url); - state.clear_sync_state(); - } - - // Create new connection - state.connection_status = ConnectionStatus::Connecting; - let connection = RelayConnection::new(relay_url.to_string()); - - // Connect with since filter if we have last_connected - let since = state.last_connected.map(|ts| { - Timestamp::from(ts.as_u64().saturating_sub(900)) // -15 min buffer - }); - - connection.connect_and_subscribe_with_since(since).await?; - - state.connection = Some(connection); - state.connection_status = ConnectionStatus::Connected; - state.last_connected = Some(Timestamp::now()); - state.disconnected_at = None; - - drop(index); // Release lock - - // Regenerate AddFilters from current state (either retained or fresh from RepoSyncIndex) - self.regenerate_filters_for_relay(relay_url).await; - - Ok(()) -} - -/// Regenerate AddFilters for a relay after reconnection -async fn regenerate_filters_for_relay(&mut self, relay_url: &str) { - let repo_index = self.repo_sync_index.read().await; - let targets = derive_relay_targets(&repo_index); - - if let Some(target) = targets.get(relay_url) { - // Build filters for everything this relay should sync - let filters = build_filters(&target.repos, &target.root_events); - - // Create and process AddFilters action - let action = AddFilters { - relay_url: relay_url.to_string(), - repos: target.repos.clone(), - root_events: target.root_events.clone(), - filters, - }; - - self.handle_add_filters(action).await; - } -} -``` - ---- - -## Action Type - -```rust -/// Action sent from SelfSubscriber to SyncManager. -/// SyncManager auto-spawns relay connections if they don't exist. -pub struct AddFilters { - pub relay_url: String, - /// Items this action covers (for pending tracking) - pub repos: HashSet, - pub root_events: HashSet, - /// Pre-batched filters (each with <= 100 tags) - pub filters: Vec, -} -``` - ---- - -## Core Algorithms - -### 1. derive_relay_targets - -Transform RepoSyncIndex into per-relay sync targets: - -```rust -fn derive_relay_targets( - repo_index: &HashMap -) -> HashMap { - let mut targets: HashMap = HashMap::new(); - - for (repo_ref, needs) in repo_index { - for relay_url in &needs.relays { - let target = targets.entry(relay_url.clone()).or_default(); - target.repos.insert(repo_ref.clone()); - target.root_events.extend(needs.root_events.iter().cloned()); - } - } - - targets -} -``` - -### 2. compute_actions (Three-Way Diff) - -```rust -fn compute_actions( - targets: &HashMap, - pending: &HashMap>, - confirmed: &HashMap, -) -> Vec { - let mut actions = Vec::new(); - - for (relay_url, target) in targets { - // Skip disconnected relays - they'll get AddFilters on reconnect - if let Some(state) = confirmed.get(relay_url) { - if state.connection_status != ConnectionStatus::Connected { - continue; - } - } - - // Collect all pending items for this relay - let pending_repos: HashSet<_> = pending.get(relay_url) - .map(|batches| batches.iter() - .flat_map(|b| b.items.repos.iter().cloned()) - .collect()) - .unwrap_or_default(); - let pending_events: HashSet<_> = pending.get(relay_url) - .map(|batches| batches.iter() - .flat_map(|b| b.items.root_events.iter().cloned()) - .collect()) - .unwrap_or_default(); - - // Collect confirmed items for this relay - let confirmed_repos = confirmed.get(relay_url) - .map(|c| &c.repos) - .unwrap_or(&HashSet::new()); - let confirmed_events = confirmed.get(relay_url) - .map(|c| &c.root_events) - .unwrap_or(&HashSet::new()); - - // New = target - pending - confirmed - let new_repos: HashSet<_> = target.repos.iter() - .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) - .cloned() - .collect(); - let new_events: HashSet<_> = target.root_events.iter() - .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) - .cloned() - .collect(); - - if !new_repos.is_empty() || !new_events.is_empty() { - let filters = build_filters(&new_repos, &new_events); - actions.push(AddFilters { - relay_url: relay_url.clone(), - repos: new_repos, - root_events: new_events, - filters, - }); - } - } - - actions -} -``` - -### 3. handle_add_filters (SyncManager) - -```rust -impl SyncManager { - async fn handle_add_filters(&mut self, action: AddFilters) { - let AddFilters { relay_url, repos, root_events, filters } = action; - - // Auto-spawn connection if needed - if !self.connections.contains_key(&relay_url) { - self.spawn_connection(&relay_url).await; - } - - let conn = self.connections.get(&relay_url).unwrap(); - - // Subscribe and collect subscription IDs - // nostr-sdk 0.44: subscribe returns Output> - // since we're only subscribed to one relay per connection - let mut sub_ids = HashSet::new(); - for filter in filters { - // cloned filter for each subscription call - match conn.client.subscribe(filter, None).await { - Ok(output) => { - // Output contains subscription IDs for each relay - for sub_id in output.val { - sub_ids.insert(sub_id); - } - } - Err(e) => { - tracing::warn!("Failed to subscribe: {}", e); - } - } - } - - // Create pending batch - let batch = PendingBatch { - batch_id: self.next_batch_id(), - items: PendingItems { repos, root_events }, - outstanding_subs: sub_ids, - }; - - // Add to pending index - self.pending_sync_index.write().await - .entry(relay_url) - .or_default() - .push(batch); - } -} -``` - -### 4. handle_eose (Batch Completion) - -```rust -impl SyncManager { - async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { - let mut pending = self.pending_sync_index.write().await; - - if let Some(batches) = pending.get_mut(relay_url) { - // Find which batch this subscription belongs to - for batch in batches.iter_mut() { - if batch.outstanding_subs.remove(&sub_id) { - // Check if batch is now complete - if batch.outstanding_subs.is_empty() { - // Move items to confirmed - let items = batch.items.clone(); - drop(pending); // Release lock before acquiring another - - let mut confirmed = self.relay_sync_index.write().await; - let relay_confirmed = confirmed - .entry(relay_url.to_string()) - .or_default(); - relay_confirmed.repos.extend(items.repos); - relay_confirmed.root_events.extend(items.root_events); - - tracing::info!( - "Batch {} complete for {} - confirmed {} repos, {} events", - batch.batch_id, relay_url, - items.repos.len(), items.root_events.len() - ); - } - break; - } - } - - // Clean up completed batches - if let Some(batches) = pending.get_mut(relay_url) { - batches.retain(|b| !b.outstanding_subs.is_empty()); - } - } - } -} -``` - ---- - -## Self-Subscriber Flow - -### State Tracking - -```rust -pub struct SelfSubscriber { - own_relay_url: String, - relay_domain: String, - repo_sync_index: RepoSyncIndex, - pending_sync_index: PendingSyncIndex, - relay_sync_index: RelaySyncIndex, - action_tx: mpsc::Sender, - /// Timestamp of last successful connection - used for since filter on reconnection - last_connected: Option, - /// Is this the first connection attempt since startup? - is_initial_connect: bool, -} -``` - -### On Startup - -```rust -impl SelfSubscriber { - async fn run(mut self) { - // Connect to own relay - let client = Client::new(Keys::generate()); - client.add_relay(&self.own_relay_url).await?; - client.connect().await; - - // Track connection time - self.last_connected = Some(Timestamp::now()); - - // Subscribe WITHOUT since filter (get all historical) on first connect - let filter = Filter::new().kinds([ - Kind::Custom(30617), // Repository announcements - Kind::GitPatch, // 1617 - Kind::Custom(1618), // PRs - Kind::Custom(1619), // PR updates - Kind::GitIssue, // 1621 - ]); - - client.subscribe(filter, None).await?; - self.is_initial_connect = false; - - // Run event loop with batching - self.event_loop(&client).await; - } -} -``` - -### On Reconnection - -```rust -impl SelfSubscriber { - async fn reconnect(&mut self, client: &Client) -> Result<(), Error> { - // Reconnect to own relay - client.connect().await; - - // On reconnection ONLY, use since filter based on last_connected - let since = match self.last_connected { - Some(ts) => Timestamp::from(ts.as_u64().saturating_sub(900)), // -15 minutes buffer - None => Timestamp::from(0), // Shouldn't happen, but fall back to full sync - }; - - // Update last_connected AFTER computing since - self.last_connected = Some(Timestamp::now()); - - let filter = Filter::new() - .kinds([ - Kind::Custom(30617), - Kind::GitPatch, - Kind::Custom(1618), - Kind::Custom(1619), - Kind::GitIssue, - ]) - .since(since); - - client.subscribe(filter, None).await?; - Ok(()) - } -} -``` - -### Batching Logic - -```rust -impl SelfSubscriber { - async fn event_loop(&self, client: &Client) { - let mut pending_events: Vec = Vec::new(); - let mut batch_timer: Option = None; - let batch_window = Duration::from_secs(5); - - loop { - let timeout = batch_timer - .map(|t| batch_window.saturating_sub(t.elapsed())) - .unwrap_or(Duration::from_secs(60)); - - tokio::select! { - notification = client.notifications().recv() => { - if let Ok(RelayPoolNotification::Event { event, .. }) = notification { - pending_events.push(*event); - - // Start timer on first event (does NOT reset) - if batch_timer.is_none() { - batch_timer = Some(Instant::now()); - } - } - } - _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { - // Batch window elapsed - self.process_batch(pending_events.drain(..).collect()).await; - batch_timer = None; - } - } - } - } - - async fn process_batch(&self, events: Vec) { - // 1. Update RepoSyncIndex - for event in events { - match event.kind.as_u16() { - 30617 => self.handle_announcement(&event).await, - 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, - _ => {} - } - } - - // 2. Derive targets and compute actions - let repo_index = self.repo_sync_index.read().await; - let targets = derive_relay_targets(&repo_index); - - let pending = self.pending_sync_index.read().await; - let confirmed = self.relay_sync_index.read().await; - - let actions = compute_actions(&targets, &pending, &confirmed); - - drop(repo_index); - drop(pending); - drop(confirmed); - - // 3. Send actions to SyncManager - for action in actions { - let _ = self.action_tx.send(action).await; - } - } -} -``` - ---- - -## Bootstrap Relay - -```rust -impl SyncManager { - async fn initialize_bootstrap(&mut self) { - if let Some(url) = &self.config.bootstrap_relay_url { - // Pre-mark as bootstrap (never removed) - self.relay_sync_index.write().await.insert( - url.clone(), - RelaySyncNeeds { - repos: HashSet::new(), - root_events: HashSet::new(), - is_bootstrap: true, - } - ); - - // Send Layer 1 filter - let filters = vec![ - Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) - ]; - - self.handle_add_filters(AddFilters { - relay_url: url.clone(), - repos: HashSet::new(), // Layer 1 doesn't track specific repos - root_events: HashSet::new(), - filters, - }).await; - } - } -} -``` - ---- - -## Disconnect Handling - -Direct in SyncManager (not via action): - -```rust -impl SyncManager { - async fn check_disconnects(&mut self) { - let confirmed = self.relay_sync_index.read().await; - - for (relay_url, state) in confirmed.iter() { - if state.is_bootstrap { - continue; // Never disconnect bootstrap - } - - if state.repos.is_empty() && state.root_events.is_empty() { - // No repos - disconnect - self.disconnect_relay(relay_url).await; - } - } - } - - async fn disconnect_relay(&mut self, relay_url: &str) { - self.relay_sync_index.write().await.remove(relay_url); - self.pending_sync_index.write().await.remove(relay_url); - - if let Some(conn) = self.connections.remove(relay_url) { - conn.disconnect().await; - } - } -} -``` - ---- - -## Relay Connection Lifecycle - -### State Machine for External Relays - -```mermaid -stateDiagram-v2 - [*] --> Connecting: spawn_connection - Connecting --> Connected: success - Connecting --> Backoff: failure - Connected --> Disconnected: connection lost - Connected --> [*]: intentional disconnect - Disconnected --> Backoff: record_failure - Backoff --> Connecting: backoff elapsed - Backoff --> Dead: 24h continuous failures - Dead --> Connecting: daily retry -``` - -### Health Integration - -Uses `RelayHealthTracker` from [`src/sync/health.rs`](../../src/sync/health.rs): - -```rust -impl SyncManager { - /// Spawn a connection with health tracking - async fn spawn_connection(&mut self, relay_url: &str) { - // Check if we should attempt connection - if !self.health_tracker.should_attempt_connection(relay_url) { - let remaining = self.health_tracker.get_remaining_backoff(relay_url); - tracing::debug!( - "Skipping connection to {} - backoff {:?}", - relay_url, - remaining - ); - return; - } - - match self.try_connect(relay_url).await { - Ok(conn) => { - self.health_tracker.record_success(relay_url); - self.connections.insert(relay_url.to_string(), conn); - } - Err(e) => { - self.health_tracker.record_failure(relay_url); - tracing::warn!("Connection to {} failed: {}", relay_url, e); - } - } - } -} -``` - -### Reconnection Loop - -Each relay connection runs its own reconnection loop: - -```rust -impl RelayConnection { - async fn run_with_reconnection( - mut self, - health_tracker: Arc, - event_tx: mpsc::Sender, - ) { - loop { - // Check backoff before attempting - if !health_tracker.should_attempt_connection(&self.url) { - if let Some(remaining) = health_tracker.get_remaining_backoff(&self.url) { - tokio::time::sleep(remaining).await; - continue; - } - } - - // Attempt connection - match self.connect_and_subscribe().await { - Ok(()) => { - health_tracker.record_success(&self.url); - - // Track when we connected for since filter on reconnect - let connected_at = Timestamp::now(); - - // Run event loop until disconnection - self.run_event_loop(&event_tx).await; - - // Connection lost - will reconnect with since filter - health_tracker.record_failure(&self.url); - - // On reconnect, use since = connected_at - 15 minutes - self.set_reconnect_since(connected_at); - } - Err(e) => { - health_tracker.record_failure(&self.url); - tracing::warn!("Connection to {} failed: {}", self.url, e); - } - } - - // Get backoff duration and wait - let state = health_tracker.get_state(&self.url); - if state == HealthState::Dead { - // Dead relays retry once per 24 hours - tokio::time::sleep(Duration::from_secs(24 * 3600)).await; - } - // Otherwise, loop will check should_attempt_connection - } - } -} -``` - -### Backoff Configuration - -From existing [`RelayHealthTracker`](../../src/sync/health.rs:91): - -| Parameter | Value | Notes | -|-----------|-------|-------| -| Base backoff | 5 seconds | First failure | -| Backoff multiplier | 2x | Exponential increase | -| Max backoff | 1 hour (configurable) | `sync_max_backoff_secs` | -| Dead threshold | 24 hours | Continuous failures | -| Dead retry interval | 24 hours | Once per day | - ---- - -## Consolidation - -### Threshold-Based (70 filters) - -```rust -impl SyncManager { - async fn maybe_consolidate(&mut self, relay_url: &str) { - let filter_count = self.get_filter_count(relay_url).await; - - if filter_count > 70 { - self.consolidate(relay_url).await; - } - } - - async fn consolidate(&mut self, relay_url: &str) { - // 1. Wait for all pending batches to complete - self.wait_pending_complete(relay_url).await; - - // 2. Close all subscriptions - self.close_all_subs(relay_url).await; - - // 3. Rebuild filters from confirmed state - let confirmed = self.relay_sync_index.read().await; - let state = confirmed.get(relay_url)?; - let filters = build_filters(&state.repos, &state.root_events); - - // 4. Resubscribe with since = now - 15 minutes - let since = Timestamp::now() - 900; - for filter in filters { - self.subscribe(relay_url, filter.since(since)).await; - } - } -} -``` - -### Daily Timer (23-25h Random) - -```rust -impl SyncManager { - async fn run_daily_consolidation(&self) { - loop { - let hours = 23 + rand::random::() * 2.0; - tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; - - for relay_url in self.connections.keys() { - self.consolidate(relay_url).await; - } - } - } -} -``` - ---- - -## Key Design Decisions - -| Decision | Choice | Rationale | -|----------|--------|-----------| -| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | -| Since filter | Only on reconnection | Initial subscribe gets full history | -| Pending tracking | Per-batch with batch ID | Independent confirmation, no blocking | -| EOSE requirement | All subs in batch must complete | Single repo may need multiple filter subs | -| Action type | Struct not enum | Only one action type needed | -| Relay spawning | Auto-spawn on AddFilters | Simplifies action logic | -| Disconnect | Direct in SyncManager | Not worth an action type | -| Consolidation | 70 filters + daily timer | Threshold for growth, timer for staleness | -| Timestamps | In-memory only | Not critical for correctness | -| Health tracking | Reuse existing RelayHealthTracker | Already implements exponential backoff, dead relay detection | -| Reconnection backoff | Exponential to 1h max | Prevents hammering failed relays | -| Dead relay policy | 24h threshold, daily retry | Balance between giving up and resource waste | -| last_connected tracking | Per-connection in-memory | Enables 15-minute buffer on reconnect | -| Connection ownership | Inside RelayState | Ties connection lifecycle to sync state, simpler than separate maps | -| State retention rule | Clear if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | -| Skip disconnected | compute_actions skips disconnected | Prevents queuing AddFilters for offline relays | -| Reconnect triggers | handle_notifications returns or Shutdown | nostr-sdk signals disconnect via event loop exit | -| On-reconnect flow | Regenerate AddFilters from RepoSyncIndex | Fresh subscriptions for what we actually need | - ---- - -## Module Structure - -``` -src/sync/ -├── mod.rs # SyncManager, main loop -├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types -├── actions.rs # AddFilters struct, compute_actions -├── self_subscriber.rs # SelfSubscriber, batching logic -├── relay_connection.rs # Per-relay WebSocket connection -├── consolidation.rs # Consolidation logic, daily timer -├── health.rs # Health tracking (reuse from v2) -└── metrics.rs # Prometheus metrics (reuse from v2) \ No newline at end of file diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md deleted file mode 100644 index dd508b3..0000000 --- a/docs/explanation/grasp-02-proactive-sync-v4.md +++ /dev/null @@ -1,1330 +0,0 @@ -# GRASP-02: Proactive Sync v4 - Health & Reconnection Design - -## Overview - -This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles: - -1. **Self-subscription as the only mechanism** - No database initialization at startup -2. **compute_actions as single decision point** - Determines what NEW subscriptions to create -3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) -4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch -5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary - ---- - -## Data Model - -### RepoSyncIndex (Source of Truth) - -```rust -/// What we WANT to sync - derived from events received via self-subscription. -/// Updated immediately when self-subscriber batch fires. -/// Key: repo addressable ref - 30617:pubkey:identifier -pub type RepoSyncIndex = Arc>>; - -#[derive(Debug, Clone, Default)] -pub struct RepoSyncNeeds { - /// Relay URLs listed in this repo's 30617 announcement - pub relays: HashSet, - /// Root event IDs - 1617/1618/1619/1621 - that reference this repo - pub root_events: HashSet, -} -``` - -### RelaySyncIndex (Confirmed State + Connection) - -```rust -/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. -/// Key: relay URL -pub type RelaySyncIndex = Arc>>; - -/// Connection status for a relay -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConnectionStatus { - /// Not currently connected - Disconnected, - /// Connection attempt in progress - Connecting, - /// Successfully connected and subscribed - Connected, -} - -/// Complete state for a single relay - combines sync needs with connection lifecycle -#[derive(Debug)] -pub struct RelayState { - /// Repos we have confirmed syncing from this relay - pub repos: HashSet, - /// Root events we have confirmed tracking - pub root_events: HashSet, - /// If true, never disconnect this relay - pub is_bootstrap: bool, - /// Current connection status - pub connection_status: ConnectionStatus, - /// When we last successfully connected - used for since filter on reconnect - pub last_connected: Option, - /// When we disconnected - for 15-minute state retention rule - pub disconnected_at: Option, -} - -impl RelayState { - /// Check if state should be cleared based on 15-minute rule - pub fn should_clear_state(&self) -> bool { - match self.disconnected_at { - Some(disconnected) => { - let now = Timestamp::now(); - now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes - } - None => false, // Still connected or never connected - } - } - - /// Clear repos and root_events - called when reconnect takes > 15 minutes - pub fn clear_sync_state(&mut self) { - self.repos.clear(); - self.root_events.clear(); - } -} -``` - -### PendingSyncIndex (In-Flight Batches) - -```rust -/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. -/// Each batch has its own ID and can confirm independently. -/// Key: relay URL -pub type PendingSyncIndex = Arc>>>; - -#[derive(Debug, Clone)] -pub struct PendingBatch { - /// Unique ID for this batch - for debugging/logging - pub batch_id: u64, - /// The items this batch is syncing - pub items: PendingItems, - /// Subscription IDs that must ALL receive EOSE before confirming - pub outstanding_subs: HashSet, -} - -#[derive(Debug, Clone, Default)] -pub struct PendingItems { - pub repos: HashSet, - pub root_events: HashSet, -} -``` - ---- - -## Connection Lifecycle State Machine - -```mermaid -stateDiagram-v2 - [*] --> Disconnected: discover relay via RepoSyncIndex - Disconnected --> Connecting: AddFilters triggers spawn_connection - Connecting --> Connected: success - Connecting --> Disconnected: failure + record in health tracker - Connected --> Disconnected: connection lost - Connected --> [*]: intentional disconnect via check_disconnects - - note right of Disconnected: disconnected_at set for 15min rule - note right of Connected: last_connected tracked for since filter -``` - ---- - -## Flow Scenarios - -### Scenario 1: Initial Connect via handle_connect_or_reconnect - -```mermaid -flowchart TB - START[Startup] --> SS[Self-subscribe to own relay] - SS --> |no since filter| EVENTS[Receive historical events] - EVENTS --> RSI[Update RepoSyncIndex] - RSI --> DT[derive_relay_targets] - DT --> CA[compute_actions with targets and empty confirmed] - CA --> AF[AddFilters for each relay] - AF --> SPAWN{Relay connected?} - SPAWN --> |no| CONN[spawn_connection] - CONN --> HC[handle_connect_or_reconnect] - SPAWN --> |yes| SUB - - subgraph handle_connect_or_reconnect - Fresh Sync - HC --> CHECK_FRESH{is_fresh_sync?} - CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] - L1 --> RCA[recompute_actions_for_relay] - end - - RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> CONFIRM[Move items to confirmed repos/root_events] -``` - -**Key points:** - -- No `since` filter on initial connect - get full history -- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` -- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since -- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking - -### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes - -```mermaid -flowchart TB - DISC[Connection lost] --> MARK[Set disconnected_at = now] - MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] - CLEAR_PEND --> WAIT[Wait for reconnection] - WAIT --> RECONN[Connection restored] - RECONN --> HC[handle_connect_or_reconnect] - - subgraph handle_connect_or_reconnect - Quick Reconnect - HC --> CHECK{is_fresh_sync?} - CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] - SINCE --> L1[build_announcement_filter - with since] - L1 --> L23[rebuild_layer2_and_layer3 - with since] - L23 --> RCA[recompute_actions_for_relay] - end - - RCA --> AF[AddFilters for new items only] - AF --> SUB[Subscribe] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> EXTEND[Extend confirmed state] -``` - -**Key points:** - -- PendingSyncIndex cleared on disconnect (not reconnect) -- `handle_connect_or_reconnect`: - 1. `build_announcement_filter(Some(since))` - Layer 1 with since - 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since - 3. `recompute_actions_for_relay` - check for new items -- since = last_connected - 15min ensures we catch events during disconnection - -### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes - -```mermaid -flowchart TB - RECONN[Connection restored] --> HC[handle_connect_or_reconnect] - - subgraph handle_connect_or_reconnect - Stale Reconnect - HC --> CHECK{is_fresh_sync?} - CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] - CLEAR --> L1[build_announcement_filter - no since] - L1 --> RCA[recompute_actions_for_relay] - end - - RCA --> CA[compute_actions with empty confirmed] - CA --> AF[AddFilters for everything] - AF --> SUB[Subscribe - no since filter] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> CONFIRM[Populate confirmed state fresh] -``` - -**Key points:** - -- `should_clear_state()` returns true → triggers fresh sync -- Same path as initial connect after clearing state -- Layer 1: `build_announcement_filter(None)` - full history -- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything - -### Scenario 4: Consolidation - Triggered on Filter Add - -```mermaid -flowchart TB - AF[handle_add_filters called] --> COUNT{current + new > 70?} - COUNT --> |yes| CONSOLIDATE[consolidate] - CONSOLIDATE --> WAIT_PEND[wait_pending_complete] - WAIT_PEND --> CLOSE[unsubscribe_all] - CLOSE --> SINCE[since = now - 15min] - SINCE --> L1[build_announcement_filter - with since] - L1 --> L23[rebuild_layer2_and_layer3 - with since] - COUNT --> |no| SUB[Subscribe new filters] - SUB --> PB[Create PendingBatch] -``` - -**Key points:** - -- Consolidation checked in `handle_add_filters` BEFORE adding new filters -- After closing all subscriptions, re-subscribe: - 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since - 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since -- `since = now - 15min` prevents re-fetching old events -- Keeps confirmed state, just reduces filter count - -### Scenario 5: Daily Timer - 23 to 25h Random - -```mermaid -flowchart TB - DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] - CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] - CLEAR_PEND --> CLEAR_STATE[clear_sync_state] - CLEAR_STATE --> L1[build_announcement_filter - no since] - L1 --> RCA[recompute_actions_for_relay] - RCA --> CA[compute_actions with empty confirmed] - CA --> AF[AddFilters for everything] - AF --> SUB[Subscribe - no since filter] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> CONFIRM[Repopulate confirmed state] -``` - -**Key points:** - -- Daily timer is a full fresh sync, NOT consolidation -- Clears both PendingSyncIndex and confirmed state -- Layer 1: `build_announcement_filter(None)` - full history -- Layer 2+3: via compute_actions with empty confirmed - full history -- Detects any state drift accumulated over 24 hours - ---- - -## Core Algorithms - -### 1. derive_relay_targets - -Transform RepoSyncIndex into per-relay sync targets: - -```rust -/// Inverts RepoSyncIndex to get per-relay view -fn derive_relay_targets( - repo_index: &HashMap -) -> HashMap { - let mut targets: HashMap = HashMap::new(); - - for (repo_ref, needs) in repo_index { - for relay_url in &needs.relays { - let target = targets.entry(relay_url.clone()).or_default(); - target.repos.insert(repo_ref.clone()); - target.root_events.extend(needs.root_events.iter().cloned()); - } - } - - targets -} -``` - -### 2. compute_actions (Three-Way Diff) - -**This is the ONLY decision point for what NEW subscriptions to create.** - -```rust -/// Computes AddFilters for items that are: -/// - In targets (what we want) -/// - NOT in pending (already in-flight) -/// - NOT in confirmed (already confirmed) -fn compute_actions( - targets: &HashMap, - pending: &HashMap>, - confirmed: &HashMap, -) -> Vec { - let mut actions = Vec::new(); - - for (relay_url, target) in targets { - // Skip disconnected relays - they will get AddFilters on reconnect - if let Some(state) = confirmed.get(relay_url) { - if state.connection_status != ConnectionStatus::Connected { - continue; - } - } - - // Collect all pending items for this relay - let pending_repos: HashSet<_> = pending.get(relay_url) - .map(|batches| batches.iter() - .flat_map(|b| b.items.repos.iter().cloned()) - .collect()) - .unwrap_or_default(); - let pending_events: HashSet<_> = pending.get(relay_url) - .map(|batches| batches.iter() - .flat_map(|b| b.items.root_events.iter().cloned()) - .collect()) - .unwrap_or_default(); - - // Collect confirmed items for this relay - let confirmed_repos = confirmed.get(relay_url) - .map(|c| &c.repos) - .unwrap_or(&HashSet::new()); - let confirmed_events = confirmed.get(relay_url) - .map(|c| &c.root_events) - .unwrap_or(&HashSet::new()); - - // New = target - pending - confirmed - let new_repos: HashSet<_> = target.repos.iter() - .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) - .cloned() - .collect(); - let new_events: HashSet<_> = target.root_events.iter() - .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) - .cloned() - .collect(); - - if !new_repos.is_empty() || !new_events.is_empty() { - let filters = build_filters(&new_repos, &new_events); - actions.push(AddFilters { - relay_url: relay_url.clone(), - repos: new_repos, - root_events: new_events, - filters, - }); - } - } - - actions -} -``` - -### 3. Filter Building Functions (Three-Layer Strategy) - -The filter strategy uses three layers: - -- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation -- **Layer 2**: Events tagging our repos -- **Layer 3**: Events tagging our root events - -**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). - -```rust -/// Layer 1: Announcements filter (kinds 30617 + 30618) -/// Subscribed ONCE on connect - NOT included in consolidation rebuilds. -/// Note: 30618 is ONLY synced from remote relays, not self-subscribed. -fn build_announcement_filter(since: Option) -> Filter { - let filter = Filter::new().kinds([ - Kind::Custom(30617), // Repository announcements - Kind::Custom(30618), // Maintainer lists - ]); - - match since { - Some(ts) => filter.since(ts), - None => filter, - } -} - -/// Layer 2: Events tagging one of our repos -/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. -/// Batched per 100 repo refs. -fn tagged_one_of_our_repo_event_filters( - repos: &HashSet, - since: Option, -) -> Vec { - let mut filters = Vec::new(); - let repo_refs: Vec<_> = repos.iter().collect(); - - for chunk in repo_refs.chunks(100) { - let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); - - // Lowercase 'a' tag - standard addressable reference - let mut f1 = Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); - // Uppercase 'A' tag - some clients use this - let mut f2 = Filter::new() - .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone()); - // Quote 'q' tag - NIP-10 quote references to addressable events - let mut f3 = Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); - - if let Some(ts) = since { - f1 = f1.since(ts); - f2 = f2.since(ts); - f3 = f3.since(ts); - } - - filters.push(f1); - filters.push(f2); - filters.push(f3); - } - - filters -} - -/// Layer 3: Events tagging one of our root events -/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. -/// Batched per 100 event IDs. -fn tagged_one_of_our_root_event_filters( - root_events: &HashSet, - since: Option, -) -> Vec { - let mut filters = Vec::new(); - let event_ids: Vec = root_events.iter().map(|id| id.to_hex()).collect(); - - for chunk in event_ids.chunks(100) { - let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); - - // Lowercase 'e' tag - standard event reference - let mut f1 = Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); - // Uppercase 'E' tag - some clients use this - let mut f2 = Filter::new() - .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone()); - // Quote 'q' tag - NIP-10 quote references to events - let mut f3 = Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); - - if let Some(ts) = since { - f1 = f1.since(ts); - f2 = f2.since(ts); - f3 = f3.since(ts); - } - - filters.push(f1); - filters.push(f2); - filters.push(f3); - } - - filters -} - -/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) -/// Used by: -/// - compute_actions for incremental subscriptions -/// - consolidation rebuilds (Layer 1 remains active) -fn build_layer2_and_layer3_filters( - repos: &HashSet, - root_events: &HashSet, - since: Option, -) -> Vec { - let mut filters = Vec::new(); - filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); - filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); - filters -} -``` - -**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently. - -### 4. handle_add_filters (SyncManager) - -```rust -impl SyncManager { - async fn handle_add_filters(&mut self, action: AddFilters) { - let AddFilters { relay_url, repos, root_events, filters } = action; - - // Auto-spawn connection if needed - let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); - match state { - None => { - // New relay discovered - create entry and spawn connection - self.relay_sync_index.write().await.insert( - relay_url.clone(), - RelayState { - repos: HashSet::new(), - root_events: HashSet::new(), - is_bootstrap: false, - connection_status: ConnectionStatus::Connecting, - last_connected: None, - disconnected_at: None, - connection: None, - } - ); - self.spawn_connection(&relay_url).await; - return; // Subscriptions will happen on connection success - } - Some(state) if state.connection_status != ConnectionStatus::Connected => { - // Not connected - subscriptions will happen on connection success - return; - } - Some(_) => { - // Already connected - proceed with subscription - } - } - - // Subscribe and collect subscription IDs - let conn = self.connections.get(&relay_url).unwrap(); - let mut sub_ids = HashSet::new(); - - for filter in filters { - match conn.client.subscribe(filter, None).await { - Ok(output) => { - for sub_id in output.val { - sub_ids.insert(sub_id); - } - } - Err(e) => { - tracing::warn!("Failed to subscribe: {}", e); - } - } - } - - // Create pending batch - let batch = PendingBatch { - batch_id: self.next_batch_id(), - items: PendingItems { repos, root_events }, - outstanding_subs: sub_ids, - }; - - // Add to pending index - self.pending_sync_index.write().await - .entry(relay_url) - .or_default() - .push(batch); - } -} -``` - -### 5. handle_disconnect - -```rust -impl SyncManager { - /// Called when connection to a relay is lost - async fn handle_disconnect(&mut self, relay_url: &str) { - let mut index = self.relay_sync_index.write().await; - - if let Some(state) = index.get_mut(relay_url) { - state.connection_status = ConnectionStatus::Disconnected; - state.disconnected_at = Some(Timestamp::now()); - state.connection = None; - } - - // Clear pending batches - these items were not confirmed - self.pending_sync_index.write().await.remove(relay_url); - - // Remove from active connections map - self.connections.remove(relay_url); - - // Health tracker records failure for backoff - self.health_tracker.record_failure(relay_url); - } -} -``` - -### 6. handle_connect_or_reconnect (Unified) - -This method handles BOTH initial connection AND reconnection with unified logic: - -```rust -impl SyncManager { - /// Called when connection to a relay succeeds - handles both initial connect and reconnect. - /// - /// Decision tree: - /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history - /// - Quick reconnect (<15min): since = last_connected - 15min - async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { - let mut index = self.relay_sync_index.write().await; - let state = match index.get_mut(relay_url) { - Some(s) => s, - None => return, // Relay was removed while disconnected - }; - - // Determine if this is a fresh sync or quick reconnect - let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); - let last_connected = state.last_connected; - - if is_fresh_sync && state.last_connected.is_some() { - // Stale reconnect (>15min) - clear state - tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); - state.clear_sync_state(); - } - - // Update connection state - state.connection_status = ConnectionStatus::Connected; - state.last_connected = Some(Timestamp::now()); - state.disconnected_at = None; - - // Record success in health tracker - self.health_tracker.record_success(relay_url); - - drop(index); // Release lock - - let conn = match self.connections.get(relay_url) { - Some(c) => c, - None => return, - }; - - if is_fresh_sync { - // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions - - // Step 1: Subscribe Layer 1 (announcements) without since - let layer1 = build_announcement_filter(None); - let _ = conn.client.subscribe(layer1, None).await; - - // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) - self.recompute_actions_for_relay(relay_url).await; - } else { - // Quick reconnect: Layer 1 with since, Layer 2+3 with since - let since = last_connected - .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) - .unwrap_or(Timestamp::from(0)); - - // Step 1: Subscribe Layer 1 (announcements) with since - let layer1 = build_announcement_filter(Some(since)); - let _ = conn.client.subscribe(layer1, None).await; - - // Step 2: Rebuild Layer 2+3 for confirmed items with since - self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; - - // Step 3: Check for NEW items via compute_actions - self.recompute_actions_for_relay(relay_url).await; - } - } - - /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). - /// Used by: - /// - Quick reconnect: rebuild confirmed items with since filter - /// - Consolidation: close and rebuild with since filter - async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option) { - let confirmed = self.relay_sync_index.read().await; - let state = match confirmed.get(relay_url) { - Some(s) => s, - None => return, - }; - - // Build Layer 2+3 filters WITH since - let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); - drop(confirmed); - - // Subscribe directly - no PendingBatch for catch-up (items already confirmed) - let conn = match self.connections.get(relay_url) { - Some(c) => c, - None => return, - }; - - for filter in filters { - let _ = conn.client.subscribe(filter, None).await; - } - } - - /// Rerun compute_actions for a specific relay and process resulting AddFilters. - /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. - async fn recompute_actions_for_relay(&mut self, relay_url: &str) { - let repo_index = self.repo_sync_index.read().await; - let targets = derive_relay_targets(&repo_index); - drop(repo_index); - - // Filter to just this relay - let target = match targets.get(relay_url) { - Some(t) => t.clone(), - None => return, // No repos reference this relay anymore - }; - - let pending = self.pending_sync_index.read().await; - let confirmed = self.relay_sync_index.read().await; - - let mut single_relay_targets = HashMap::new(); - single_relay_targets.insert(relay_url.to_string(), target); - - let actions = compute_actions(&single_relay_targets, &pending, &confirmed); - - drop(pending); - drop(confirmed); - - // Process AddFilters - for action in actions { - self.handle_add_filters(action).await; - } - } -} -``` - -### 7. Daily Timer - -```rust -impl SyncManager { - async fn run_daily_timer(&self) { - loop { - // Random 23-25 hours - let hours = 23.0 + rand::random::() * 2.0; - tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; - - let relay_urls: Vec<_> = self.relay_sync_index.read().await - .keys() - .cloned() - .collect(); - - for relay_url in relay_urls { - self.daily_sync(&relay_url).await; - } - } - } - - /// Perform daily fresh sync for a relay - async fn daily_sync(&mut self, relay_url: &str) { - tracing::info!("Daily sync triggered for {}", relay_url); - - // Close all subscriptions - if let Some(conn) = self.connections.get(relay_url) { - conn.client.unsubscribe_all().await; - } - - // Clear PendingSyncIndex - self.pending_sync_index.write().await.remove(relay_url); - - // Clear confirmed state - triggers fresh sync - { - let mut index = self.relay_sync_index.write().await; - if let Some(state) = index.get_mut(relay_url) { - state.clear_sync_state(); - } - } - - // Recompute actions - will generate AddFilters for everything - self.recompute_actions_for_relay(relay_url).await; - } -} -``` - -### 8. Consolidation (Threshold-Based, Triggered on Add) - -Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active. - -```rust -impl SyncManager { - /// Check filter count and consolidate if needed. - /// Called from handle_add_filters BEFORE adding new filters. - async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { - let current_count = self.get_filter_count(relay_url).await; - - if current_count + new_filter_count > 70 { - self.consolidate(relay_url).await; - } - } - - /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. - /// Does NOT clear state - just reduces filter count. - async fn consolidate(&mut self, relay_url: &str) { - tracing::info!("Consolidating filters for {} (count > 70)", relay_url); - - // Wait for all pending batches to complete first - self.wait_pending_complete(relay_url).await; - - // Close Layer 2+3 subscriptions only - Layer 1 remains active - // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately - // For simplicity, we close all and re-add Layer 1 - if let Some(conn) = self.connections.get(relay_url) { - conn.client.unsubscribe_all().await; - } - - // Re-subscribe Layer 1 with since (maintains announcements stream) - let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); - let conn = self.connections.get(relay_url).unwrap(); - let layer1 = build_announcement_filter(Some(since)); - let _ = conn.client.subscribe(layer1, None).await; - - // Rebuild Layer 2+3 only - self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; - } -} -``` - -**Updated handle_add_filters to check consolidation:** - -```rust -impl SyncManager { - async fn handle_add_filters(&mut self, action: AddFilters) { - let AddFilters { relay_url, repos, root_events, filters } = action; - - // Auto-spawn connection if needed (unchanged) - let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); - match state { - None => { - // New relay discovered - create entry and spawn connection - self.relay_sync_index.write().await.insert( - relay_url.clone(), - RelayState { - repos: HashSet::new(), - root_events: HashSet::new(), - is_bootstrap: false, - connection_status: ConnectionStatus::Connecting, - last_connected: None, - disconnected_at: None, - connection: None, - } - ); - self.spawn_connection(&relay_url).await; - return; // Subscriptions will happen on connection success - } - Some(state) if state.connection_status != ConnectionStatus::Connected => { - return; // Not connected - subscriptions will happen on connection success - } - Some(_) => { - // Already connected - proceed - } - } - - // CHECK CONSOLIDATION BEFORE ADDING - self.maybe_consolidate(&relay_url, filters.len()).await; - - // Subscribe and collect subscription IDs - let conn = self.connections.get(&relay_url).unwrap(); - let mut sub_ids = HashSet::new(); - - for filter in filters { - match conn.client.subscribe(filter, None).await { - Ok(output) => { - for sub_id in output.val { - sub_ids.insert(sub_id); - } - } - Err(e) => { - tracing::warn!("Failed to subscribe: {}", e); - } - } - } - - // Create pending batch (unchanged) - let batch = PendingBatch { - batch_id: self.next_batch_id(), - items: PendingItems { repos, root_events }, - outstanding_subs: sub_ids, - }; - - self.pending_sync_index.write().await - .entry(relay_url) - .or_default() - .push(batch); - } -} -``` - ---- - -## Disconnect (Relay Removal) Handling - -```rust -impl SyncManager { - /// Periodically check for relays that should be disconnected - async fn check_disconnects(&mut self) { - let confirmed = self.relay_sync_index.read().await; - let relays_to_disconnect: Vec<_> = confirmed.iter() - .filter(|(_, state)| { - !state.is_bootstrap && - state.repos.is_empty() && - state.root_events.is_empty() - }) - .map(|(url, _)| url.clone()) - .collect(); - drop(confirmed); - - for relay_url in relays_to_disconnect { - self.disconnect_relay(&relay_url).await; - } - } - - async fn disconnect_relay(&mut self, relay_url: &str) { - tracing::info!("Disconnecting relay {} (no repos)", relay_url); - - self.relay_sync_index.write().await.remove(relay_url); - self.pending_sync_index.write().await.remove(relay_url); - - if let Some(conn) = self.connections.remove(relay_url) { - let _ = conn.client.disconnect().await; - } - } -} -``` - ---- - -## State Flow Summary - -```mermaid -flowchart TB - subgraph Input - SS[SelfSubscriber] - OWN[Own Relay] - end - - subgraph RepoSyncIndex - What We Want - RSI[HashMap: Repo to Relays+Events] - end - - subgraph Derived Target - DT[derive_relay_targets fn] - TGT[Per-relay: repos + events we should sync] - end - - subgraph compute_actions - Decision Point - CA[Three-way diff: target - pending - confirmed] - end - - subgraph PendingSyncIndex - In Flight - PSI[Vec PendingBatch per relay] - end - - subgraph RelaySyncIndex - Confirmed State - RLI[RelayState per relay] - CONN[connection_status] - REPOS[repos + root_events] - TIMES[last_connected + disconnected_at] - end - - SS -->|subscribe| OWN - OWN -->|events| SS - SS -->|batch fires| RSI - RSI --> DT - DT --> TGT - TGT --> CA - PSI --> CA - RLI --> CA - CA -->|Layer 2+3 new items| AF[AddFilters] - AF -->|check filter count| CONSOL{count + new > 70?} - CONSOL -->|yes| CONSOLIDATE[consolidate] - CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] - L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] - CONSOL -->|no| SUB[subscribe] - AF -->|spawn if needed| CONN - SUB --> PSI - PSI -->|EOSE| REPOS - - CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] - DISC -->|any reconnect| HC[handle_connect_or_reconnect] - - subgraph handle_connect_or_reconnect - HC --> FRESH_CHECK{is_fresh_sync?} - FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] - FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] - L1_FRESH --> RCA1[recompute_actions_for_relay] - L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] - L23_QUICK --> RCA2[recompute_actions_for_relay] - end -``` - ---- - -## Key Design Decisions - -| Decision | Choice | Rationale | -| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | -| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | -| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | -| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | -| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | -| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | -| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | -| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | -| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | -| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | -| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | -| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | -| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | -| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | -| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | -| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | -| Connection spawning | Via AddFilters handler | Single path for new relay discovery | -| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | - ---- - -## Module Structure - -``` -src/sync/ -├── mod.rs # SyncManager, main loop -├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types -├── actions.rs # AddFilters struct, compute_actions, build_filters -├── self_subscriber.rs # SelfSubscriber, batching logic -├── relay_connection.rs # Per-relay WebSocket connection -├── consolidation.rs # Consolidation logic, daily timer -├── health.rs # Health tracking (reuse from v2) -└── metrics.rs # Prometheus metrics (reuse from v2) -``` - ---- - -## Comparison: v3 vs v4 - -| Aspect | v3 | v4 | -| ------------------------ | ----------------------------------------- | --------------------------------------------- | -| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | -| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | -| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | -| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | -| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | -| Since filter application | Applied in handle_reconnect | build_all_filters with optional since | -| PSI clearing | On disconnect | On disconnect (confirmed) | -| Daily timer | Consolidation-style | Fresh sync (different from consolidation) | - ---- - -## Self-Subscriber Flow - -The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions. - -### State Tracking - -```rust -pub struct SelfSubscriber { - own_relay_url: String, - relay_domain: String, - repo_sync_index: RepoSyncIndex, - pending_sync_index: PendingSyncIndex, - relay_sync_index: RelaySyncIndex, - action_tx: mpsc::Sender, - /// Timestamp of last successful connection - used for since filter on reconnection - last_connected: Option, - /// The active client connection - client: Option, -} -``` - -### On Startup / Reconnect (Unified) - -Both initial startup and reconnection use the same `connect_and_subscribe` method: - -```rust -impl SelfSubscriber { - async fn run(mut self) { - loop { - // Connect or reconnect - if let Err(e) = self.connect_and_subscribe().await { - tracing::warn!("Connection failed: {}, will retry", e); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - - // Run event loop until disconnection - self.event_loop().await; - - // Loop will retry connection - } - } - - async fn connect_and_subscribe(&mut self) -> Result<(), Error> { - let client = Client::new(Keys::generate()); - client.add_relay(&self.own_relay_url).await?; - client.connect().await; - - // Build filter - add since only on reconnect - let filter = Filter::new().kinds([ - Kind::Custom(30617), // Repository announcements - Kind::GitPatch, // 1617 - Kind::Custom(1618), // PRs - Kind::Custom(1619), // PR updates - Kind::GitIssue, // 1621 - ]); - - let filter = if let Some(ts) = self.last_connected { - // Reconnection: use since filter - let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer - filter.since(since) - } else { - // Initial connect: no since filter - get full history - filter - }; - - // Update last_connected AFTER computing since - self.last_connected = Some(Timestamp::now()); - - client.subscribe(filter, None).await?; - self.client = Some(client); - Ok(()) - } -} -``` - -### Event Loop with Batching - -```rust -impl SelfSubscriber { - async fn event_loop(&mut self) { - let client = self.client.as_ref().unwrap(); - let mut pending_events: Vec = Vec::new(); - let mut batch_timer: Option = None; - let batch_window = Duration::from_secs(5); - - loop { - let timeout = batch_timer - .map(|t| batch_window.saturating_sub(t.elapsed())) - .unwrap_or(Duration::from_secs(60)); - - tokio::select! { - notification = client.notifications().recv() => { - match notification { - Ok(RelayPoolNotification::Event { event, .. }) => { - pending_events.push(*event); - - // Start timer on first event - does NOT reset - if batch_timer.is_none() { - batch_timer = Some(Instant::now()); - } - } - Ok(RelayPoolNotification::Shutdown) => { - // Connection lost - break; - } - _ => {} - } - } - _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { - // Batch window elapsed - self.process_batch(pending_events.drain(..).collect()).await; - batch_timer = None; - } - } - } - } - - async fn process_batch(&self, events: Vec) { - // 1. Update RepoSyncIndex - for event in events { - match event.kind.as_u16() { - 30617 => self.handle_announcement(&event).await, - 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, - _ => {} - } - } - - // 2. Derive targets and compute actions - let repo_index = self.repo_sync_index.read().await; - let targets = derive_relay_targets(&repo_index); - - let pending = self.pending_sync_index.read().await; - let confirmed = self.relay_sync_index.read().await; - - let actions = compute_actions(&targets, &pending, &confirmed); - - drop(repo_index); - drop(pending); - drop(confirmed); - - // 3. Send actions to SyncManager - for action in actions { - let _ = self.action_tx.send(action).await; - } - } - - async fn handle_announcement(&self, event: &Event) { - // Extract repo_ref from event - 30617:pubkey:identifier - let d_tag = event.tags.iter() - .find_map(|tag| { - if tag.kind() == TagKind::D { - tag.content().map(|s| s.to_string()) - } else { - None - } - }) - .unwrap_or_default(); - - let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); - - // Extract relay URLs from 'r' tags - let relays: HashSet = event.tags.iter() - .filter_map(|tag| { - if tag.kind() == TagKind::Relay { - tag.content().map(|s| s.to_string()) - } else { - None - } - }) - .collect(); - - // Update RepoSyncIndex - let mut index = self.repo_sync_index.write().await; - let needs = index.entry(repo_ref).or_default(); - needs.relays = relays; - } - - async fn handle_root_event(&self, event: &Event) { - // Extract repo_ref from 'a' tag - let repo_ref = event.tags.iter() - .find_map(|tag| { - if tag.kind() == TagKind::A { - tag.content().map(|s| s.to_string()) - } else { - None - } - }); - - if let Some(repo_ref) = repo_ref { - let mut index = self.repo_sync_index.write().await; - let needs = index.entry(repo_ref).or_default(); - needs.root_events.insert(event.id); - } - } -} -``` - ---- - -## Implementation Notes - -This section documents the actual implementation details as of December 2024 (Phases 1-10 complete). - -### Architectural Decisions During Implementation - -**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup: - -```rust -// 7. Wrap self in Arc for sharing with timer task -let sync_manager = Arc::new(Mutex::new(self)); -``` - -This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. - -**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: - -- `DashMap` for thread-safe concurrent access without external locking -- Three states: `Healthy`, `Degraded`, `Dead` -- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff -- Dead threshold: 24 hours of continuous failures -- Dead relay retry: Once per 24 hours - -### Implementation Constants - -| Constant | Value | Purpose | -| --------------------------------- | ---------- | ------------------------------------------------ | -| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | -| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | -| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | -| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | -| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | -| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | - -### Daily Timer Randomization - -The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running: - -```rust -let hours = 23.0 + rand::thread_rng().gen::() * 2.0; -``` - -### Bootstrap Relay Protection - -Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out: - -```rust -.filter(|(_, state)| { - !state.is_bootstrap && - state.repos.is_empty() && - state.root_events.is_empty() -}) -``` - -### Graceful Shutdown - -Shutdown uses a tokio broadcast channel for coordinated termination: - -```rust -let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); -``` - -Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop. - -### Actual Module Structure - -The implemented module structure differs from the original spec: - -``` -src/sync/ -├── mod.rs # SyncManager, main loop, index types, metrics -├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters -├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters -├── health.rs # RelayHealthTracker, HealthState, exponential backoff -├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling -└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic -``` - -Key differences from spec: - -- No separate `state.rs` - types are defined in `mod.rs` -- No separate `actions.rs` - moved to `algorithms.rs` -- No separate `consolidation.rs` - consolidation logic in `mod.rs` -- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs` - -### Deviations from Original v4 Spec - -1. **RelayState lacks `connection` field**: The spec showed `connection: Option` in `RelayState`, but the implementation stores connections in a separate `HashMap` in `SyncManager`. - -2. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct. - -3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. - -4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index c07f07c..dd508b3 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md @@ -1,929 +1,1330 @@ -# GRASP-02: Proactive Sync - Design Document +# GRASP-02: Proactive Sync v4 - Health & Reconnection Design ## Overview -GRASP-02 Proactive Sync enables ngit-grasp to maintain live WebSocket connections to other relays listed in repository announcement events, synchronizing NIP-34 related events using both **live sync** (real-time subscriptions) and **negentropy catchup** (NIP-77 set reconciliation). +This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles: -This document covers **event syncing only**. Git data syncing is out of scope for this phase. +1. **Self-subscription as the only mechanism** - No database initialization at startup +2. **compute_actions as single decision point** - Determines what NEW subscriptions to create +3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) +4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch +5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary -## Goals +--- -1. **Data Availability**: Ensure we have all relevant events for repositories we host -2. **Resilience**: Handle relay failures gracefully with backoff and health tracking -3. **Efficiency**: Minimize connections and bandwidth through filter consolidation -4. **Consistency**: Use unified filters for both live sync and negentropy catchup +## Data Model -## Architecture Overview +### RepoSyncIndex (Source of Truth) -```mermaid -flowchart TB - subgraph ngit-grasp - subgraph SyncManager - SS[Self-Subscriber] - RC[Remote Connections] - end - WS[WebSocket Server] - FS[FilterService] - RH[RelayHealthTracker] - DB[(Database)] - AP[AcceptancePolicy] - MET[Prometheus Metrics] - end +```rust +/// What we WANT to sync - derived from events received via self-subscription. +/// Updated immediately when self-subscriber batch fires. +/// Key: repo addressable ref - 30617:pubkey:identifier +pub type RepoSyncIndex = Arc>>; + +#[derive(Debug, Clone, Default)] +pub struct RepoSyncNeeds { + /// Relay URLs listed in this repo's 30617 announcement + pub relays: HashSet, + /// Root event IDs - 1617/1618/1619/1621 - that reference this repo + pub root_events: HashSet, +} +``` - subgraph External Relays - R1[relay.example.com] - R2[other-grasp.io] - R3[nostr.land] - end +### RelaySyncIndex (Confirmed State + Connection) - WS -->|broadcasts events| SS - SS -->|discovers relays| RC - RC -->|builds filters| FS - RC -->|tracks health| RH - RC -->|stores events| DB - RC -->|validates| AP +```rust +/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. +/// Key: relay URL +pub type RelaySyncIndex = Arc>>; + +/// Connection status for a relay +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionStatus { + /// Not currently connected + Disconnected, + /// Connection attempt in progress + Connecting, + /// Successfully connected and subscribed + Connected, +} + +/// Complete state for a single relay - combines sync needs with connection lifecycle +#[derive(Debug)] +pub struct RelayState { + /// Repos we have confirmed syncing from this relay + pub repos: HashSet, + /// Root events we have confirmed tracking + pub root_events: HashSet, + /// If true, never disconnect this relay + pub is_bootstrap: bool, + /// Current connection status + pub connection_status: ConnectionStatus, + /// When we last successfully connected - used for since filter on reconnect + pub last_connected: Option, + /// When we disconnected - for 15-minute state retention rule + pub disconnected_at: Option, +} - RC <-->|WebSocket + NEG| R1 - RC <-->|WebSocket + NEG| R2 - RC <-->|WebSocket + NEG| R3 +impl RelayState { + /// Check if state should be cleared based on 15-minute rule + pub fn should_clear_state(&self) -> bool { + match self.disconnected_at { + Some(disconnected) => { + let now = Timestamp::now(); + now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes + } + None => false, // Still connected or never connected + } + } - RH -->|exposes state| MET + /// Clear repos and root_events - called when reconnect takes > 15 minutes + pub fn clear_sync_state(&mut self) { + self.repos.clear(); + self.root_events.clear(); + } +} ``` -**Key Insight: Self-Subscribe Architecture** +### PendingSyncIndex (In-Flight Batches) -The SyncManager uses a "self-subscribe" pattern for relay discovery. Rather than polling the database periodically, it connects to its own WebSocket server as a client and subscribes to kind 30617 events. When new announcements are saved (from any source), the self-subscriber receives them instantly and can spawn connections to newly discovered relays. +```rust +/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. +/// Each batch has its own ID and can confirm independently. +/// Key: relay URL +pub type PendingSyncIndex = Arc>>>; + +#[derive(Debug, Clone)] +pub struct PendingBatch { + /// Unique ID for this batch - for debugging/logging + pub batch_id: u64, + /// The items this batch is syncing + pub items: PendingItems, + /// Subscription IDs that must ALL receive EOSE before confirming + pub outstanding_subs: HashSet, +} -## Connection Management +#[derive(Debug, Clone, Default)] +pub struct PendingItems { + pub repos: HashSet, + pub root_events: HashSet, +} +``` -### Relay Discovery +--- -Relays to connect to are discovered using a **self-subscribe architecture** rather than periodic polling. The SyncManager connects to its own relay as a client and subscribes to kind 30617 (repository announcement) events. When a new announcement is saved to the database (from direct submission or sync), the self-subscriber receives it immediately and discovers new relays to connect to. +## Connection Lifecycle State Machine ```mermaid -flowchart LR - subgraph Relay - WS[WebSocket Server] - DB[(Database)] - end - - subgraph SyncManager - SS[Self-Subscribe Client] - RC[Remote Connections] - end - - WS -->|broadcast| SS - SS -->|extract relay URLs| RC - RC -->|sync events| WS +stateDiagram-v2 + [*] --> Disconnected: discover relay via RepoSyncIndex + Disconnected --> Connecting: AddFilters triggers spawn_connection + Connecting --> Connected: success + Connecting --> Disconnected: failure + record in health tracker + Connected --> Disconnected: connection lost + Connected --> [*]: intentional disconnect via check_disconnects + + note right of Disconnected: disconnected_at set for 15min rule + note right of Connected: last_connected tracked for since filter ``` -**Why Self-Subscribe vs Polling?** +--- -| Approach | Latency | Complexity | Resource Use | -|----------|---------|------------|--------------| -| Self-Subscribe | Instant | Low | Minimal (1 WS connection) | -| Periodic Polling | 30s+ delay | Higher | DB queries every N seconds | +## Flow Scenarios -The self-subscribe approach provides: -- **Immediate discovery**: New relays discovered instantly when announcement saved -- **No polling overhead**: No periodic database queries -- **Simple architecture**: Reuses existing WebSocket infrastructure +### Scenario 1: Initial Connect via handle_connect_or_reconnect -**Implementation Pattern:** +```mermaid +flowchart TB + START[Startup] --> SS[Self-subscribe to own relay] + SS --> |no since filter| EVENTS[Receive historical events] + EVENTS --> RSI[Update RepoSyncIndex] + RSI --> DT[derive_relay_targets] + DT --> CA[compute_actions with targets and empty confirmed] + CA --> AF[AddFilters for each relay] + AF --> SPAWN{Relay connected?} + SPAWN --> |no| CONN[spawn_connection] + CONN --> HC[handle_connect_or_reconnect] + SPAWN --> |yes| SUB + + subgraph handle_connect_or_reconnect - Fresh Sync + HC --> CHECK_FRESH{is_fresh_sync?} + CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] + L1 --> RCA[recompute_actions_for_relay] + end -```rust -// In SyncManager::run() -let self_client = Client::default(); -self_client.add_relay(&own_relay_url).await?; -self_client.connect().await; - -let filter = Filter::new().kind(Kind::Custom(30617)); -self_client.subscribe(filter, None).await?; - -// Handle notifications - when announcement arrives, extract relay URLs -client.handle_notifications(|notification| async { - if let RelayPoolNotification::Event { event, .. } = notification { - let new_urls = filter_service.extract_relay_urls_from_event(&event); - for url in new_urls { - if !active_relays.contains(&url) && !is_own_relay(&url) { - spawn_connection(url, tx.clone(), filter_service.clone()); - } - } - } - Ok(false) // Continue processing -}); + RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> CONFIRM[Move items to confirmed repos/root_events] ``` -**Startup Discovery:** At startup, existing announcements in the database are queried once to discover initial relays. After startup, all discovery is event-driven via self-subscribe. +**Key points:** -**Reconnection:** The self-subscriber has built-in exponential backoff reconnection (1s → 60s max) to handle temporary disconnections from our own relay. +- No `since` filter on initial connect - get full history +- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` +- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since +- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking -### Connection Lifecycle +### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes ```mermaid -stateDiagram-v2 - [*] --> Connecting: startup/new relay - Connecting --> Connected: success - Connecting --> Backoff: failure - Connected --> Disconnected: connection lost - Disconnected --> Backoff: reconnect failed - Backoff --> Connecting: backoff timer expires - Backoff --> Dead: 24h continuous failures - Dead --> Connecting: daily retry timer - Connected --> Updating: filter change - Updating --> Connected: complete +flowchart TB + DISC[Connection lost] --> MARK[Set disconnected_at = now] + MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] + CLEAR_PEND --> WAIT[Wait for reconnection] + WAIT --> RECONN[Connection restored] + RECONN --> HC[handle_connect_or_reconnect] + + subgraph handle_connect_or_reconnect - Quick Reconnect + HC --> CHECK{is_fresh_sync?} + CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] + SINCE --> L1[build_announcement_filter - with since] + L1 --> L23[rebuild_layer2_and_layer3 - with since] + L23 --> RCA[recompute_actions_for_relay] + end + + RCA --> AF[AddFilters for new items only] + AF --> SUB[Subscribe] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> EXTEND[Extend confirmed state] ``` -### Health Tracking & Backoff +**Key points:** -| State | Behavior | -| ----------- | --------------------------------------------------- | -| **Healthy** | Normal operation, immediate reconnect on disconnect | -| **Backoff** | Exponential backoff: 5s → 10s → 20s → ... → 1h max | -| **Dead** | 24h of continuous failures, retry once per day | +- PendingSyncIndex cleared on disconnect (not reconnect) +- `handle_connect_or_reconnect`: + 1. `build_announcement_filter(Some(since))` - Layer 1 with since + 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since + 3. `recompute_actions_for_relay` - check for new items +- since = last_connected - 15min ensures we catch events during disconnection -Health state is **kept in-memory** using a `DashMap` for lock-free concurrent access: +### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes -```rust -/// In-memory relay health tracking (NOT persisted to database) -/// -/// Design rationale: For <100 relays, persistence adds complexity without -/// significant benefit. Conservative initial backoff on restart avoids -/// thundering herd issues. -struct RelayHealthTracker { - health: DashMap, - metrics: SyncMetrics, // Prometheus metrics for operator visibility -} +```mermaid +flowchart TB + RECONN[Connection restored] --> HC[handle_connect_or_reconnect] -struct RelayHealth { - url: RelayUrl, - status: RelayStatus, // Healthy, Backoff, Dead - consecutive_failures: u32, - last_failure_at: Option, - last_success_at: Option, - next_retry_at: Instant, -} + subgraph handle_connect_or_reconnect - Stale Reconnect + HC --> CHECK{is_fresh_sync?} + CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] + CLEAR --> L1[build_announcement_filter - no since] + L1 --> RCA[recompute_actions_for_relay] + end -enum RelayStatus { - Healthy, - Backoff { attempt: u32 }, // backoff = min(5 * 2^attempt, 3600) seconds - Dead, // retry in 24h -} + RCA --> CA[compute_actions with empty confirmed] + CA --> AF[AddFilters for everything] + AF --> SUB[Subscribe - no since filter] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> CONFIRM[Populate confirmed state fresh] ``` -### Restart Behavior (Graceful Degradation) +**Key points:** -On restart, all relay health state is reset. To avoid thundering herd: +- `should_clear_state()` returns true → triggers fresh sync +- Same path as initial connect after clearing state +- Layer 1: `build_announcement_filter(None)` - full history +- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything -1. **Conservative initial backoff**: Start with 5s delay (not immediate) for all relays -2. **Staggered connection attempts**: Add random jitter (0-2s) per relay -3. **Health rebuilds organically**: Relays prove themselves healthy through successful connections +### Scenario 4: Consolidation - Triggered on Filter Add -```rust -impl RelayHealthTracker { - fn new(metrics: SyncMetrics) -> Self { - Self { - health: DashMap::new(), - metrics, - } - } +```mermaid +flowchart TB + AF[handle_add_filters called] --> COUNT{current + new > 70?} + COUNT --> |yes| CONSOLIDATE[consolidate] + CONSOLIDATE --> WAIT_PEND[wait_pending_complete] + WAIT_PEND --> CLOSE[unsubscribe_all] + CLOSE --> SINCE[since = now - 15min] + SINCE --> L1[build_announcement_filter - with since] + L1 --> L23[rebuild_layer2_and_layer3 - with since] + COUNT --> |no| SUB[Subscribe new filters] + SUB --> PB[Create PendingBatch] +``` - /// Called on startup for each discovered relay - fn initialize_relay(&self, url: RelayUrl) { - self.health.insert(url.clone(), RelayHealth { - url, - status: RelayStatus::Backoff { attempt: 0 }, // Start conservative - consecutive_failures: 0, - last_failure_at: None, - last_success_at: None, - next_retry_at: Instant::now() + Self::initial_backoff_with_jitter(), - }); - } +**Key points:** - fn initial_backoff_with_jitter() -> Duration { - Duration::from_secs(5) + Duration::from_millis(rand::random::() % 2000) - } -} +- Consolidation checked in `handle_add_filters` BEFORE adding new filters +- After closing all subscriptions, re-subscribe: + 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since + 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since +- `since = now - 15min` prevents re-fetching old events +- Keeps confirmed state, just reduces filter count + +### Scenario 5: Daily Timer - 23 to 25h Random + +```mermaid +flowchart TB + DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] + CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] + CLEAR_PEND --> CLEAR_STATE[clear_sync_state] + CLEAR_STATE --> L1[build_announcement_filter - no since] + L1 --> RCA[recompute_actions_for_relay] + RCA --> CA[compute_actions with empty confirmed] + CA --> AF[AddFilters for everything] + AF --> SUB[Subscribe - no since filter] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> CONFIRM[Repopulate confirmed state] ``` -**Trade-off**: We lose knowledge of chronically failing relays across restarts. This is acceptable because: +**Key points:** + +- Daily timer is a full fresh sync, NOT consolidation +- Clears both PendingSyncIndex and confirmed state +- Layer 1: `build_announcement_filter(None)` - full history +- Layer 2+3: via compute_actions with empty confirmed - full history +- Detects any state drift accumulated over 24 hours -- Scale is small (<100 relays) -- Conservative initial backoff prevents hammering bad relays -- Prometheus metrics preserve historical health data for operators +--- -## Filter Strategy +## Core Algorithms -### Unified Filters for Live Sync and Negentropy +### 1. derive_relay_targets -The same filter logic is used for both live subscriptions and negentropy reconciliation: +Transform RepoSyncIndex into per-relay sync targets: -```mermaid -flowchart LR - subgraph Filter Layers - F1[Layer 1: All 30617+30618] - F2[Layer 2: Events tagging repos via A/a/q] - F3[Layer 3: Events tagging PRs/Issues via E/e/q] - end +```rust +/// Inverts RepoSyncIndex to get per-relay view +fn derive_relay_targets( + repo_index: &HashMap +) -> HashMap { + let mut targets: HashMap = HashMap::new(); + + for (repo_ref, needs) in repo_index { + for relay_url in &needs.relays { + let target = targets.entry(relay_url.clone()).or_default(); + target.repos.insert(repo_ref.clone()); + target.root_events.extend(needs.root_events.iter().cloned()); + } + } - F1 -->|client-side| AP[Acceptance Policy] - F2 -->|server-side| Relay - F3 -->|server-side| Relay + targets +} ``` -### Layer 1: Repository Announcements & States +### 2. compute_actions (Three-Way Diff) -Get ALL kind 30617 and 30618 events with unified `since` timestamp, then filter client-side through acceptance policy: +**This is the ONLY decision point for what NEW subscriptions to create.** ```rust -// Use same since filter as other layers for consistency -let layer1_filter = Filter::new() - .kinds([Kind::from(30617), Kind::from(30618)]) - .since(since_timestamp); // Unified with Layer 2/3 +/// Computes AddFilters for items that are: +/// - In targets (what we want) +/// - NOT in pending (already in-flight) +/// - NOT in confirmed (already confirmed) +fn compute_actions( + targets: &HashMap, + pending: &HashMap>, + confirmed: &HashMap, +) -> Vec { + let mut actions = Vec::new(); + + for (relay_url, target) in targets { + // Skip disconnected relays - they will get AddFilters on reconnect + if let Some(state) = confirmed.get(relay_url) { + if state.connection_status != ConnectionStatus::Connected { + continue; + } + } + + // Collect all pending items for this relay + let pending_repos: HashSet<_> = pending.get(relay_url) + .map(|batches| batches.iter() + .flat_map(|b| b.items.repos.iter().cloned()) + .collect()) + .unwrap_or_default(); + let pending_events: HashSet<_> = pending.get(relay_url) + .map(|batches| batches.iter() + .flat_map(|b| b.items.root_events.iter().cloned()) + .collect()) + .unwrap_or_default(); + + // Collect confirmed items for this relay + let confirmed_repos = confirmed.get(relay_url) + .map(|c| &c.repos) + .unwrap_or(&HashSet::new()); + let confirmed_events = confirmed.get(relay_url) + .map(|c| &c.root_events) + .unwrap_or(&HashSet::new()); + + // New = target - pending - confirmed + let new_repos: HashSet<_> = target.repos.iter() + .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) + .cloned() + .collect(); + let new_events: HashSet<_> = target.root_events.iter() + .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) + .cloned() + .collect(); + + if !new_repos.is_empty() || !new_events.is_empty() { + let filters = build_filters(&new_repos, &new_events); + actions.push(AddFilters { + relay_url: relay_url.clone(), + repos: new_repos, + root_events: new_events, + filters, + }); + } + } + + actions +} ``` -**Client-side validation**: Only store events that pass our [`Nip34WritePolicy`](src/nostr/builder.rs:51). +### 3. Filter Building Functions (Three-Layer Strategy) -### Layer 2: Events Tagging Repositories +The filter strategy uses three layers: -For repo announcements **that list BOTH this relay AND our service**: +- **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation +- **Layer 2**: Events tagging our repos +- **Layer 3**: Events tagging our root events + +**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). ```rust -// Build addressable references: 30617:: -let repo_refs: Vec = announcements - .iter() - .filter(|a| a.relays.contains(&this_relay) && a.lists_service(&our_domain)) - .map(|a| format!("30617:{}:{}", a.pubkey.to_hex(), a.identifier)) - .collect(); - -let layer2_filter = Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_refs.clone()) - .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo_refs)); -``` +/// Layer 1: Announcements filter (kinds 30617 + 30618) +/// Subscribed ONCE on connect - NOT included in consolidation rebuilds. +/// Note: 30618 is ONLY synced from remote relays, not self-subscribed. +fn build_announcement_filter(since: Option) -> Filter { + let filter = Filter::new().kinds([ + Kind::Custom(30617), // Repository announcements + Kind::Custom(30618), // Maintainer lists + ]); + + match since { + Some(ts) => filter.since(ts), + None => filter, + } +} -### Layer 3: Events Tagging Issues/PRs/Patches +/// Layer 2: Events tagging one of our repos +/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. +/// Batched per 100 repo refs. +fn tagged_one_of_our_repo_event_filters( + repos: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + let repo_refs: Vec<_> = repos.iter().collect(); + + for chunk in repo_refs.chunks(100) { + let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); + + // Lowercase 'a' tag - standard addressable reference + let mut f1 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); + // Uppercase 'A' tag - some clients use this + let mut f2 = Filter::new() + .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone()); + // Quote 'q' tag - NIP-10 quote references to addressable events + let mut f3 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); + + if let Some(ts) = since { + f1 = f1.since(ts); + f2 = f2.since(ts); + f3 = f3.since(ts); + } -For events that reference PRs, Patches, or Issues from repos we track: + filters.push(f1); + filters.push(f2); + filters.push(f3); + } -```rust -// Collect event IDs of PRs, Patches, Issues we've stored -let tagged_event_ids: Vec = database - .query(Filter::new().kinds([1618, 1619, 1621, 1622, 1630])) // PR, PR Update, Issue, Patch, etc. - .iter() - .filter(|e| references_tracked_repo(e, &announcements)) - .map(|e| e.id) - .collect(); - -let layer3_filter = Filter::new() - .custom_tag(SingleLetterTag::lowercase(Alphabet::E), tagged_event_ids.clone()) - .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), tagged_event_ids)); -``` + filters +} -### Filter Size Management +/// Layer 3: Events tagging one of our root events +/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. +/// Batched per 100 event IDs. +fn tagged_one_of_our_root_event_filters( + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + let event_ids: Vec = root_events.iter().map(|id| id.to_hex()).collect(); + + for chunk in event_ids.chunks(100) { + let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); + + // Lowercase 'e' tag - standard event reference + let mut f1 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); + // Uppercase 'E' tag - some clients use this + let mut f2 = Filter::new() + .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone()); + // Quote 'q' tag - NIP-10 quote references to events + let mut f3 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); + + if let Some(ts) = since { + f1 = f1.since(ts); + f2 = f2.since(ts); + f3 = f3.since(ts); + } -When the tag list exceeds a threshold, split into batches: + filters.push(f1); + filters.push(f2); + filters.push(f3); + } -```rust -const MAX_TAGS_PER_FILTER: usize = 100; + filters +} -fn build_filters(tag_values: Vec) -> Vec { - tag_values - .chunks(MAX_TAGS_PER_FILTER) - .map(|chunk| Filter::new().custom_tag(tag, chunk.to_vec())) - .collect() +/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) +/// Used by: +/// - compute_actions for incremental subscriptions +/// - consolidation rebuilds (Layer 1 remains active) +fn build_layer2_and_layer3_filters( + repos: &HashSet, + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); + filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); + filters } ``` -**Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch. +**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently. -### Filter Generation vs. Policy Validation +### 4. handle_add_filters (SyncManager) -The filter strategy and acceptance policies serve **different purposes** even though they share conceptual knowledge: +```rust +impl SyncManager { + async fn handle_add_filters(&mut self, action: AddFilters) { + let AddFilters { relay_url, repos, root_events, filters } = action; + + // Auto-spawn connection if needed + let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); + match state { + None => { + // New relay discovered - create entry and spawn connection + self.relay_sync_index.write().await.insert( + relay_url.clone(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connecting, + last_connected: None, + disconnected_at: None, + connection: None, + } + ); + self.spawn_connection(&relay_url).await; + return; // Subscriptions will happen on connection success + } + Some(state) if state.connection_status != ConnectionStatus::Connected => { + // Not connected - subscriptions will happen on connection success + return; + } + Some(_) => { + // Already connected - proceed with subscription + } + } -| Concern | Filters | Policies | -|---------|---------|----------| -| **Direction** | What to request FROM remote relays | What to accept INTO local database | -| **Input** | Stored events (announcements, PRs, etc.) | Single incoming event | -| **Output** | Filter specification | Accept/Reject decision | + // Subscribe and collect subscription IDs + let conn = self.connections.get(&relay_url).unwrap(); + let mut sub_ids = HashSet::new(); + + for filter in filters { + match conn.client.subscribe(filter, None).await { + Ok(output) => { + for sub_id in output.val { + sub_ids.insert(sub_id); + } + } + Err(e) => { + tracing::warn!("Failed to subscribe: {}", e); + } + } + } -The modular sub-policies ([`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24), [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25), etc.) encode knowledge about event kinds and tag types, but this knowledge is applied differently: + // Create pending batch + let batch = PendingBatch { + batch_id: self.next_batch_id(), + items: PendingItems { repos, root_events }, + outstanding_subs: sub_ids, + }; -- **In filters**: We enumerate **all** addressable refs (`30617:pubkey:id`) from stored announcements -- **In policies**: [`RelatedEventPolicy::check_references()`](../../src/nostr/policy/related.rs:39) checks if incoming event references **any** accepted event + // Add to pending index + self.pending_sync_index.write().await + .entry(relay_url) + .or_default() + .push(batch); + } +} +``` -Because of this fundamental difference, filter generation logic stays in `src/sync/filter.rs` rather than being delegated to policy modules. Both share the understanding of NIP-34 event relationships, but they answer different questions. +### 5. handle_disconnect -## Subscription Updates +```rust +impl SyncManager { + /// Called when connection to a relay is lost + async fn handle_disconnect(&mut self, relay_url: &str) { + let mut index = self.relay_sync_index.write().await; + + if let Some(state) = index.get_mut(relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + state.disconnected_at = Some(Timestamp::now()); + state.connection = None; + } -### Dynamic Subscription Management + // Clear pending batches - these items were not confirmed + self.pending_sync_index.write().await.remove(relay_url); -When new events arrive that affect our filter criteria: + // Remove from active connections map + self.connections.remove(relay_url); -```mermaid -sequenceDiagram - participant LocalRelay - participant SyncManager - participant RemoteRelay - - LocalRelay->>SyncManager: New PR event accepted - SyncManager->>SyncManager: Extract event ID - SyncManager->>SyncManager: Build new filter for E/e/q tags - SyncManager->>RemoteRelay: REQ with new filter - RemoteRelay-->>SyncManager: Events matching new filter + // Health tracker records failure for backoff + self.health_tracker.record_failure(relay_url); + } +} ``` -**Events that trigger subscription updates**: +### 6. handle_connect_or_reconnect (Unified) -- New repository announcement accepted (adds to Layer 2) -- New PR/Issue/Patch accepted (adds to Layer 3) +This method handles BOTH initial connection AND reconnection with unified logic: -### When to Consolidate +```rust +impl SyncManager { + /// Called when connection to a relay succeeds - handles both initial connect and reconnect. + /// + /// Decision tree: + /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history + /// - Quick reconnect (<15min): since = last_connected - 15min + async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { + let mut index = self.relay_sync_index.write().await; + let state = match index.get_mut(relay_url) { + Some(s) => s, + None => return, // Relay was removed while disconnected + }; -Track subscription count per connection: + // Determine if this is a fresh sync or quick reconnect + let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); + let last_connected = state.last_connected; -```rust -struct ConnectionState { - relay_url: RelayUrl, - subscriptions: Vec, - total_filter_count: usize, -} + if is_fresh_sync && state.last_connected.is_some() { + // Stale reconnect (>15min) - clear state + tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); + state.clear_sync_state(); + } -impl ConnectionState { - fn should_consolidate(&self) -> bool { - self.total_filter_count > 150 - } + // Update connection state + state.connection_status = ConnectionStatus::Connected; + state.last_connected = Some(Timestamp::now()); + state.disconnected_at = None; - async fn consolidate(&mut self) { - // Close all subscriptions - // Rebuild from scratch with current database state - } -} -``` + // Record success in health tracker + self.health_tracker.record_success(relay_url); -## Negentropy Catchup + drop(index); // Release lock -### NIP-77 Reconciliation Protocol + let conn = match self.connections.get(relay_url) { + Some(c) => c, + None => return, + }; -Negentropy enables efficient set reconciliation - discovering which events we're missing without transferring full event lists. + if is_fresh_sync { + // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions -### Timing + // Step 1: Subscribe Layer 1 (announcements) without since + let layer1 = build_announcement_filter(None); + let _ = conn.client.subscribe(layer1, None).await; -| Trigger | Behavior | -| ------------------- | -------------------------------------------------------------------- | -| **Initial startup** | Warm-up delay, staggered if many filters, initializes daily schedule | -| **After reconnect** | Delay to avoid rate limiting, limited to events from last 3 days | -| **Daily** | Staggered batches, max 100 tagged events per filter | + // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) + self.recompute_actions_for_relay(relay_url).await; + } else { + // Quick reconnect: Layer 1 with since, Layer 2+3 with since + let since = last_connected + .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) + .unwrap_or(Timestamp::from(0)); -### Startup Flow + // Step 1: Subscribe Layer 1 (announcements) with since + let layer1 = build_announcement_filter(Some(since)); + let _ = conn.client.subscribe(layer1, None).await; -```mermaid -sequenceDiagram - participant Server - participant SyncManager - participant Relay - - Server->>SyncManager: Start - SyncManager->>SyncManager: Wait warm-up delay - SyncManager->>SyncManager: Build batched filters - - loop For each relay with stagger delay - SyncManager->>Relay: NEG-OPEN with filter batch 1 - Relay-->>SyncManager: NEG-MSG with differences - SyncManager->>Relay: NEG-MSG response - Note over SyncManager,Relay: Reconciliation rounds - Relay-->>SyncManager: NEG-CLOSE or events - SyncManager->>SyncManager: Validate + store events - - alt More batches - SyncManager->>SyncManager: Wait stagger delay - SyncManager->>Relay: NEG-OPEN with next batch - end - end + // Step 2: Rebuild Layer 2+3 for confirmed items with since + self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; - SyncManager->>SyncManager: Schedule daily catchup -``` + // Step 3: Check for NEW items via compute_actions + self.recompute_actions_for_relay(relay_url).await; + } + } -### Reconnection Catchup + /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). + /// Used by: + /// - Quick reconnect: rebuild confirmed items with since filter + /// - Consolidation: close and rebuild with since filter + async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option) { + let confirmed = self.relay_sync_index.read().await; + let state = match confirmed.get(relay_url) { + Some(s) => s, + None => return, + }; -After connection reestablished: + // Build Layer 2+3 filters WITH since + let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); + drop(confirmed); -```rust -async fn catchup_after_reconnect(&self, relay: &RelayUrl) { - // Delay to avoid immediate disconnect for too many requests - tokio::time::sleep(RECONNECT_CATCHUP_DELAY).await; + // Subscribe directly - no PendingBatch for catch-up (items already confirmed) + let conn = match self.connections.get(relay_url) { + Some(c) => c, + None => return, + }; + + for filter in filters { + let _ = conn.client.subscribe(filter, None).await; + } + } + + /// Rerun compute_actions for a specific relay and process resulting AddFilters. + /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. + async fn recompute_actions_for_relay(&mut self, relay_url: &str) { + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + drop(repo_index); + + // Filter to just this relay + let target = match targets.get(relay_url) { + Some(t) => t.clone(), + None => return, // No repos reference this relay anymore + }; - // Only catch up on recent events (last 3 days) - let since = Timestamp::now() - Duration::from_secs(3 * 24 * 60 * 60); + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; - let filters = self.build_filters_for_relay(relay) - .into_iter() - .map(|f| f.since(since)) - .collect(); + let mut single_relay_targets = HashMap::new(); + single_relay_targets.insert(relay_url.to_string(), target); - self.run_negentropy(relay, filters).await; + let actions = compute_actions(&single_relay_targets, &pending, &confirmed); + + drop(pending); + drop(confirmed); + + // Process AddFilters + for action in actions { + self.handle_add_filters(action).await; + } + } } ``` -### Daily Catchup Schedule +### 7. Daily Timer ```rust -// Daily catchup runs at consistent time, staggered across relays -async fn schedule_daily_catchup(&self) { - let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); +impl SyncManager { + async fn run_daily_timer(&self) { + loop { + // Random 23-25 hours + let hours = 23.0 + rand::random::() * 2.0; + tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; + + let relay_urls: Vec<_> = self.relay_sync_index.read().await + .keys() + .cloned() + .collect(); + + for relay_url in relay_urls { + self.daily_sync(&relay_url).await; + } + } + } - loop { - interval.tick().await; + /// Perform daily fresh sync for a relay + async fn daily_sync(&mut self, relay_url: &str) { + tracing::info!("Daily sync triggered for {}", relay_url); - for (i, relay) in self.healthy_relays().enumerate() { - // Stagger: 5 minute delay between relays - tokio::time::sleep(Duration::from_secs(i as u64 * 300)).await; + // Close all subscriptions + if let Some(conn) = self.connections.get(relay_url) { + conn.client.unsubscribe_all().await; + } - // Batch filters to max 100 tagged events each - let batches = self.build_batched_filters(&relay, 100); + // Clear PendingSyncIndex + self.pending_sync_index.write().await.remove(relay_url); - for batch in batches { - self.run_negentropy(&relay, batch).await; - tokio::time::sleep(Duration::from_secs(60)).await; // 1 min between batches + // Clear confirmed state - triggers fresh sync + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + state.clear_sync_state(); } } + + // Recompute actions - will generate AddFilters for everything + self.recompute_actions_for_relay(relay_url).await; } } ``` -## Event Processing +### 8. Consolidation (Threshold-Based, Triggered on Add) -### Acceptance Policy +Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active. -All synced events go through our acceptance policy, reusing the same [`Nip34WritePolicy`](../../src/nostr/builder.rs:36) validation logic used for direct client submissions. +```rust +impl SyncManager { + /// Check filter count and consolidate if needed. + /// Called from handle_add_filters BEFORE adding new filters. + async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { + let current_count = self.get_filter_count(relay_url).await; -#### Design: Reusing admit_event() + if current_count + new_filter_count > 70 { + self.consolidate(relay_url).await; + } + } -The [`WritePolicy::admit_event()`](../../src/nostr/builder.rs:256-269) trait method takes a `SocketAddr` parameter designed for client connections: + /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. + /// Does NOT clear state - just reduces filter count. + async fn consolidate(&mut self, relay_url: &str) { + tracing::info!("Consolidating filters for {} (count > 70)", relay_url); -```rust -// From nostr-relay-builder WritePolicy trait -fn admit_event<'a>( - &'a self, - event: &'a Event, - _addr: &'a SocketAddr, // Unused in our implementation -) -> BoxedFuture<'a, PolicyResult>; + // Wait for all pending batches to complete first + self.wait_pending_complete(relay_url).await; + + // Close Layer 2+3 subscriptions only - Layer 1 remains active + // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately + // For simplicity, we close all and re-add Layer 1 + if let Some(conn) = self.connections.get(relay_url) { + conn.client.unsubscribe_all().await; + } + + // Re-subscribe Layer 1 with since (maintains announcements stream) + let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); + let conn = self.connections.get(relay_url).unwrap(); + let layer1 = build_announcement_filter(Some(since)); + let _ = conn.client.subscribe(layer1, None).await; + + // Rebuild Layer 2+3 only + self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; + } +} ``` -For synced events from remote relays, we pass a **synthetic localhost address** since: -1. The `_addr` parameter is currently unused in our [`Nip34WritePolicy`](../../src/nostr/builder.rs:259) -2. All meaningful validation is done by the modular sub-policies (see below) -3. This allows reusing 100% of the existing validation logic +**Updated handle_add_filters to check consolidation:** ```rust -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - -/// Synthetic address for synced events (not from a direct client connection) -const SYNC_SOURCE_ADDR: SocketAddr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 0 -); - -async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { - // Apply our Nip34WritePolicy using synthetic address - // The SocketAddr is unused - all validation is by the modular sub-policies - let result = self.acceptance_policy - .admit_event(&event, &SYNC_SOURCE_ADDR) - .await; - - match result { - PolicyResult::Accept => { - self.database.save_event(&event).await?; - tracing::debug!( - "Accepted synced event {} from {}", - event.id.to_hex(), - source_relay - ); - self.trigger_subscription_updates(&event).await; +impl SyncManager { + async fn handle_add_filters(&mut self, action: AddFilters) { + let AddFilters { relay_url, repos, root_events, filters } = action; + + // Auto-spawn connection if needed (unchanged) + let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); + match state { + None => { + // New relay discovered - create entry and spawn connection + self.relay_sync_index.write().await.insert( + relay_url.clone(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connecting, + last_connected: None, + disconnected_at: None, + connection: None, + } + ); + self.spawn_connection(&relay_url).await; + return; // Subscriptions will happen on connection success + } + Some(state) if state.connection_status != ConnectionStatus::Connected => { + return; // Not connected - subscriptions will happen on connection success + } + Some(_) => { + // Already connected - proceed + } } - PolicyResult::Reject(reason) => { - tracing::debug!( - "Rejected synced event {} from {}: {}", - event.id.to_hex(), - source_relay, - reason - ); + + // CHECK CONSOLIDATION BEFORE ADDING + self.maybe_consolidate(&relay_url, filters.len()).await; + + // Subscribe and collect subscription IDs + let conn = self.connections.get(&relay_url).unwrap(); + let mut sub_ids = HashSet::new(); + + for filter in filters { + match conn.client.subscribe(filter, None).await { + Ok(output) => { + for sub_id in output.val { + sub_ids.insert(sub_id); + } + } + Err(e) => { + tracing::warn!("Failed to subscribe: {}", e); + } + } } - } - Ok(()) + // Create pending batch (unchanged) + let batch = PendingBatch { + batch_id: self.next_batch_id(), + items: PendingItems { repos, root_events }, + outstanding_subs: sub_ids, + }; + + self.pending_sync_index.write().await + .entry(relay_url) + .or_default() + .push(batch); + } } ``` -#### Modular Sub-Policies - -The [`Nip34WritePolicy`](../../src/nostr/builder.rs:36-42) delegates to specialized sub-policies in [`src/nostr/policy/`](../../src/nostr/policy/mod.rs:1-41): +--- -| Sub-Policy | Kinds | Responsibility | -|------------|-------|----------------| -| [`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24-27) | 30617 | Validates service listing, maintainer exception, creates bare repos | -| [`StatePolicy`](../../src/nostr/policy/state.rs:43-46) | 30618 | Validates state structure, aligns git refs with authorized state | -| [`PrEventPolicy`](../../src/nostr/policy/pr_event.rs) | 1618, 1619 | Validates PR/PR Update events, manages refs/nostr/* | -| [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25-29) | All others | Checks forward/backward references to accepted repos/events | +## Disconnect (Relay Removal) Handling -All sub-policies share a common [`PolicyContext`](../../src/nostr/policy/mod.rs:22-27) containing: -- `domain`: Our service domain for validation -- `database`: For querying existing events -- `git_data_path`: For git operations +```rust +impl SyncManager { + /// Periodically check for relays that should be disconnected + async fn check_disconnects(&mut self) { + let confirmed = self.relay_sync_index.read().await; + let relays_to_disconnect: Vec<_> = confirmed.iter() + .filter(|(_, state)| { + !state.is_bootstrap && + state.repos.is_empty() && + state.root_events.is_empty() + }) + .map(|(url, _)| url.clone()) + .collect(); + drop(confirmed); + + for relay_url in relays_to_disconnect { + self.disconnect_relay(&relay_url).await; + } + } -#### Why Not Call Sub-Policies Directly? + async fn disconnect_relay(&mut self, relay_url: &str) { + tracing::info!("Disconnecting relay {} (no repos)", relay_url); -While we could bypass `admit_event()` and call sub-policies directly: + self.relay_sync_index.write().await.remove(relay_url); + self.pending_sync_index.write().await.remove(relay_url); -```rust -// Alternative: Direct sub-policy calls (NOT recommended) -match event.kind.as_u16() { - 30617 => self.announcement_policy.validate(&event).await, - 30618 => self.state_policy.validate(&event), - 1618 | 1619 => self.pr_event_policy.validate_nostr_ref(&event).await, - _ => self.related_event_policy.check_references(&event).await, + if let Some(conn) = self.connections.remove(relay_url) { + let _ = conn.client.disconnect().await; + } + } } ``` -This is **not recommended** because: -1. Duplicates the kind-routing logic from [`admit_event()`](../../src/nostr/builder.rs:261-268) -2. Misses important post-validation steps (e.g., `handle_announcement()` also calls `ensure_bare_repository()`) -3. Creates maintenance burden when policy logic changes +--- -## Module Structure +## State Flow Summary -### New `src/sync/` Module +```mermaid +flowchart TB + subgraph Input + SS[SelfSubscriber] + OWN[Own Relay] + end -``` -src/ -├── sync/ -│ ├── mod.rs # Module exports -│ ├── manager.rs # SyncManager - main coordinator -│ ├── connection.rs # Per-relay connection handling -│ ├── filter.rs # Filter building and batching -│ ├── health.rs # RelayHealth tracking -│ ├── negentropy.rs # NIP-77 reconciliation logic -│ └── subscription.rs # Dynamic subscription management -├── nostr/ -│ └── ... (existing) -└── ... -``` + subgraph RepoSyncIndex - What We Want + RSI[HashMap: Repo to Relays+Events] + end -### Integration with Main Binary + subgraph Derived Target + DT[derive_relay_targets fn] + TGT[Per-relay: repos + events we should sync] + end -```rust -// In main.rs -async fn main() -> Result<()> { - // ... existing setup ... + subgraph compute_actions - Decision Point + CA[Three-way diff: target - pending - confirmed] + end - // Start sync manager as background task - let sync_manager = SyncManager::new( - database.clone(), - config.domain.clone(), - ); + subgraph PendingSyncIndex - In Flight + PSI[Vec PendingBatch per relay] + end - tokio::spawn(async move { - sync_manager.run().await - }); + subgraph RelaySyncIndex - Confirmed State + RLI[RelayState per relay] + CONN[connection_status] + REPOS[repos + root_events] + TIMES[last_connected + disconnected_at] + end - // ... rest of server startup ... -} + SS -->|subscribe| OWN + OWN -->|events| SS + SS -->|batch fires| RSI + RSI --> DT + DT --> TGT + TGT --> CA + PSI --> CA + RLI --> CA + CA -->|Layer 2+3 new items| AF[AddFilters] + AF -->|check filter count| CONSOL{count + new > 70?} + CONSOL -->|yes| CONSOLIDATE[consolidate] + CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] + L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] + CONSOL -->|no| SUB[subscribe] + AF -->|spawn if needed| CONN + SUB --> PSI + PSI -->|EOSE| REPOS + + CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] + DISC -->|any reconnect| HC[handle_connect_or_reconnect] + + subgraph handle_connect_or_reconnect + HC --> FRESH_CHECK{is_fresh_sync?} + FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] + FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] + L1_FRESH --> RCA1[recompute_actions_for_relay] + L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] + L23_QUICK --> RCA2[recompute_actions_for_relay] + end ``` -## Metrics & Observability +--- + +## Key Design Decisions + +| Decision | Choice | Rationale | +| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | +| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | +| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | +| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | +| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | +| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | +| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | +| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | +| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | +| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | +| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | +| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | +| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | +| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | +| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | +| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | +| Connection spawning | Via AddFilters handler | Single path for new relay discovery | +| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | -All sync metrics are exposed via Prometheus at `/metrics`. For <100 relays, per-relay labels are acceptable cardinality. +--- -### Prometheus Metrics +## Module Structure -```rust -/// Sync module metrics registered with the global Prometheus registry -pub struct SyncMetrics { - // === Connection Metrics (per relay) === - /// Active outbound connections: ngit_sync_relay_connected{relay="wss://..."} - relay_connected: IntGaugeVec, // labels: [relay] +``` +src/sync/ +├── mod.rs # SyncManager, main loop +├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types +├── actions.rs # AddFilters struct, compute_actions, build_filters +├── self_subscriber.rs # SelfSubscriber, batching logic +├── relay_connection.rs # Per-relay WebSocket connection +├── consolidation.rs # Consolidation logic, daily timer +├── health.rs # Health tracking (reuse from v2) +└── metrics.rs # Prometheus metrics (reuse from v2) +``` - /// Connection attempts: ngit_sync_connection_attempts_total{relay="wss://...", result="success|failure"} - connection_attempts: CounterVec, // labels: [relay, result] +--- - // === Relay Health Status === - /// Current status: ngit_sync_relay_status{relay="wss://...", status="healthy|backoff|dead"} - relay_status: IntGaugeVec, // labels: [relay, status] +## Comparison: v3 vs v4 - /// Consecutive failures: ngit_sync_relay_failures{relay="wss://..."} - relay_failures: IntGaugeVec, // labels: [relay] +| Aspect | v3 | v4 | +| ------------------------ | ----------------------------------------- | --------------------------------------------- | +| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | +| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | +| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | +| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | +| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | +| Since filter application | Applied in handle_reconnect | build_all_filters with optional since | +| PSI clearing | On disconnect | On disconnect (confirmed) | +| Daily timer | Consolidation-style | Fresh sync (different from consolidation) | - // === Event Source Tracking === - /// Events received by source: ngit_sync_events_total{source="direct|live_sync|catchup|daily_catchup"} - events_total: CounterVec, // labels: [source] +--- - /// Sync gap events (should have been live synced): ngit_sync_gap_events_total{relay="wss://..."} - sync_gap_events: CounterVec, // labels: [relay] +## Self-Subscriber Flow - // === Aggregate Metrics === - /// Total relays being tracked - relays_tracked_total: IntGauge, +The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions. - /// Relays currently connected - relays_connected_total: IntGauge, +### State Tracking - /// Relays in dead state - relays_dead_total: IntGauge, +```rust +pub struct SelfSubscriber { + own_relay_url: String, + relay_domain: String, + repo_sync_index: RepoSyncIndex, + pending_sync_index: PendingSyncIndex, + relay_sync_index: RelaySyncIndex, + action_tx: mpsc::Sender, + /// Timestamp of last successful connection - used for since filter on reconnection + last_connected: Option, + /// The active client connection + client: Option, } ``` -### Metric Definitions - -| Metric | Type | Labels | Description | -| ------------------------------------- | ------- | ------------- | ------------------------------------------------------ | -| `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not | -| `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes | -| `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise | -| `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count | -| `ngit_sync_events_total` | Counter | source | Events received by source type | -| `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup that should have been live | -| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered from announcements | -| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count | -| `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead | +### On Startup / Reconnect (Unified) -**Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. The `ngit_sync_gap_events_total` metric tracks this per relay. - -### Observability Integration +Both initial startup and reconnection use the same `connect_and_subscribe` method: ```rust -impl SyncManager { - fn record_event_received(&self, event: &Event, source: EventSource) { - match source { - EventSource::DirectSubmission => { - self.metrics.events_total.with_label_values(&["direct"]).inc(); +impl SelfSubscriber { + async fn run(mut self) { + loop { + // Connect or reconnect + if let Err(e) = self.connect_and_subscribe().await { + tracing::warn!("Connection failed: {}, will retry", e); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; } - EventSource::LiveSync(relay) => { - self.metrics.events_total.with_label_values(&["live_sync"]).inc(); - } - EventSource::Catchup(relay) => { - // This is a sync gap - we should have gotten it via live sync - self.metrics.events_total.with_label_values(&["catchup"]).inc(); - self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc(); - tracing::warn!( - relay = %relay, - event_id = %event.id.to_hex(), - "Sync gap detected: event found during catchup" - ); - } - EventSource::DailyCatchup(relay) => { - // Sustained sync gap - missed by both live sync and initial catchup - self.metrics.events_total.with_label_values(&["daily_catchup"]).inc(); - self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc(); - tracing::error!( - relay = %relay, - event_id = %event.id.to_hex(), - "Sustained sync gap: event found during daily catchup" - ); - } - } - } - fn record_connection_attempt(&self, relay: &RelayUrl, success: bool) { - let result = if success { "success" } else { "failure" }; - self.metrics.connection_attempts - .with_label_values(&[relay.as_str(), result]) - .inc(); - } + // Run event loop until disconnection + self.event_loop().await; - fn update_relay_status(&self, relay: &RelayUrl, status: &RelayStatus) { - // Reset all status labels for this relay - for s in ["healthy", "backoff", "dead"] { - self.metrics.relay_status - .with_label_values(&[relay.as_str(), s]) - .set(0); + // Loop will retry connection } - // Set current status - let status_label = match status { - RelayStatus::Healthy => "healthy", - RelayStatus::Backoff { .. } => "backoff", - RelayStatus::Dead => "dead", - }; - self.metrics.relay_status - .with_label_values(&[relay.as_str(), status_label]) - .set(1); } -} -``` - -### Example Grafana Queries -```promql -# Relay health overview - count by status -sum by (status) (ngit_sync_relay_status == 1) - -# Connection success rate over last hour -sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) -/ sum(rate(ngit_sync_connection_attempts_total[1h])) - -# Sync gap detection - events that should have been live synced -sum(rate(ngit_sync_gap_events_total[1h])) by (relay) - -# Live sync effectiveness (lower is better - fewer gaps) -sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) -/ sum(rate(ngit_sync_events_total[1h])) + async fn connect_and_subscribe(&mut self) -> Result<(), Error> { + let client = Client::new(Keys::generate()); + client.add_relay(&self.own_relay_url).await?; + client.connect().await; + + // Build filter - add since only on reconnect + let filter = Filter::new().kinds([ + Kind::Custom(30617), // Repository announcements + Kind::GitPatch, // 1617 + Kind::Custom(1618), // PRs + Kind::Custom(1619), // PR updates + Kind::GitIssue, // 1621 + ]); + + let filter = if let Some(ts) = self.last_connected { + // Reconnection: use since filter + let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer + filter.since(since) + } else { + // Initial connect: no since filter - get full history + filter + }; -# Relays with high failure counts (potential issues) -topk(10, ngit_sync_relay_failures) + // Update last_connected AFTER computing since + self.last_connected = Some(Timestamp::now()); -# Alert: relay stuck in dead state -ngit_sync_relay_status{status="dead"} == 1 + client.subscribe(filter, None).await?; + self.client = Some(client); + Ok(()) + } +} ``` -### Log Levels for Sync Events +### Event Loop with Batching -| Event | Level | Context | -| ----------------------- | ----- | ----------------------------- | -| Event via live sync | DEBUG | Normal operation | -| Event via catchup | WARN | Sync gap detected | -| Event via daily catchup | ERROR | Sustained gap | -| Connection established | INFO | Relay URL | -| Connection failed | WARN | Relay URL, attempt #, backoff | -| Relay marked dead | ERROR | Relay URL, failure duration | -| Peer missing events | WARN | Relay URL, repo, count | +```rust +impl SelfSubscriber { + async fn event_loop(&mut self) { + let client = self.client.as_ref().unwrap(); + let mut pending_events: Vec = Vec::new(); + let mut batch_timer: Option = None; + let batch_window = Duration::from_secs(5); + + loop { + let timeout = batch_timer + .map(|t| batch_window.saturating_sub(t.elapsed())) + .unwrap_or(Duration::from_secs(60)); + + tokio::select! { + notification = client.notifications().recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + pending_events.push(*event); + + // Start timer on first event - does NOT reset + if batch_timer.is_none() { + batch_timer = Some(Instant::now()); + } + } + Ok(RelayPoolNotification::Shutdown) => { + // Connection lost + break; + } + _ => {} + } + } + _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { + // Batch window elapsed + self.process_batch(pending_events.drain(..).collect()).await; + batch_timer = None; + } + } + } + } -## Configuration + async fn process_batch(&self, events: Vec) { + // 1. Update RepoSyncIndex + for event in events { + match event.kind.as_u16() { + 30617 => self.handle_announcement(&event).await, + 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, + _ => {} + } + } -```rust -pub struct SyncConfig { - /// Warm-up delay before starting initial catchup - pub startup_delay: Duration, // Default: 30s + // 2. Derive targets and compute actions + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); - /// Delay between filter batches during catchup - pub batch_delay: Duration, // Default: 60s + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; - /// Delay after reconnect before catchup - pub reconnect_delay: Duration, // Default: 10s + let actions = compute_actions(&targets, &pending, &confirmed); - /// Maximum events in last N days for reconnect catchup - pub reconnect_lookback_days: u32, // Default: 3 + drop(repo_index); + drop(pending); + drop(confirmed); - /// Maximum tagged event IDs per filter - pub max_tags_per_filter: usize, // Default: 100 + // 3. Send actions to SyncManager + for action in actions { + let _ = self.action_tx.send(action).await; + } + } - /// Consolidate subscriptions when count exceeds - pub max_subscriptions: usize, // Default: 150 + async fn handle_announcement(&self, event: &Event) { + // Extract repo_ref from event - 30617:pubkey:identifier + let d_tag = event.tags.iter() + .find_map(|tag| { + if tag.kind() == TagKind::D { + tag.content().map(|s| s.to_string()) + } else { + None + } + }) + .unwrap_or_default(); + + let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); + + // Extract relay URLs from 'r' tags + let relays: HashSet = event.tags.iter() + .filter_map(|tag| { + if tag.kind() == TagKind::Relay { + tag.content().map(|s| s.to_string()) + } else { + None + } + }) + .collect(); + + // Update RepoSyncIndex + let mut index = self.repo_sync_index.write().await; + let needs = index.entry(repo_ref).or_default(); + needs.relays = relays; + } - /// Backoff configuration - pub max_backoff: Duration, // Default: 1h - pub dead_threshold: Duration, // Default: 24h - pub dead_retry_interval: Duration, // Default: 24h + async fn handle_root_event(&self, event: &Event) { + // Extract repo_ref from 'a' tag + let repo_ref = event.tags.iter() + .find_map(|tag| { + if tag.kind() == TagKind::A { + tag.content().map(|s| s.to_string()) + } else { + None + } + }); + + if let Some(repo_ref) = repo_ref { + let mut index = self.repo_sync_index.write().await; + let needs = index.entry(repo_ref).or_default(); + needs.root_events.insert(event.id); + } + } } ``` -## Summary - -| Component | Responsibility | -| ---------------------- | -------------------------------------------------------------- | -| **SyncManager** | Orchestrates connections, triggers catchup, processes events | -| **FilterService** | Builds unified filters from database state | -| **RelayHealthTracker** | Manages backoff, dead relay detection (in-memory + Prometheus) | -| **ConnectionState** | Per-relay WebSocket + subscription management | -| **SyncMetrics** | Prometheus metrics for operator visibility | - -### Key Design Decisions - -1. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism -2. **Exclude ourselves** from relay list to prevent loops -3. **One connection per relay** with combined filters for efficiency -4. **In-memory health state** with Prometheus metrics for visibility (no database persistence needed for <100 relays) -5. **Graceful degradation on restart** - conservative initial backoff with jitter avoids thundering herd -6. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up -7. **Client-side filtering** for 30617/30618, server-side for Layer 2/3 -8. **Dynamic subscription addition** with periodic consolidation -9. **Custom acceptance policy** excluding rate limiting defaults -10. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps, tracked in Prometheus - --- -## Implementation Notes (Phase 6) - -This section documents the final implementation as of Phase 6 (Observability & Production Readiness). +## Implementation Notes -### What Was Actually Built +This section documents the actual implementation details as of December 2024 (Phases 1-10 complete). -The implementation closely follows the design document with the following completed components: +### Architectural Decisions During Implementation -#### Phase 1: Basic Sync (commit b167f1b) -- [`SyncManager`](../../src/sync/manager.rs) - Main coordinator for proactive sync -- Bootstrap relay sync via `NGIT_SYNC_BOOTSTRAP_RELAY_URL` configuration -- Dynamic relay discovery from repository announcements that list our service -- Event validation through existing [`Nip34WritePolicy`](../../src/nostr/builder.rs) +**Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup: -#### Phase 2: Three-Layer Filters (commit bf558b0) -- [`FilterService`](../../src/sync/filter.rs) - Builds three-layer filter strategy -- Layer 1: All kind 30617+30618 (announcements) -- Layer 2: A/a tag filters for repository events -- Layer 3: E/e tag filters for related events (PRs, Issues) -- Multi-relay discovery from stored announcements - -#### Phase 3: Health Tracking (commit f639ecf) -- [`RelayHealthTracker`](../../src/sync/health.rs) - DashMap-based health tracking -- Three states: Healthy → Degraded → Dead -- Exponential backoff: 5s → 10s → 20s → ... → max (default 1h) -- Dead relay detection after 24h continuous failures -- Startup jitter (0-10s) to prevent thundering herd +```rust +// 7. Wrap self in Arc for sharing with timer task +let sync_manager = Arc::new(Mutex::new(self)); +``` -#### Phase 4: Dynamic Subscriptions (commit a19ff57) -- [`SubscriptionManager`](../../src/sync/subscription.rs) - Per-connection subscription tracking -- Dynamic Layer 2 subscriptions when new announcements arrive -- Dynamic Layer 3 subscriptions when new PRs/Issues arrive -- Filter consolidation at threshold (150 filters) +This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. -#### Phase 5: Catchup & Gap Detection (commit 950c2e4) -- [`NegentropyService`](../../src/sync/negentropy.rs) - Gap-filling catchup operations -- Startup catchup (configurable delay) -- Reconnection catchup (limited lookback) -- Daily catchup (not yet implemented - placeholder) +**Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: -#### Phase 6: Observability (this phase) -- [`SyncMetrics`](../../src/sync/metrics.rs) - Full Prometheus integration -- Grafana dashboard panels for sync monitoring -- Documentation updates +- `DashMap` for thread-safe concurrent access without external locking +- Three states: `Healthy`, `Degraded`, `Dead` +- Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff +- Dead threshold: 24 hours of continuous failures +- Dead relay retry: Once per 24 hours -### Differences from Original Design +### Implementation Constants -1. **Negentropy (NIP-77)**: Simplified gap-filling was used instead of full NIP-77 negentropy reconciliation, as nostr-sdk 0.44 lacks built-in negentropy support. The current implementation uses timestamp-based catchup queries. +| Constant | Value | Purpose | +| --------------------------------- | ---------- | ------------------------------------------------ | +| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | +| `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | +| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | +| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | +| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | +| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | -2. **Filter Consolidation Threshold**: Set at 150 filters (as designed) based on typical relay filter limits. +### Daily Timer Randomization -3. **Health Tracking**: Implemented exactly as designed - in-memory only (not persisted to database), which is acceptable for production as health state rebuilds quickly on restart. +The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running: -4. **Metric Label Strategy**: Used simpler numeric encoding for health status (1=healthy, 2=degraded, 3=dead) instead of multiple label values per relay, reducing cardinality. +```rust +let hours = 23.0 + rand::thread_rng().gen::() * 2.0; +``` -5. **Event Source Tracking**: Implemented four source types (`live`, `startup`, `reconnect`, `daily`) instead of the original (`direct`, `live_sync`, `catchup`, `daily_catchup`). +### Bootstrap Relay Protection -### Three-Layer Filter Strategy (As Implemented) +Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out: +```rust +.filter(|(_, state)| { + !state.is_bootstrap && + state.repos.is_empty() && + state.root_events.is_empty() +}) ``` -Layer 1: Discovery Layer -├── Query: kinds [30617, 30618] (announcements) -├── Applied: At startup and during sync -└── Purpose: Discover all repositories across network - -Layer 2: Repository Events -├── Query: Events with A/a tags pointing to tracked repos -├── Format: A tag = "30617::" -├── Triggered: When new announcement is accepted -└── Purpose: Get PRs, issues, patches for repositories - -Layer 3: Related Events -├── Query: Events with E/e tags pointing to tracked PRs/Issues -├── Triggered: When new PR/Issue is accepted -└── Purpose: Get comments, reviews, status updates -``` - -### Prometheus Metrics (As Implemented) -| Metric | Type | Labels | Description | -|--------|------|--------|-------------| -| `ngit_sync_relay_connected` | Gauge | relay | Connection status (1/0) | -| `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome | -| `ngit_sync_relay_status` | Gauge | relay | Health state (1/2/3) | -| `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures | -| `ngit_sync_events_total` | Counter | source | Events by source type | -| `ngit_sync_gap_events_total` | Counter | relay | Gap events filled | -| `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | -| `ngit_sync_relays_connected_total` | Gauge | - | Currently connected | -| `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count | +### Graceful Shutdown -### Configuration Options (As Implemented) +Shutdown uses a tokio broadcast channel for coordinated termination: -All configuration via environment variables or CLI flags: +```rust +let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); +``` -| Option | Type | Default | Description | -|--------|------|---------|-------------| -| `NGIT_SYNC_BOOTSTRAP_RELAY_URL` | String | None | Bootstrap relay URL for initial sync | -| `NGIT_SYNC_MAX_BACKOFF_SECS` | u64 | 3600 | Max backoff delay (seconds) | -| `NGIT_SYNC_STARTUP_DELAY_SECS` | u64 | 30 | Catchup delay after startup | -| `NGIT_SYNC_RECONNECT_DELAY_SECS` | u64 | 10 | Catchup delay after reconnect | -| `NGIT_SYNC_RECONNECT_LOOKBACK_DAYS` | u64 | 3 | Days to look back on reconnect | +Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop. -**Note:** Additional relays are automatically discovered from repository announcements (kind 30617) that list our service domain. The bootstrap relay provides an initial sync source but is not required - sync will discover relays from stored announcements. +### Actual Module Structure -### Module Structure (As Implemented) +The implemented module structure differs from the original spec: ``` src/sync/ -├── mod.rs # Module exports, constants -├── manager.rs # SyncManager - orchestrates sync -├── connection.rs # SyncConnection - per-relay WebSocket -├── filter.rs # FilterService - three-layer filters -├── health.rs # RelayHealthTracker - health states -├── metrics.rs # SyncMetrics - Prometheus integration -├── negentropy.rs # NegentropyService - gap-filling -└── subscription.rs # SubscriptionManager - dynamic subs +├── mod.rs # SyncManager, main loop, index types, metrics +├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters +├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters +├── health.rs # RelayHealthTracker, HealthState, exponential backoff +├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling +└── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic ``` -### Production Readiness Checklist +Key differences from spec: + +- No separate `state.rs` - types are defined in `mod.rs` +- No separate `actions.rs` - moved to `algorithms.rs` +- No separate `consolidation.rs` - consolidation logic in `mod.rs` +- No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs` + +### Deviations from Original v4 Spec + +1. **RelayState lacks `connection` field**: The spec showed `connection: Option` in `RelayState`, but the implementation stores connections in a separate `HashMap` in `SyncManager`. + +2. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct. + +3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. -- [x] All metrics exposed at `/metrics` endpoint -- [x] Health state tracking with configurable backoff -- [x] Dead relay detection and minimal retry -- [x] Startup jitter to prevent thundering herd -- [x] Grafana dashboard with sync panels -- [x] Configuration documented -- [x] Integration tests passing +4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. -- cgit v1.2.3