From 7821b107190cc116a30a4c339f935bc16a1d5197 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 16 Dec 2025 15:26:55 +0000 Subject: proactive sync prep - some helper functions written but not enabled --- docs/explanation/grasp-02-proactive-sync.md | 729 ++++++++++++++++------------ 1 file changed, 432 insertions(+), 297 deletions(-) (limited to 'docs/explanation/grasp-02-proactive-sync.md') diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index 2a86126..34b7bb6 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md @@ -4,12 +4,11 @@ This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles: -1. **Self-subscription as the only mechanism** - No database initialization at startup -2. **compute_actions as single decision point** - Determines what NEW subscriptions to create -3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) -4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch -5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary -6. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported +1. **Triggers call compute_actions → sync_computed_filters** - Self-subscriber batches and connect/reconnect events trigger this flow +2. **Clear separation of live vs historic sync** - Two distinct primitives with different purposes +3. **Layer 1 on connect, Layer 2+3 via AddFilters** - L1 handled at connection time, L2+L3 flow through compute_actions +4. **Always clear PendingSyncIndex first** - Before any reconnect/consolidate operation +5. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported --- @@ -90,7 +89,6 @@ impl RelayState { ### PendingSyncIndex (In-Flight Batches) ```rust - /// Method used for synchronization #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SyncMethod { @@ -100,7 +98,6 @@ pub enum SyncMethod { Negentropy, } - /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. /// Each batch has its own ID and can confirm independently. /// Key: relay URL @@ -118,7 +115,6 @@ pub struct PendingBatch { pub sync_method: SyncMethod, } - #[derive(Debug, Clone, Default)] pub struct PendingItems { pub repos: HashSet, @@ -145,152 +141,202 @@ stateDiagram-v2 --- -## Flow Scenarios +## Core Architecture: Live vs Historic Sync + +The sync system is built on two fundamental primitives that are clearly separated: + +### Sync Primitives + +| Primitive | Purpose | Filter Modifier | Tracking | +| ----------------- | ----------------------- | ---------------- | ---------------- | +| `sync_live()` | Ongoing event stream | `limit: 0` | Not tracked | +| `historic_sync()` | Catch up on past events | Optional `since` | PendingSyncIndex | + +### Why `limit: 0` for Live Sync? + +| Approach | Pros | Cons | +| ------------ | --------------------------------------- | --------------------------------- | +| `since: now` | Intuitive | Time-sensitive, clock skew issues | +| `limit: 0` | Deterministic, mirrors filter structure | Less intuitive name | + +`limit: 0` is better because: + +1. **No time dependency**: Doesn't depend on synchronized clocks +2. **Mirrors historic filters**: Same tag structure, just different limit +3. **State reconstruction**: Can rebuild from repo/event lists without timestamps + +### Layer Strategy + +| Layer | Content | When Subscribed | Managed By | +| ------- | --------------------------------------- | --------------------- | -------------------- | +| Layer 1 | 30617 Announcements, 30618 Maintainers | On connect (any type) | Connection lifecycle | +| Layer 2 | Events tagging our repos (a/A/q tags) | Via AddFilters | compute_actions | +| Layer 3 | Events tagging root events (e/E/q tags) | Via AddFilters | compute_actions | -### Scenario 1: Initial Connect +**Key insight**: Layer 1 is connection-level (handled at connect time), Layer 2+3 are item-level (flow through compute_actions → sync_computed_filters). + +--- + +## Triggers and Flow + +### What Triggers compute_actions → sync_computed_filters? + +| Trigger | When | What Happens | +| --------------------------- | -------------------------------------- | ---------------------------------------- | +| Self-subscriber batch fires | New events discovered on own relay | Update RepoSyncIndex → compute_actions | +| fresh_start() | Initial connect, long_reconnect, daily | After L1 setup → compute_actions | +| quick_reconnect() | Reconnect < 15 minutes | After L1+L2+L3 catchup → compute_actions | +| consolidate() | Filter count > threshold | After live rebuild → compute_actions | + +### The Core Flow ```mermaid flowchart TB - START[Startup] --> SS[Self-subscribe to own relay] - SS --> |no since filter| EVENTS[Receive historical events] - EVENTS --> RSI[Update RepoSyncIndex] - RSI --> DT[derive_relay_targets] - DT --> CA[compute_actions with targets and empty confirmed] - CA --> AF[AddFilters for each relay] - AF --> SPAWN{Relay connected?} - SPAWN --> |no| CONN[spawn_connection] - CONN --> HC[handle_connect_or_reconnect] - SPAWN --> |yes| SUB - - subgraph handle_connect_or_reconnect - Fresh Sync - HC --> CHECK_FRESH{is_fresh_sync?} - CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] - L1 --> RCA[recompute_actions_for_relay] - end + TRIGGER[Trigger fires] --> CA[compute_actions] + CA --> |derives from| RSI[RepoSyncIndex] + CA --> |subtracts| RLI[RelaySyncIndex] + CA --> |subtracts| PSI[PendingSyncIndex] + CA --> |produces| AF[AddFilters actions] + AF --> SFRE[sync_computed_filters] + SFRE --> LIVE[sync_live - L2+L3] + SFRE --> HIST[historic_sync - L2+L3] + HIST --> PSI_UPDATE[Update PendingSyncIndex] + PSI_UPDATE --> |EOSE received| CONFIRM[Move to RelaySyncIndex] +``` + +--- + +## Flow Scenarios - RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] - SUB --> PB[Create PendingBatch] +### Scenario 1: Fresh Start (Initial Connect / Long Reconnect / Daily Sync) + +```mermaid +flowchart TB + START[fresh_start called] --> CLEAR_PSI[Clear PendingSyncIndex] + CLEAR_PSI --> CLEAR_RSI[Clear RelaySyncIndex] + CLEAR_RSI --> L1_LIVE[L1: sync_live - announcements] + L1_LIVE --> L1_HIST[L1: historic_sync - no since] + L1_HIST --> NEG{NIP-77 supported?} + NEG --> |yes| NEGENTROPY[negentropy sync] + NEG --> |no| REQ[REQ+EOSE] + NEGENTROPY --> CA[compute_actions] + REQ --> CA + CA --> |empty RelaySyncIndex| AF[AddFilters for ALL repos] + AF --> SFRE[sync_computed_filters] + SFRE --> L23_LIVE[L2+L3: sync_live] + SFRE --> L23_HIST[L2+L3: historic_sync] + L23_HIST --> PB[Create PendingBatch] PB --> EOSE[Wait for EOSE] - EOSE --> CONFIRM[Move items to confirmed repos/root_events] + EOSE --> CONFIRM[Move items to RelaySyncIndex] ``` **Key points:** -- No `since` filter on initial connect - get full history -- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` -- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since -- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking +- Always clear PendingSyncIndex first, then RelaySyncIndex +- L1 live + L1 historic (uses negentropy if available) +- Empty RelaySyncIndex means diff produces AddFilters for everything +- L2+L3 flow through sync_computed_filters with proper pending tracking -### Scenario 2: Quick Reconnect (less than 15 minutes) +### Scenario 2: Quick Reconnect (< 15 minutes) ```mermaid flowchart TB DISC[Connection lost] --> MARK[Set disconnected_at = now] - MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] - CLEAR_PEND --> WAIT[Wait for reconnection] + MARK --> WAIT[Wait for reconnection < 15min] WAIT --> RECONN[Connection restored] - RECONN --> HC[handle_connect_or_reconnect] - - subgraph handle_connect_or_reconnect - Quick Reconnect - HC --> CHECK{is_fresh_sync?} - CHECK --> |no - last_connected exists AND less than 15min| SINCE[since = last_connected - 15min] - SINCE --> L1[build_announcement_filter - with since] - L1 --> L23[rebuild_layer2_and_layer3 - with since] - L23 --> RCA[recompute_actions_for_relay] - end - - RCA --> AF[AddFilters for new items only] - AF --> SUB[Subscribe] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> EXTEND[Extend confirmed state] + RECONN --> CLEAR_PSI[Clear PendingSyncIndex] + CLEAR_PSI --> L1_LIVE[L1: sync_live - announcements] + L1_LIVE --> L1_HIST[L1: historic_sync WITH since] + L1_HIST --> RECON[reconstruct_filters from RelaySyncIndex] + RECON --> L23_LIVE[L2+L3: sync_live] + RECON --> L23_HIST[L2+L3: historic_sync WITH since] + L23_HIST --> CA[compute_actions] + CA --> |check for new items| AF{New items?} + AF --> |yes| SFRE[sync_computed_filters] + AF --> |no| DONE[Done] + SFRE --> PB[Create PendingBatch] ``` **Key points:** -- PendingSyncIndex cleared on disconnect (not reconnect) -- `handle_connect_or_reconnect`: - 1. `build_announcement_filter(Some(since))` - Layer 1 with since - 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since - 3. `recompute_actions_for_relay` - check for new items -- since = last_connected - 15min ensures we catch events during disconnection +- Clear PendingSyncIndex first (old subscriptions are dead) +- L1 live (always on any connection) +- L1 historic WITH since (catches up missed announcements) +- L2+L3 rebuilt from RelaySyncIndex (confirmed state preserved) +- compute_actions checks for any NEW items discovered during catchup -### Scenario 3: Stale Reconnect (greater than 15 minutes) +### Scenario 3: Long Reconnect (> 15 minutes) ```mermaid flowchart TB - RECONN[Connection restored] --> HC[handle_connect_or_reconnect] + RECONN[Connection restored > 15min] --> METRIC[Record disconnect/reconnect metric] + METRIC --> FRESH[fresh_start] + FRESH --> |same as initial connect| DONE[Full sync initiated] +``` - subgraph handle_connect_or_reconnect - Stale Reconnect - HC --> CHECK{is_fresh_sync?} - CHECK --> |yes - disconnected greater than 15min| CLEAR[clear_sync_state] - CLEAR --> L1[build_announcement_filter - no since] - L1 --> RCA[recompute_actions_for_relay] - end +**Key points:** - RCA --> CA[compute_actions with empty confirmed] - CA --> AF[AddFilters for everything] - AF --> SUB[Subscribe - no since filter] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> CONFIRM[Populate confirmed state fresh] +- Records disconnect/reconnect as a metric +- Delegates to fresh_start() - same as initial connect +- State too stale to trust, start fresh + +### Scenario 4: Consolidation (Filter Count > Threshold) + +```mermaid +flowchart TB + CHECK[Filter count check] --> THRESHOLD{count > 70?} + THRESHOLD --> |yes| CLEAR_PSI[Clear PendingSyncIndex] + CLEAR_PSI --> UNSUB[unsubscribe_all] + UNSUB --> RECON[reconstruct_filters from RelaySyncIndex] + RECON --> L1_LIVE[L1: sync_live] + RECON --> L23_LIVE[L2+L3: sync_live] + L23_LIVE --> CA[compute_actions] + CA --> |check for new items| AF{New items?} + AF --> |yes| SFRE[sync_computed_filters] + AF --> |no| DONE[Done] + THRESHOLD --> |no| SKIP[Continue normally] ``` **Key points:** -- `should_clear_state()` returns true → triggers fresh sync -- Same path as initial connect after clearing state -- Layer 1: `build_announcement_filter(None)` - full history -- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything +- Clear PendingSyncIndex first +- NO historic sync needed - items already synced/syncing +- Only rebuilds live subscriptions from confirmed state +- compute_actions catches any new items that need syncing -### Scenario 4: Consolidation (Triggered on Filter Add) +### Scenario 5: Daily Sync (23-25h Random Timer) ```mermaid flowchart TB - AF[handle_add_filters called] --> COUNT{current + new > 70?} - COUNT --> |yes| CONSOLIDATE[consolidate] - CONSOLIDATE --> WAIT_PEND[wait_pending_complete] - WAIT_PEND --> CLOSE[unsubscribe_all] - CLOSE --> SINCE[since = now - 15min] - SINCE --> L1[build_announcement_filter - with since] - L1 --> L23[rebuild_layer2_and_layer3 - with since] - COUNT --> |no| SUB[Subscribe new filters] - SUB --> PB[Create PendingBatch] + TIMER[Daily timer fires] --> FRESH[fresh_start] + FRESH --> |NO disconnect metric| DONE[Full sync initiated] ``` **Key points:** -- Consolidation checked in `handle_add_filters` BEFORE adding new filters -- After closing all subscriptions, re-subscribe: - 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since - 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since -- `since = now - 15min` prevents re-fetching old events -- Keeps confirmed state, just reduces filter count +- Same as fresh_start() but WITHOUT recording disconnect/reconnect metric +- Ensures consistency, detects any drift accumulated over 24 hours -### Scenario 5: Daily Timer (23-25h Random) +### Scenario 6: Self-Subscriber Batch ```mermaid flowchart TB - DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] - CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] - CLEAR_PEND --> CLEAR_STATE[clear_sync_state] - CLEAR_STATE --> L1[build_announcement_filter - no since] - L1 --> RCA[recompute_actions_for_relay] - RCA --> CA[compute_actions with empty confirmed] - CA --> AF[AddFilters for everything] - AF --> SUB[Subscribe - no since filter] - SUB --> PB[Create PendingBatch] - PB --> EOSE[Wait for EOSE] - EOSE --> CONFIRM[Repopulate confirmed state] + EVENTS[Events from own relay] --> QUEUE[Queue to pending batch] + QUEUE --> TIMER[Batch timer fires - 5 seconds] + TIMER --> UPDATE[Update RepoSyncIndex] + UPDATE --> CA[compute_actions] + CA --> |new repos/events discovered| AF[AddFilters] + AF --> SFRE[sync_computed_filters] + SFRE --> LIVE[sync_live - L2+L3] + SFRE --> HIST[historic_sync - L2+L3] ``` **Key points:** -- Daily timer is a full fresh sync, NOT consolidation -- Clears both PendingSyncIndex and confirmed state -- Layer 1: `build_announcement_filter(None)` - full history -- Layer 2+3: via compute_actions with empty confirmed - full history -- Detects any state drift accumulated over 24 hours +- Self-subscriber monitors own relay for 30617, 1617, 1618, 1619, 1621 +- Batches events (5 second window) +- Updates RepoSyncIndex, then compute_actions finds new work +- New items flow through sync_computed_filters --- @@ -300,8 +346,6 @@ flowchart TB Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay. -**Implementation:** [`derive_relay_targets()`](../../src/sync/algorithms.rs:61) - ```rust // Conceptual: inverts repo → relays to relay → repos fn derive_relay_targets(repo_index: &HashMap) @@ -320,104 +364,262 @@ Performs a three-way diff: `target - pending - confirmed = new` Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect). -**Implementation:** [`compute_actions()`](../../src/sync/algorithms.rs:96) +```rust +fn compute_actions( + targets: &HashMap, + pending: &PendingSyncIndex, + confirmed: &RelaySyncIndex, +) -> Vec +``` --- -## Filter Building (Three-Layer Strategy) +## Method Specifications + +### Primitives + +#### `sync_live()` - Live Subscriptions + +```rust +/// Set up live subscription (filters with limit: 0) +/// +/// - Uses `limit: 0` to receive only new events +/// - NOT tracked in PendingSyncIndex (state reconstructable) +async fn sync_live(&self, relay_url: &str, filters: &[Filter]) +``` + +#### `historic_sync()` - Historical Sync Dispatcher -The filter strategy uses three layers to ensure comprehensive event coverage: +```rust +/// Dispatch to appropriate historic sync method based on relay capabilities +/// +/// Both paths update PendingSyncIndex to ensure consistent lifecycle tracking. +async fn historic_sync( + &mut self, + relay_url: &str, + filters: Vec, + items: PendingItems, + since: Option, +) -> Option // Returns batch_id +``` + +Dispatches to: + +- `historic_sync_negentropy()` - NIP-77 parallel sync (if supported) +- `historic_sync_legacy()` - REQ+EOSE fallback + +### Building Blocks + +#### `reconstruct_filters()` - Rebuild from Confirmed State + +```rust +/// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) +/// +/// Returns raw Vec for L1+L2+L3. +/// Used by: quick_reconnect, consolidate +/// Does NOT include pending items - those flow through AddFilters path. +async fn reconstruct_filters(&self, relay_url: &str) -> Vec +``` + +#### `sync_computed_filters()` - Handle New AddFilters + +```rust +/// Process AddFilters action (from compute_actions) +/// +/// Orchestrates both live and historic sync for NEW items: +/// 1. sync_live() - set up permanent L2+L3 subscriptions +/// 2. historic_sync() - catch up on past events +/// +/// This is specifically for NEW filter discovery. +async fn sync_computed_filters( + &mut self, + action: AddFilters, + since: Option, +) -> Option +``` + +### Top-Level Entry Points + +#### `fresh_start()` - Clean Slate Sync + +```rust +/// Fresh start - clears state and does full sync +/// +/// Called by: initial connect, long_reconnect, daily_sync +/// +/// Flow: +/// 1. Clear PendingSyncIndex +/// 2. Clear RelaySyncIndex +/// 3. L1 live + L1 historic (negentropy if available) +/// 4. compute_actions → AddFilters → sync_computed_filters for L2+L3 +async fn fresh_start(&mut self, relay_url: &str) +``` + +#### `quick_reconnect()` - Short Disconnection Recovery + +```rust +/// Quick reconnect - for disconnections < 15 minutes +/// +/// Flow: +/// 1. Clear PendingSyncIndex +/// 2. L1 live + L1 historic(since) +/// 3. reconstruct_filters → L2+L3 live + L2+L3 historic(since) +/// 4. compute_actions for any new items +async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) +``` + +#### `long_reconnect()` - Extended Disconnection Recovery + +```rust +/// Long reconnect - for disconnections > 15 minutes +/// +/// Flow: +/// 1. Record disconnect/reconnect metric +/// 2. fresh_start() +async fn long_reconnect(&mut self, relay_url: &str) +``` + +#### `daily_sync()` - Scheduled Full Refresh + +```rust +/// Daily sync - full refresh without disconnect metrics +/// +/// Flow: fresh_start() (no disconnect metric recorded) +async fn daily_sync(&mut self, relay_url: &str) +``` + +#### `consolidate()` - Filter Count Reduction + +```rust +/// Consolidate subscriptions when filter count exceeds threshold +/// +/// Flow: +/// 1. Clear PendingSyncIndex +/// 2. unsubscribe_all +/// 3. reconstruct_filters → sync_live only (L1+L2+L3) +/// 4. compute_actions for any new items +/// +/// NO historic sync - items already synced, just reducing subscriptions +async fn consolidate(&mut self, relay_url: &str) +``` + +#### `handle_new_sync_filters()` - New Filter Discovery + +```rust +/// Handle AddFilters action from compute_actions +/// +/// Flow: +/// 1. Check/spawn connection if needed +/// 2. maybe_consolidate (check filter threshold) +/// 3. sync_computed_filters +async fn handle_new_sync_filters(&mut self, action: AddFilters) +``` + +--- + +## Method Relationships Summary + +``` +fresh_start(relay_url) // Initial/long_reconnect/daily + ├──> Clear PendingSyncIndex + ├──> Clear RelaySyncIndex + ├──> L1: sync_live(announcement_filter) + ├──> L1: historic_sync(announcement_filter, None) + └──> compute_actions → AddFilters → sync_computed_filters (L2+L3) + +quick_reconnect(relay_url, since) // Disconnected < 15 min + ├──> Clear PendingSyncIndex + ├──> L1: sync_live(announcement_filter) + ├──> L1: historic_sync(announcement_filter, since) + ├──> reconstruct_filters() → L2+L3 filters + ├──> L2+L3: sync_live(filters) + ├──> L2+L3: historic_sync(filters, since) + └──> compute_actions → AddFilters → sync_computed_filters (new items only) + +long_reconnect(relay_url) // Disconnected > 15 min + ├──> Record disconnect/reconnect metric + └──> fresh_start() + +daily_sync(relay_url) // Timer fires + └──> fresh_start() // No disconnect metric + +consolidate(relay_url) // Filter count > threshold + ├──> Clear PendingSyncIndex + ├──> unsubscribe_all() + ├──> reconstruct_filters() → L1+L2+L3 filters + ├──> sync_live(filters) // Live only, NO historic + └──> compute_actions → AddFilters → sync_computed_filters (new items only) + +handle_new_sync_filters(action) // New filter discovery + ├──> Check/spawn connection + ├──> maybe_consolidate() + └──> sync_computed_filters(action, None) + +sync_computed_filters(action, since) // Process AddFilters + ├──> sync_live(action.filters) // L2+L3 live + └──> historic_sync(action.filters, since) // L2+L3 historic + ├── historic_sync_negentropy() // Parallel, updates Pending + └── historic_sync_legacy() // REQ+EOSE, updates Pending +``` + +--- + +## Filter Building (Three-Layer Strategy) ### Layer 1: Announcements - **Kinds**: 30617 (Repository Announcements), 30618 (Maintainer Lists) -- **When subscribed**: ONCE on connect, NOT rebuilt during consolidation -- **Function**: [`build_announcement_filter()`](../../src/sync/filters.rs:20) +- **When subscribed**: On connect (any type) - handled by connection lifecycle +- **Function**: `build_announcement_filter(since: Option)` - 30618 is ONLY synced from remote relays, not self-subscribed ### Layer 2: Events Tagging Our Repos - **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage - **Batching**: Per 100 repo refs -- **Function**: [`tagged_one_of_our_repo_event_filters()`](../../src/sync/filters.rs:43) +- **Function**: `build_repo_tag_filters(repos, since)` ### Layer 3: Events Tagging Our Root Events - **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage - **Batching**: Per 100 event IDs -- **Function**: [`tagged_one_of_our_root_event_filters()`](../../src/sync/filters.rs:98) +- **Function**: `build_root_event_tag_filters(root_events, since)` ### Combined Layer 2+3 -The [`build_layer2_and_layer3_filters()`](../../src/sync/filters.rs:152) function combines both layers. Used by: - -- `compute_actions` for incremental subscriptions -- `rebuild_layer2_and_layer3` during reconnection -- Consolidation rebuilds (Layer 1 remains active separately) +The `build_layer2_and_layer3_filters()` function combines both layers. Used by: -**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). +- `sync_computed_filters` for new item subscriptions +- `reconstruct_filters` for rebuilding from confirmed state --- -## SyncManager Key Methods - -The [`SyncManager`](../../src/sync/mod.rs:308) orchestrates all sync operations. Key methods: - -### Connection Lifecycle - -| Method | Purpose | -| ------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- | -| `handle_connect_or_reconnect()` | Unified handler for initial connect and reconnect. Determines fresh vs quick reconnect based on `last_connected` and 15-minute rule | -| `handle_disconnect()` | Updates RelayState to Disconnected, sets disconnected_at, clears pending batches, records failure in health tracker | -| `spawn_relay_connection()` | Creates RelayConnection, subscribes to Layer 1, spawns event loop task | - -### Sync Operations - -| Method | Purpose | -| ------------------------------- | ------------------------------------------------------------------------------------------------------------------- | -| `handle_add_filters()` | Auto-spawns connection if needed, checks consolidation threshold (>70 filters), subscribes and creates PendingBatch | -| `handle_eose()` | Processes EOSE for subscription, moves items from pending to confirmed when batch completes | -| `recompute_actions_for_relay()` | Runs derive_relay_targets → compute_actions for a specific relay to find new items | -| `rebuild_layer2_and_layer3()` | Rebuilds subscriptions from confirmed state with optional since filter | - -### Maintenance - -| Method | Purpose | -| --------------------- | -------------------------------------------------------------------------- | -| `daily_sync()` | Full fresh sync - unsubscribes all, clears state, recomputes actions | -| `consolidate()` | Reduces filter count by unsubscribing and rebuilding with combined filters | -| `check_disconnects()` | Periodic check for empty relays (no repos) to disconnect | -| `check_reconnects()` | Attempts reconnection for disconnected relays with pending work | +## NIP-77 Negentropy Sync ---- +### What is Negentropy? -## Self-Subscriber +NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences. -The [`SelfSubscriber`](../../src/sync/self_subscriber.rs:86) monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`. +### When Negentropy is Used -### Event Kinds Monitored +Negentropy sync is attempted for: -- **30617** - Repository Announcements (triggers discovery of repos listing our relay) -- **1617** - Patches (root events referencing repos) -- **1618** - Issues -- **1619** - Replies/Status -- **1621** - Pull Requests +- **fresh_start()** - Full sync without `since` +- **daily_sync()** - Periodic full refresh (via fresh_start) +- **long_reconnect()** - Via fresh_start -Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays. +Negentropy is NOT used for: -### Batching Flow +- **quick_reconnect()** - Uses REQ with `since` (more efficient for small gaps) +- **Live subscriptions** - Always use REQ with `limit: 0` -1. **Receive events** from own relay subscription -2. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID -3. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events -4. **Process batch**: - - Update `RepoSyncIndex` with discovered repos and root events - - Call `derive_relay_targets()` → `compute_actions()` - - Send `AddFilters` actions to SyncManager +### Fallback Behavior -### Reconnection +If negentropy fails (relay doesn't support NIP-77, network error, etc.): -Uses `last_connected` timestamp to apply since filter on reconnect (15-minute buffer), similar to external relay reconnection logic. +1. A warning is logged (once per relay to avoid spam) +2. The sync falls back to traditional REQ+EOSE +3. No error is raised - fallback is automatic --- @@ -431,12 +633,14 @@ flowchart TB end subgraph RepoSyncIndex - What We Want - RSI[HashMap: Repo to Relays+Events] + RSI[HashMap: Repo → Relays+Events] end - subgraph Derived Target - DT[derive_relay_targets fn] - TGT[Per-relay: repos + events we should sync] + subgraph Triggers + T1[Self-subscriber batch] + T2[fresh_start after L1] + T3[quick_reconnect after catchup] + T4[consolidate after live rebuild] end subgraph compute_actions - Decision Point @@ -449,136 +653,41 @@ flowchart TB subgraph RelaySyncIndex - Confirmed State RLI[RelayState per relay] - CONN[connection_status] - REPOS[repos + root_events] - TIMES[last_connected + disconnected_at] end SS -->|subscribe| OWN OWN -->|events| SS SS -->|batch fires| RSI - RSI --> DT - DT --> TGT - TGT --> CA + RSI --> T1 + T1 --> CA + T2 --> CA + T3 --> CA + T4 --> CA PSI --> CA RLI --> CA - CA -->|Layer 2+3 new items| AF[AddFilters] - AF -->|check filter count| CONSOL{count + new > 70?} - CONSOL -->|yes| CONSOLIDATE[consolidate] - CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] - L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] - CONSOL -->|no| SUB[subscribe] - AF -->|spawn if needed| CONN - SUB --> PSI - PSI -->|EOSE| REPOS - - CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] - DISC -->|any reconnect| HC[handle_connect_or_reconnect] - - subgraph handle_connect_or_reconnect - HC --> FRESH_CHECK{is_fresh_sync?} - FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] - FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] - L1_FRESH --> RCA1[recompute_actions_for_relay] - L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] - L23_QUICK --> RCA2[recompute_actions_for_relay] - end + CA -->|new items| AF[AddFilters] + AF --> SFRE[sync_computed_filters] + SFRE --> LIVE[sync_live L2+L3] + SFRE --> HIST[historic_sync L2+L3] + HIST --> PSI + PSI -->|EOSE| RLI ``` --- ## Key Design Decisions -| Decision | Choice | Rationale | -| -------------------------- | -------------------------------------- | --------------------------------------------------------------------------- | -| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | -| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | -| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | -| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | -| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | -| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | -| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | -| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | -| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | -| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | -| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | -| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | -| NIP-77 negentropy | Try first, fallback to REQ | Efficient set reconciliation when supported | - ---- - -## NIP-77 Negentropy Sync - -The sync system supports NIP-77 negentropy for efficient set reconciliation when syncing with external relays. - -### What is Negentropy? - -NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences. - -### When Negentropy is Used - -Negentropy sync is attempted for: - -- **Initial connect** - Fresh sync without `last_connected` -- **Daily sync** - Periodic full refresh (23-25 hour timer) -- **Stale reconnect** - Disconnected for more than 15 minutes - -Negentropy is NOT used for: - -- **Quick reconnect** - Less than 15 minutes disconnected (uses REQ with `since`) -- **Live subscriptions** - Ongoing event streams always use REQ - -### Implementation - -The [`RelayConnection`](../../src/sync/relay_connection.rs:71) now includes NIP-77 methods: - -```rust -/// Check if negentropy sync should be attempted -pub async fn supports_negentropy(&self) -> bool { - // Always returns true - we try negentropy and handle failure gracefully - true -} - -/// Perform negentropy synchronization for a filter -pub async fn negentropy_sync_filter(&self, filter: Filter) - -> Result { - // Uses nostr-sdk's client.sync() method -} -``` - -### Sync Flow with Negentropy - -```mermaid -flowchart TB - CONNECT[Connect to relay] --> NEG{Try negentropy} - NEG --> |success| L1[Layer 1 synced via negentropy] - NEG --> |failure| FALLBACK[Fall back to REQ+EOSE] - - L1 --> SINCE[Record timestamp = now] - FALLBACK --> EOSE[Wait for EOSE] - EOSE --> SINCE - - SINCE --> LIVE[Open live REQ with since=now] -``` - -### Fallback Behavior - -If negentropy fails (relay doesn't support NIP-77, network error, etc.): - -1. A warning is logged (once per relay to avoid spam) -2. The sync falls back to traditional REQ+EOSE -3. No error is raised - fallback is automatic - -**Implementation:** [`negentropy_sync_and_process()`](../../src/sync/mod.rs:1549) - -### Key Design Decisions for Negentropy - -| Decision | Choice | Rationale | -| ------------------ | --------------------------- | ------------------------------------------------- | -| Detection approach | Try and fallback | More reliable than NIP-11 document detection | -| When to use | Fresh/daily/stale sync only | Quick reconnect with `since` is already efficient | -| Error handling | Log once, fallback silently | Avoid log spam while maintaining visibility | -| Layer application | Layer 1 first | Announcements are highest priority | +| Decision | Choice | Rationale | +| ----------------------------- | ------------------------------------------- | ------------------------------------------------------------------ | +| Live vs Historic separation | Two distinct primitives | Clear responsibilities, easier reasoning about state | +| Live sync method | `limit: 0` not `since: now` | No clock dependency, deterministic, mirrors filter structure | +| Layer 1 handling | On connect, separate from AddFilters | Connection-level concern, not item-level | +| Layer 2+3 handling | Via compute_actions → sync_computed_filters | Item-level, proper pending tracking | +| Clear PendingSyncIndex | Always first | Old subscriptions are dead, must clear before any operation | +| fresh_start vs long_reconnect | Same flow, different metrics | Reuse logic, distinguish intentional refresh from failure recovery | +| Consolidation | Live only, no historic | Items already synced, just reducing subscription count | +| compute_actions role | ONLY decision point for new work | Single place to reason about what needs syncing | +| NIP-77 negentropy | Try first on full sync, fallback | Efficient for large sets, graceful degradation | --- @@ -586,8 +695,8 @@ If negentropy fails (relay doesn't support NIP-77, network error, etc.): ``` src/sync/ -├── mod.rs # SyncManager, main loop, data structures (RepoSyncNeeds, RelayState, etc.) -├── algorithms.rs # derive_relay_targets(), compute_actions(), AddFilters +├── mod.rs # SyncManager, main loop, data structures +├── algorithms.rs # derive_relay_targets(), compute_actions() ├── filters.rs # build_announcement_filter(), build_layer2_and_layer3_filters() ├── health.rs # RelayHealthTracker with exponential backoff ├── relay_connection.rs # RelayConnection, RelayEvent handling @@ -599,7 +708,7 @@ src/sync/ ## Health Tracking -The [`RelayHealthTracker`](../../src/sync/health.rs:93) manages connection health with exponential backoff: +The `RelayHealthTracker` manages connection health with exponential backoff: - **States**: Healthy, Degraded, Dead - **Backoff**: `base * 2^(failures-1)`, capped at max_backoff @@ -610,6 +719,32 @@ Bootstrap relays are never disconnected by the cleanup system, even if empty. --- +## Self-Subscriber + +The `SelfSubscriber` monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`. + +### Event Kinds Monitored + +- **30617** - Repository Announcements (triggers discovery of repos listing our relay) +- **1617** - Patches (root events referencing repos) +- **1618** - Issues +- **1619** - Replies/Status +- **1621** - Pull Requests + +Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays. + +### Batching Flow + +1. **Receive events** from own relay subscription +2. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID +3. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events +4. **Process batch**: + - Update `RepoSyncIndex` with discovered repos and root events + - Call `compute_actions()` + - Send `AddFilters` actions to SyncManager → `sync_computed_filters()` + +--- + ## Disconnect Handling The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays: -- cgit v1.2.3