From 777509bb03f5233b3ef470329d23cec371baf53c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 22 Dec 2025 15:38:54 +0000 Subject: docs: proactive sync hand written overview rewrite and AI update of rest --- docs/explanation/grasp-02-proactive-sync.md | 402 +++++++++++----------------- 1 file changed, 160 insertions(+), 242 deletions(-) (limited to 'docs/explanation/grasp-02-proactive-sync.md') diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index 64193d3..f13050e 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md @@ -2,17 +2,44 @@ ## Overview -This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles: +Proactively Sync Nostr Events from other relays listed in accepted repository announcements. + +Features: + +- Fetches all repository announcements from connected relays to discover new repos listing our service +- Discovers and dynamically connects to new relays listed by repository announcements we have accepted (with optional bootstrap relay to get started) +- Fetches events tagging repositories we are interested in, as well as events tagging Issues, Patches and PRs of these repositories +- Supports live sync and historic sync (tries NIP-77 negentropy but falls back to REQ+EOSE with 'until' based pagination) +- Plays nicely with other relays - connection backoff and rate-limiting detection with cooldown +- Does a full reconciliation daily +- Prometheus metrics + +Key Architectural Points: + +- **Simple data model** for tracking target, pending and actual filter state against relays +- **Self-subscription** enables a deduplicated feed of all accepted events which leads to an updated target sync state +- **Clear separation** between Live sync (using `limit:0`) and Historic Sync (handled via negentropy falling back to REQ+EOSE with 'until' based pagination support) +- **Discovery management**: The nature of discovery inherently leads to a drip feed of root_events (e.g., Repo Announcements, Issues, Patches and PRs) that require additional subscriptions. Without careful management this can lead to large numbers of subscriptions and potentially rate limiting. Mitigation strategies: + - Self-subscriber waits for 5s to batch updates before creating new filters / subscriptions, allowing time for most events to be received from outstanding subscriptions from connected relays + - PendingBatch tracks each new set of filters that may require pagination until they are complete + - Avoid long awaits - recompute desired filters when connection is established to ensure filters are as consolidated as possible + - Consolidation function ensures number of live_sync subscriptions don't reach rate-limiting limits (threshold: 70 filters) +- **Quick Reconnect** (< 15mins) - doesn't do a full reconciliation vs fresh start (longer disconnect or relaunch binary) +- **Background timers** handle relay connection health and metrics, handling reconnects after backoff and recovery after rate-limiting + +Sections: + +- Data Model +- Connection Lifecycle +- Live vs Historic Sync +- Triggers and Flow +- Background Tasks -1. **Two paths to AddFilters → handle_new_sync_filters** - Self-subscriber sends directly via channel; connect/reconnect uses `recompute_new_sync_filters_for_relay` -2. **Clear separation of live vs historic sync** - Two distinct primitives with different purposes -3. **Layer 1 on connect, Layer 2+3 via AddFilters** - L1 handled at connection time, L2+L3 flow through AddFilters -4. **Always clear PendingSyncIndex first** - Before any reconnect/consolidate operation -5. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported +## Data Model ---- +The state of which relays we want to connect to, the progress of historic sync, and the active live filters is captures in this simple data model. -## Data Model +This state starts afresh when the binary loads. ### RepoSyncIndex (Source of Truth) @@ -26,7 +53,7 @@ pub type RepoSyncIndex = Arc>>; pub struct RepoSyncNeeds { /// Relay URLs listed in this repo's 30617 announcement pub relays: HashSet, - /// Root event IDs - 1617/1618/1619/1621 - that reference this repo + /// Root event IDs - 1617/1618/1621 - that reference this repo pub root_events: HashSet, } ``` @@ -180,13 +207,13 @@ stateDiagram-v2 ### Connection Flow Methods -| Method | Purpose | When Called | Actions | -| ------------------------------- | ------------------------- | ------------------------------- | --------------------------------------------------------------- | -| `register_relay()` | Initialize relay tracking | Discovery via RepoSyncIndex | Creates RelayConnection, stores in HashMap, returns immediately | -| `try_connect_relay()` | Attempt connection | Periodic retry (500ms) | Calls connect_and_subscribe, sends notification on success | -| `handle_connect_or_reconnect()` | Setup after connection | ConnectNotification received | Spawns event loop, updates state, decides sync strategy | -| `handle_disconnect()` | Cleanup after disconnect | DisconnectNotification received | Updates state, clears pending, KEEPS RelayConnection | -| `retry_disconnected_relays()` | Periodic reconnection | Every 500ms | For each ready relay: try_connect_relay() | +| Method | Purpose | When Called | Actions | +| ------------------------------- | ------------------------- | --------------------------------- | --------------------------------------------------------------- | +| `register_relay()` | Initialize relay tracking | Discovery via RepoSyncIndex | Creates RelayConnection, stores in HashMap, returns immediately | +| `try_connect_relay()` | Attempt connection | Health tracker allows retry | Calls connection.connect(), sends notification on success | +| `handle_connect_or_reconnect()` | Setup after connection | ConnectNotification received | Spawns event loop, updates state, decides sync strategy | +| `handle_disconnect()` | Cleanup after disconnect | DisconnectNotification received | Updates state, clears pending, KEEPS RelayConnection | +| `retry_disconnected_relays()` | Periodic reconnection | Every 2s (health & metrics timer) | For each ready relay: try_connect_relay() | ### Event Loop Lifecycle @@ -212,6 +239,53 @@ flowchart LR --- +## Background Tasks + +The sync system uses three background tasks that run continuously: + +### 1. Daily Timer (`run_daily_timer`) + +**Purpose**: Periodic full reconciliation to detect state drift + +**Interval**: Random 23-25 hours (prevents thundering herd) + +**Actions**: + +- Triggers `daily_sync()` for all connected relays +- Same as `fresh_start()` but without recording disconnect metrics +- Ensures consistency over time + +### 2. Health and Metrics Checker (`run_health_and_metrics_checker`) + +**Purpose**: Combined health management and metrics updates + +**Interval**: 2 seconds + +**Actions**: + +1. **Disconnect checking**: Calls `check_disconnects()` to remove relays with no repos/events (except bootstrap) +2. **Retry disconnected**: Calls `retry_disconnected_relays()` to attempt reconnection per health tracker backoff +3. **Rate limit recovery**: Calls `check_rate_limit_recovery()` to clear expired rate limits +4. **Metrics update**: Updates Prometheus metrics with current health states + +**Why combined**: The 2-second interval provides good responsiveness for health changes while minimizing overhead. All operations are lightweight (index checks, no I/O except actual connection attempts). + +### 3. Self-Subscriber (`SelfSubscriber::run`) + +**Purpose**: Monitor own relay for repository announcements and root events + +**Subscribed kinds**: 30617, 1617, 1618, 1621 (NOT 30618) + +**Batching**: 5-second window (configurable via `NGIT_SYNC_BATCH_WINDOW_MS`) + +**Flow**: + +1. Queue events to `PendingUpdates` +2. Timer fires (interval, does not reset on events) +3. Process batch: update RepoSyncIndex → derive targets → send AddFilters to SyncManager + +--- + ## Core Architecture: Live vs Historic Sync The sync system is built on two fundamental primitives that are clearly separated: @@ -223,19 +297,6 @@ The sync system is built on two fundamental primitives that are clearly separate | `sync_live()` | Ongoing event stream | `limit: 0` | Not tracked | | `historic_sync()` | Catch up on past events | Optional `since` | PendingSyncIndex | -### Why `limit: 0` for Live Sync? - -| Approach | Pros | Cons | -| ------------ | --------------------------------------- | --------------------------------- | -| `since: now` | Intuitive | Time-sensitive, clock skew issues | -| `limit: 0` | Deterministic, mirrors filter structure | Less intuitive name | - -`limit: 0` is better because: - -1. **No time dependency**: Doesn't depend on synchronized clocks -2. **Mirrors historic filters**: Same tag structure, just different limit -3. **State reconstruction**: Can rebuild from repo/event lists without timestamps - ### Layer Strategy | Layer | Content | When Subscribed | Managed By | @@ -261,7 +322,7 @@ The system has **two independent paths** that create and process AddFilters acti **Path 1: Self-Subscriber (direct AddFilters construction)** -The [`SelfSubscriber::process_batch()`](src/sync/self_subscriber.rs:452) method: +The [`SelfSubscriber::process_batch()`](src/sync/self_subscriber.rs:448) method: 1. Updates `RepoSyncIndex` with discovered repos 2. Calls `derive_relay_targets()` to get per-relay targets @@ -271,7 +332,7 @@ The [`SelfSubscriber::process_batch()`](src/sync/self_subscriber.rs:452) method: **Path 2: Connect/Reconnect (via compute_actions)** -The [`SyncManager::recompute_new_sync_filters_for_relay()`](src/sync/mod.rs:1374) method: +The `SyncManager::recompute_new_sync_filters_for_relay()` method: 1. Calls `derive_relay_targets()` from `RepoSyncIndex` 2. Calls `compute_actions(targets, pending, confirmed)` - three-way diff @@ -527,183 +588,37 @@ fn compute_actions( --- -## Method Specifications - -### Primitives - -#### `sync_live()` - Live Subscriptions - -```rust -/// Set up live subscription (filters with limit: 0) -/// -/// - Uses `limit: 0` to receive only new events -/// - NOT tracked in PendingSyncIndex (state reconstructable) -async fn sync_live(&self, relay_url: &str, filters: &[Filter]) -``` - -#### `historic_sync()` - Historical Sync Dispatcher - -```rust -/// Dispatch to appropriate historic sync method based on relay capabilities -/// -/// Both paths update PendingSyncIndex to ensure consistent lifecycle tracking. -async fn historic_sync( - &mut self, - relay_url: &str, - filters: Vec, - items: PendingItems, - since: Option, -) -> Option // Returns batch_id -``` - -Dispatches to: - -- `historic_sync_negentropy()` - NIP-77 parallel sync (if supported) - no pagination needed -- `historic_sync_legacy()` - REQ+EOSE fallback with automatic pagination for large result sets +## Key Implementation Methods -### Building Blocks +### Connection Lifecycle -#### `handle_new_sync_filters()` - Handle New AddFilters +- **`register_relay()`**: Creates RelayConnection object, stores in HashMap, returns immediately +- **`try_connect_relay()`**: Attempts connection using `connection.connect()` with timeout +- **`handle_connect_or_reconnect()`**: Spawns event loop, updates state, decides sync strategy (fresh_start/quick_reconnect) +- **`handle_disconnect()`**: Updates state to Disconnected, clears pending batches, keeps RelayConnection object +- **`retry_disconnected_relays()`**: Called every 2s, retries relays that pass health tracker checks -```rust -/// Handle AddFilters action (from self-subscriber channel OR compute_actions) -/// -/// Orchestrates both live and historic sync for NEW items: -/// 1. Check/spawn connection if needed (for unknown relays) -/// 2. maybe_consolidate() - check filter threshold -/// 3. sync_live() - set up permanent L2+L3 subscriptions -/// 4. historic_sync() - catch up on past events -/// -/// This is the SINGLE entry point for processing AddFilters from BOTH paths. -async fn handle_new_sync_filters(&mut self, action: AddFilters) -``` +### Sync Entry Points -### Top-Level Entry Points +- **`fresh_start()`**: Full sync - clears all state, L1 historic (with negentropy if available), then L2+L3 via recompute +- **`quick_reconnect()`**: Incremental sync - preserves confirmed state, L1 historic with `since`, L2+L3 rebuild with `since`, then recompute for new items +- **`daily_sync()`**: Wrapper around `fresh_start()` without disconnect metrics +- **`consolidate()`**: Reduces filter count - clears pending, unsubscribes all, rebuilds live subscriptions only, then recompute for new items -#### `fresh_start()` - Clean Slate Sync - -```rust -/// Fresh start - clears state and does full sync -/// -/// Called by: initial connect, long_reconnect, daily_sync -/// -/// Flow: -/// 1. Clear PendingSyncIndex -/// 2. Clear RelaySyncIndex -/// 3. L1 live + L1 historic (negentropy if available) -/// 4. recompute_new_sync_filters_for_relay → compute_actions → handle_new_sync_filters for L2+L3 -async fn fresh_start(&mut self, relay_url: &str) -``` - -#### `quick_reconnect()` - Short Disconnection Recovery - -```rust -/// Quick reconnect - for disconnections < 15 minutes -/// -/// Flow: -/// 1. Clear PendingSyncIndex -/// 2. L1 live + L1 historic(since) -/// 3. reconstruct_filters → L2+L3 live + L2+L3 historic(since) -/// 4. compute_actions for any new items -async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) -``` - -#### `long_reconnect()` - Extended Disconnection Recovery - -```rust -/// Long reconnect - for disconnections > 15 minutes -/// -/// Flow: -/// 1. Record disconnect/reconnect metric -/// 2. fresh_start() -async fn long_reconnect(&mut self, relay_url: &str) -``` - -#### `daily_sync()` - Scheduled Full Refresh - -```rust -/// Daily sync - full refresh without disconnect metrics -/// -/// Flow: fresh_start() (no disconnect metric recorded) -async fn daily_sync(&mut self, relay_url: &str) -``` +### Sync Primitives -#### `consolidate()` - Filter Count Reduction +- **`sync_live()`**: Creates subscriptions with `limit: 0` for ongoing event stream (not tracked in PendingSyncIndex) +- **`historic_sync()`**: Dispatches to negentropy or REQ+EOSE based on relay capability, creates PendingBatch, returns batch_id -```rust -/// Consolidate subscriptions when filter count exceeds threshold -/// -/// Flow: -/// 1. Clear PendingSyncIndex -/// 2. unsubscribe_all -/// 3. reconstruct_filters → sync_live only (L1+L2+L3) -/// 4. compute_actions for any new items -/// -/// NO historic sync - items already synced, just reducing subscriptions -async fn consolidate(&mut self, relay_url: &str) -``` +### Filter Processing -#### `handle_new_sync_filters()` - New Filter Discovery - -```rust -/// Handle AddFilters action from compute_actions -/// -/// Flow: -/// 1. Check/spawn connection if needed -/// 2. maybe_consolidate (check filter threshold) -/// 3. recompute_new_sync_filters_for_relay -async fn handle_new_sync_filters(&mut self, action: AddFilters) -``` +- **`handle_new_sync_filters()`**: Single entry point for AddFilters from both paths (self-subscriber OR recompute), orchestrates live+historic sync +- **`recompute_new_sync_filters_for_relay()`**: Calls derive_relay_targets → compute_actions → handle_new_sync_filters for each resulting action --- ## Method Relationships Summary -``` -fresh_start(relay_url) // Initial/long_reconnect/daily - ├──> Clear PendingSyncIndex - ├──> Clear RelaySyncIndex - ├──> L1: sync_live(announcement_filter) - ├──> L1: historic_sync(announcement_filter, None) - └──> compute_actions → AddFilters → recompute_new_sync_filters_for_relay (L2+L3) - -quick_reconnect(relay_url, since) // Disconnected < 15 min - ├──> Clear PendingSyncIndex - ├──> L1: sync_live(announcement_filter) - ├──> L1: historic_sync(announcement_filter, since) - ├──> reconstruct_filters() → L2+L3 filters - ├──> L2+L3: sync_live(filters) - ├──> L2+L3: historic_sync(filters, since) - └──> compute_actions → AddFilters → recompute_new_sync_filters_for_relay (new items only) - -long_reconnect(relay_url) // Disconnected > 15 min - ├──> Record disconnect/reconnect metric - └──> fresh_start() - -daily_sync(relay_url) // Timer fires - └──> fresh_start() // No disconnect metric - -consolidate(relay_url) // Filter count > threshold - ├──> Clear PendingSyncIndex - ├──> unsubscribe_all() - ├──> reconstruct_filters() → L1+L2+L3 filters - ├──> sync_live(filters) // Live only, NO historic - └──> compute_actions → AddFilters → recompute_new_sync_filters_for_relay (new items only) - -handle_new_sync_filters(action) // New filter discovery - ├──> Check/spawn connection - ├──> maybe_consolidate() - └──> recompute_new_sync_filters_for_relay(action, None) - -recompute_new_sync_filters_for_relay(action, since) // Process AddFilters - ├──> sync_live(action.filters) // L2+L3 live - └──> historic_sync(action.filters, since) // L2+L3 historic - ├── historic_sync_negentropy() // Parallel, updates Pending - └── historic_sync_legacy() // REQ+EOSE, updates Pending -``` - ---- - ## Filter Building (Three-Layer Strategy) ### Layer 1: Announcements @@ -746,7 +661,6 @@ Negentropy sync is attempted for: - **fresh_start()** - Full sync without `since` - **daily_sync()** - Periodic full refresh (via fresh_start) -- **long_reconnect()** - Via fresh_start Negentropy is NOT used for: @@ -864,22 +778,6 @@ flowchart TB --- -## Key Design Decisions - -| Decision | Choice | Rationale | -| ----------------------------- | ---------------------------------------------------------- | ------------------------------------------------------------------ | -| Live vs Historic separation | Two distinct primitives | Clear responsibilities, easier reasoning about state | -| Live sync method | `limit: 0` not `since: now` | No clock dependency, deterministic, mirrors filter structure | -| Layer 1 handling | On connect, separate from AddFilters | Connection-level concern, not item-level | -| Layer 2+3 handling | Via compute_actions → recompute_new_sync_filters_for_relay | Item-level, proper pending tracking | -| Clear PendingSyncIndex | Always first | Old subscriptions are dead, must clear before any operation | -| fresh_start vs long_reconnect | Same flow, different metrics | Reuse logic, distinguish intentional refresh from failure recovery | -| Consolidation | Live only, no historic | Items already synced, just reducing subscription count | -| compute_actions role | ONLY decision point for new work | Single place to reason about what needs syncing | -| NIP-77 negentropy | Try first on full sync, fallback | Efficient for large sets, graceful degradation | - ---- - ## Module Structure ``` @@ -897,50 +795,70 @@ src/sync/ ## Health Tracking -The `RelayHealthTracker` manages connection health with exponential backoff: +The [`RelayHealthTracker`](src/sync/health.rs:209) manages connection health with exponential backoff and state transitions: -- **States**: Healthy, Degraded, Dead -- **Backoff**: `base * 2^(failures-1)`, capped at max_backoff +### Health States + +1. **Healthy**: Working connection, no recent failures, proven stable (past 5-minute stability period) +2. **Disconnected**: Not currently connected, but no recent failures or issues +3. **Degraded**: Connection problems (actively failing to connect) OR recently recovered but not yet stable +4. **Dead**: 24+ hours of continuous failures, minimal retry (once per 24 hours) +5. **RateLimited**: Rate limited by relay, 65-second cooldown active + +### State Transitions + +``` +Healthy <-> Disconnected: Normal connection/disconnection +Disconnected -> Degraded: Connection failure +Degraded -> Dead: 24h+ of continuous failures +Degraded -> Disconnected: Recovery (enters 5min stability period) +Disconnected -> Healthy: Stable for 5 minutes after recovery +Any -> RateLimited: NOTICE message from relay indicating rate limiting +RateLimited -> previous state: After 65-second cooldown expires +``` + +### Backoff Configuration + +- **Formula**: `base_backoff * 2^(failures-1)`, capped at `max_backoff` +- **Default base**: 5 seconds (configurable via `sync_base_backoff_secs`) +- **Default max**: 1 hour (configurable via `sync_max_backoff_secs`) - **Dead threshold**: 24 hours of continuous failures -- **Dead relay retry**: Once per 24 hours +- **Dead retry interval**: Once per 24 hours +- **Rate limit cooldown**: Fixed 65 seconds (60s typical limit + 5s buffer) +- **Stability period**: 5 minutes after recovery before marking as Healthy + +### Special Behaviors -Bootstrap relays are never disconnected by the cleanup system, even if empty. +- **Bootstrap relays**: Never disconnected by cleanup system, even if empty +- **Rate limiting**: Distinct from connection failures - triggered by relay NOTICE messages +- **Connection timeout**: Set to `base_backoff_secs` to ensure retry timing works correctly --- -## Self-Subscriber +## Prometheus Metrics -The `SelfSubscriber` monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`. +The [`SyncMetrics`](src/sync/metrics.rs:18) module provides comprehensive monitoring via Prometheus: -### Event Kinds Monitored +### Connection Metrics -- **30617** - Repository Announcements (triggers discovery of repos listing our relay) -- **1617** - Patches (root events referencing repos) -- **1618** - Issues -- **1619** - Replies/Status -- **1621** - Pull Requests +- `ngit_sync_relay_connected`: Per-relay connection status (1=connected, 0=disconnected) +- `ngit_sync_connection_attempts_total`: Total connection attempts by relay and result (success/failure) -Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays. +### Health Metrics -### Batching Flow +- `ngit_sync_relay_status`: Per-relay health status (1=healthy, 2=disconnected, 3=degraded, 4=dead, 5=rate_limited) +- `ngit_sync_relay_failures`: Consecutive failure count per relay -1. **Receive events** from own relay subscription -2. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID -3. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events -4. **Process batch**: - - Update `RepoSyncIndex` with discovered repos and root events - - Call `compute_actions()` - - Send `AddFilters` actions to SyncManager → `recompute_new_sync_filters_for_relay()` +### Event Metrics ---- +- `ngit_sync_events_synced_total`: Total events synced (newly saved events only, not duplicates or rejected) -## Disconnect Handling +### Summary Metrics -The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays: +- `ngit_sync_relays_tracked_total`: Total number of relays discovered and tracked +- `ngit_sync_relays_connected_total`: Number of currently connected relays +- `ngit_sync_relays_dead_total`: Number of relays marked as dead -- Finds relays with `repos.is_empty() && root_events.is_empty()` -- Skips bootstrap relays (`is_bootstrap == true`) -- Removes from relay_sync_index, pending_sync_index, and connections -- Disconnects the WebSocket connection +All metrics follow the `ngit_sync_` prefix convention and are updated by the health and metrics checker every 2 seconds. -Also triggers reconnection attempts for disconnected relays that have pending work. +--- -- cgit v1.2.3