From b10a6cc91dab4c3d83d62fe8cb357c78f2cd4d1e Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 20:03:23 +0000 Subject: docs: sync updates to reflect changes --- docs/explanation/grasp-02-proactive-sync.md | 207 +++++++++++++++++----------- 1 file changed, 130 insertions(+), 77 deletions(-) diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index 64fa096..64193d3 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md @@ -4,9 +4,9 @@ This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles: -1. **Triggers call compute_actions → sync_computed_filters** - Self-subscriber batches and connect/reconnect events trigger this flow +1. **Two paths to AddFilters → handle_new_sync_filters** - Self-subscriber sends directly via channel; connect/reconnect uses `recompute_new_sync_filters_for_relay` 2. **Clear separation of live vs historic sync** - Two distinct primitives with different purposes -3. **Layer 1 on connect, Layer 2+3 via AddFilters** - L1 handled at connection time, L2+L3 flow through compute_actions +3. **Layer 1 on connect, Layer 2+3 via AddFilters** - L1 handled at connection time, L2+L3 flow through AddFilters 4. **Always clear PendingSyncIndex first** - Before any reconnect/consolidate operation 5. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported @@ -238,43 +238,90 @@ The sync system is built on two fundamental primitives that are clearly separate ### Layer Strategy -| Layer | Content | When Subscribed | Managed By | -| ------- | --------------------------------------- | --------------------- | -------------------- | -| Layer 1 | 30617 Announcements, 30618 Maintainers | On connect (any type) | Connection lifecycle | -| Layer 2 | Events tagging our repos (a/A/q tags) | Via AddFilters | compute_actions | -| Layer 3 | Events tagging root events (e/E/q tags) | Via AddFilters | compute_actions | +| Layer | Content | When Subscribed | Managed By | +| ------- | --------------------------------------- | --------------------- | ----------------------- | +| Layer 1 | 30617 Announcements, 30618 Maintainers | On connect (any type) | Connection lifecycle | +| Layer 2 | Events tagging our repos (a/A/q tags) | Via AddFilters | handle_new_sync_filters | +| Layer 3 | Events tagging root events (e/E/q tags) | Via AddFilters | handle_new_sync_filters | -**Key insight**: Layer 1 is connection-level (handled at connect time), Layer 2+3 are item-level (flow through compute_actions → sync_computed_filters). +**Key insight**: Layer 1 is connection-level (handled at connect time), Layer 2+3 are item-level (flow through AddFilters → handle_new_sync_filters via two paths). --- ## Triggers and Flow -### What Triggers compute_actions → sync_computed_filters? +### Two Paths to AddFilters -| Trigger | When | What Happens | -| --------------------------- | -------------------------------------- | ---------------------------------------- | -| Self-subscriber batch fires | New events discovered on own relay | Update RepoSyncIndex → compute_actions | -| fresh_start() | Initial connect, long_reconnect, daily | After L1 setup → compute_actions | -| quick_reconnect() | Reconnect < 15 minutes | After L1+L2+L3 catchup → compute_actions | -| consolidate() | Filter count > threshold | After live rebuild → compute_actions | +The system has **two independent paths** that create and process AddFilters actions: -### The Core Flow +| Source | When | Flow | +| -------------------------- | ----------------------------------- | -------------------------------------------------------------------------------- | +| Self-subscriber batch | New events discovered on own relay | Build AddFilters directly → send via channel → handle_new_sync_filters | +| Connect/reconnect triggers | fresh_start, quick_reconnect, daily | recompute_new_sync_filters_for_relay → compute_actions → handle_new_sync_filters | + +**Path 1: Self-Subscriber (direct AddFilters construction)** + +The [`SelfSubscriber::process_batch()`](src/sync/self_subscriber.rs:452) method: + +1. Updates `RepoSyncIndex` with discovered repos +2. Calls `derive_relay_targets()` to get per-relay targets +3. Builds `AddFilters` directly using `build_layer2_and_layer3_filters()` +4. Sends via `action_tx` channel to SyncManager +5. SyncManager receives via `action_rx` and calls `handle_new_sync_filters()` + +**Path 2: Connect/Reconnect (via compute_actions)** + +The [`SyncManager::recompute_new_sync_filters_for_relay()`](src/sync/mod.rs:1374) method: + +1. Calls `derive_relay_targets()` from `RepoSyncIndex` +2. Calls `compute_actions(targets, pending, confirmed)` - three-way diff +3. Calls `handle_new_sync_filters()` for each resulting AddFilters action + +### When Each Path is Used + +| Trigger | Path Used | Why | +| --------------------------- | --------------------------- | -------------------------------------------- | +| Self-subscriber batch fires | Direct (no compute_actions) | Building from scratch, no diff needed | +| fresh_start() | compute_actions | Diff against pending/confirmed state | +| quick_reconnect() | compute_actions | Check for NEW items discovered while offline | +| consolidate() | compute_actions | Check for new items during filter rebuild | + +### The Core Flow (Path 2: Connect/Reconnect) ```mermaid flowchart TB - TRIGGER[Trigger fires] --> CA[compute_actions] - CA --> |derives from| RSI[RepoSyncIndex] - CA --> |subtracts| RLI[RelaySyncIndex] + TRIGGER[Connect/Reconnect trigger] --> RECOMPUTE[recompute_new_sync_filters_for_relay] + RECOMPUTE --> DRT[derive_relay_targets] + DRT --> |derives from| RSI[RepoSyncIndex] + DRT --> CA[compute_actions] CA --> |subtracts| PSI[PendingSyncIndex] + CA --> |subtracts| RLI[RelaySyncIndex] CA --> |produces| AF[AddFilters actions] - AF --> SFRE[sync_computed_filters] - SFRE --> LIVE[sync_live - L2+L3] - SFRE --> HIST[historic_sync - L2+L3] + AF --> HNSF[handle_new_sync_filters] + HNSF --> LIVE[sync_live - L2+L3] + HNSF --> HIST[historic_sync - L2+L3] HIST --> PSI_UPDATE[Update PendingSyncIndex] PSI_UPDATE --> |EOSE received| CONFIRM[Move to RelaySyncIndex] ``` +### The Self-Subscriber Flow (Path 1: Direct) + +```mermaid +flowchart TB + EVENTS[Events from own relay] --> QUEUE[Queue to PendingUpdates] + QUEUE --> TIMER[Batch timer fires - 5 seconds] + TIMER --> PB[process_batch] + PB --> UPDATE[Update RepoSyncIndex] + UPDATE --> DRT[derive_relay_targets] + DRT --> BUILD[build_layer2_and_layer3_filters] + BUILD --> AF[Create AddFilters] + AF --> CHAN[Send via action_tx channel] + CHAN --> RX[SyncManager receives via action_rx] + RX --> HNSF[handle_new_sync_filters] + HNSF --> LIVE[sync_live - L2+L3] + HNSF --> HIST[historic_sync - L2+L3] +``` + --- ## Flow Scenarios @@ -302,12 +349,13 @@ flowchart TB L1_HIST --> NEG{NIP-77 supported?} NEG --> |yes| NEGENTROPY[negentropy sync] NEG --> |no| REQ[REQ+EOSE] - NEGENTROPY --> CA[compute_actions] - REQ --> CA + NEGENTROPY --> RECOMPUTE[recompute_new_sync_filters_for_relay] + REQ --> RECOMPUTE + RECOMPUTE --> CA[compute_actions] CA --> |empty RelaySyncIndex| AF[AddFilters for ALL repos] - AF --> SFRE[sync_computed_filters] - SFRE --> L23_LIVE[L2+L3: sync_live] - SFRE --> L23_HIST[L2+L3: historic_sync] + AF --> HNSF[handle_new_sync_filters] + HNSF --> L23_LIVE[L2+L3: sync_live] + HNSF --> L23_HIST[L2+L3: historic_sync] L23_HIST --> PB[Create PendingBatch] PB --> EOSE[Wait for EOSE] EOSE --> CONFIRM[Move items to RelaySyncIndex] @@ -318,7 +366,7 @@ flowchart TB - Always clear PendingSyncIndex first, then RelaySyncIndex - L1 live + L1 historic (uses negentropy if available) - Empty RelaySyncIndex means diff produces AddFilters for everything -- L2+L3 flow through sync_computed_filters with proper pending tracking +- L2+L3 flow through `recompute_new_sync_filters_for_relay` → `handle_new_sync_filters` with proper pending tracking ### Scenario 2: Quick Reconnect (< 15 minutes) @@ -348,11 +396,12 @@ flowchart TB L1_HIST --> RECON[reconstruct_filters from RelaySyncIndex] RECON --> L23_LIVE[L2+L3: sync_live] RECON --> L23_HIST[L2+L3: historic_sync WITH since] - L23_HIST --> CA[compute_actions] + L23_HIST --> RECOMPUTE[recompute_new_sync_filters_for_relay] + RECOMPUTE --> CA[compute_actions] CA --> |check for new items| AF{New items?} - AF --> |yes| SFRE[sync_computed_filters] + AF --> |yes| HNSF[handle_new_sync_filters] AF --> |no| DONE[Done] - SFRE --> PB[Create PendingBatch] + HNSF --> PB[Create PendingBatch] ``` **Key points:** @@ -361,7 +410,7 @@ flowchart TB - L1 live (always on any connection) - L1 historic WITH since (catches up missed announcements) - L2+L3 rebuilt from RelaySyncIndex (confirmed state preserved) -- compute_actions checks for any NEW items discovered during catchup +- `recompute_new_sync_filters_for_relay` → `compute_actions` checks for any NEW items discovered during catchup ### Scenario 3: Long Reconnect (> 15 minutes) @@ -388,9 +437,10 @@ flowchart TB UNSUB --> RECON[reconstruct_filters from RelaySyncIndex] RECON --> L1_LIVE[L1: sync_live] RECON --> L23_LIVE[L2+L3: sync_live] - L23_LIVE --> CA[compute_actions] + L23_LIVE --> RECOMPUTE[recompute_new_sync_filters_for_relay] + RECOMPUTE --> CA[compute_actions] CA --> |check for new items| AF{New items?} - AF --> |yes| SFRE[sync_computed_filters] + AF --> |yes| HNSF[handle_new_sync_filters] AF --> |no| DONE[Done] THRESHOLD --> |no| SKIP[Continue normally] ``` @@ -400,7 +450,7 @@ flowchart TB - Clear PendingSyncIndex first - NO historic sync needed - items already synced/syncing - Only rebuilds live subscriptions from confirmed state -- compute_actions catches any new items that need syncing +- `recompute_new_sync_filters_for_relay` → `compute_actions` catches any new items that need syncing ### Scenario 5: Daily Sync (23-25h Random Timer) @@ -419,22 +469,27 @@ flowchart TB ```mermaid flowchart TB - EVENTS[Events from own relay] --> QUEUE[Queue to pending batch] + EVENTS[Events from own relay] --> QUEUE[Queue to PendingUpdates] QUEUE --> TIMER[Batch timer fires - 5 seconds] - TIMER --> UPDATE[Update RepoSyncIndex] - UPDATE --> CA[compute_actions] - CA --> |new repos/events discovered| AF[AddFilters] - AF --> SFRE[sync_computed_filters] - SFRE --> LIVE[sync_live - L2+L3] - SFRE --> HIST[historic_sync - L2+L3] + TIMER --> PB[process_batch] + PB --> UPDATE[Update RepoSyncIndex] + UPDATE --> DRT[derive_relay_targets] + DRT --> BUILD[build_layer2_and_layer3_filters] + BUILD --> AF[Create AddFilters directly] + AF --> CHAN[Send via action_tx channel] + CHAN --> RX[SyncManager receives] + RX --> HNSF[handle_new_sync_filters] + HNSF --> LIVE[sync_live - L2+L3] + HNSF --> HIST[historic_sync - L2+L3] ``` **Key points:** -- Self-subscriber monitors own relay for 30617, 1617, 1618, 1619, 1621 -- Batches events (5 second window) -- Updates RepoSyncIndex, then compute_actions finds new work -- New items flow through sync_computed_filters +- Self-subscriber monitors own relay for 30617, 1617, 1618, 1621 (NOT 1619 or 30618) +- Batches events in `PendingUpdates` (5 second window via interval timer) +- `process_batch()` updates RepoSyncIndex, then builds AddFilters **directly** (no compute_actions) +- AddFilters sent via channel to SyncManager, which calls `handle_new_sync_filters()` +- This path does NOT use compute_actions because it's building fresh filters from the updated index --- @@ -508,21 +563,19 @@ Dispatches to: ### Building Blocks -#### `sync_computed_filters()` - Handle New AddFilters +#### `handle_new_sync_filters()` - Handle New AddFilters ```rust -/// Process AddFilters action (from compute_actions) +/// Handle AddFilters action (from self-subscriber channel OR compute_actions) /// /// Orchestrates both live and historic sync for NEW items: -/// 1. sync_live() - set up permanent L2+L3 subscriptions -/// 2. historic_sync() - catch up on past events +/// 1. Check/spawn connection if needed (for unknown relays) +/// 2. maybe_consolidate() - check filter threshold +/// 3. sync_live() - set up permanent L2+L3 subscriptions +/// 4. historic_sync() - catch up on past events /// -/// This is specifically for NEW filter discovery. -async fn sync_computed_filters( - &mut self, - action: AddFilters, - since: Option, -) -> Option +/// This is the SINGLE entry point for processing AddFilters from BOTH paths. +async fn handle_new_sync_filters(&mut self, action: AddFilters) ``` ### Top-Level Entry Points @@ -538,7 +591,7 @@ async fn sync_computed_filters( /// 1. Clear PendingSyncIndex /// 2. Clear RelaySyncIndex /// 3. L1 live + L1 historic (negentropy if available) -/// 4. compute_actions → AddFilters → sync_computed_filters for L2+L3 +/// 4. recompute_new_sync_filters_for_relay → compute_actions → handle_new_sync_filters for L2+L3 async fn fresh_start(&mut self, relay_url: &str) ``` @@ -598,7 +651,7 @@ async fn consolidate(&mut self, relay_url: &str) /// Flow: /// 1. Check/spawn connection if needed /// 2. maybe_consolidate (check filter threshold) -/// 3. sync_computed_filters +/// 3. recompute_new_sync_filters_for_relay async fn handle_new_sync_filters(&mut self, action: AddFilters) ``` @@ -612,7 +665,7 @@ fresh_start(relay_url) // Initial/long_reconnect/daily ├──> Clear RelaySyncIndex ├──> L1: sync_live(announcement_filter) ├──> L1: historic_sync(announcement_filter, None) - └──> compute_actions → AddFilters → sync_computed_filters (L2+L3) + └──> compute_actions → AddFilters → recompute_new_sync_filters_for_relay (L2+L3) quick_reconnect(relay_url, since) // Disconnected < 15 min ├──> Clear PendingSyncIndex @@ -621,7 +674,7 @@ quick_reconnect(relay_url, since) // Disconnected < 15 min ├──> reconstruct_filters() → L2+L3 filters ├──> L2+L3: sync_live(filters) ├──> L2+L3: historic_sync(filters, since) - └──> compute_actions → AddFilters → sync_computed_filters (new items only) + └──> compute_actions → AddFilters → recompute_new_sync_filters_for_relay (new items only) long_reconnect(relay_url) // Disconnected > 15 min ├──> Record disconnect/reconnect metric @@ -635,14 +688,14 @@ consolidate(relay_url) // Filter count > threshold ├──> unsubscribe_all() ├──> reconstruct_filters() → L1+L2+L3 filters ├──> sync_live(filters) // Live only, NO historic - └──> compute_actions → AddFilters → sync_computed_filters (new items only) + └──> compute_actions → AddFilters → recompute_new_sync_filters_for_relay (new items only) handle_new_sync_filters(action) // New filter discovery ├──> Check/spawn connection ├──> maybe_consolidate() - └──> sync_computed_filters(action, None) + └──> recompute_new_sync_filters_for_relay(action, None) -sync_computed_filters(action, since) // Process AddFilters +recompute_new_sync_filters_for_relay(action, since) // Process AddFilters ├──> sync_live(action.filters) // L2+L3 live └──> historic_sync(action.filters, since) // L2+L3 historic ├── historic_sync_negentropy() // Parallel, updates Pending @@ -676,7 +729,7 @@ sync_computed_filters(action, since) // Process AddFilters The `build_layer2_and_layer3_filters()` function combines both layers. Used by: -- `sync_computed_filters` for new item subscriptions +- `recompute_new_sync_filters_for_relay` for new item subscriptions - `reconstruct_filters` for rebuilding from confirmed state --- @@ -802,7 +855,7 @@ flowchart TB PSI --> CA RLI --> CA CA -->|new items| AF[AddFilters] - AF --> SFRE[sync_computed_filters] + AF --> SFRE[recompute_new_sync_filters_for_relay] SFRE --> LIVE[sync_live L2+L3] SFRE --> HIST[historic_sync L2+L3] HIST --> PSI @@ -813,17 +866,17 @@ flowchart TB ## Key Design Decisions -| Decision | Choice | Rationale | -| ----------------------------- | ------------------------------------------- | ------------------------------------------------------------------ | -| Live vs Historic separation | Two distinct primitives | Clear responsibilities, easier reasoning about state | -| Live sync method | `limit: 0` not `since: now` | No clock dependency, deterministic, mirrors filter structure | -| Layer 1 handling | On connect, separate from AddFilters | Connection-level concern, not item-level | -| Layer 2+3 handling | Via compute_actions → sync_computed_filters | Item-level, proper pending tracking | -| Clear PendingSyncIndex | Always first | Old subscriptions are dead, must clear before any operation | -| fresh_start vs long_reconnect | Same flow, different metrics | Reuse logic, distinguish intentional refresh from failure recovery | -| Consolidation | Live only, no historic | Items already synced, just reducing subscription count | -| compute_actions role | ONLY decision point for new work | Single place to reason about what needs syncing | -| NIP-77 negentropy | Try first on full sync, fallback | Efficient for large sets, graceful degradation | +| Decision | Choice | Rationale | +| ----------------------------- | ---------------------------------------------------------- | ------------------------------------------------------------------ | +| Live vs Historic separation | Two distinct primitives | Clear responsibilities, easier reasoning about state | +| Live sync method | `limit: 0` not `since: now` | No clock dependency, deterministic, mirrors filter structure | +| Layer 1 handling | On connect, separate from AddFilters | Connection-level concern, not item-level | +| Layer 2+3 handling | Via compute_actions → recompute_new_sync_filters_for_relay | Item-level, proper pending tracking | +| Clear PendingSyncIndex | Always first | Old subscriptions are dead, must clear before any operation | +| fresh_start vs long_reconnect | Same flow, different metrics | Reuse logic, distinguish intentional refresh from failure recovery | +| Consolidation | Live only, no historic | Items already synced, just reducing subscription count | +| compute_actions role | ONLY decision point for new work | Single place to reason about what needs syncing | +| NIP-77 negentropy | Try first on full sync, fallback | Efficient for large sets, graceful degradation | --- @@ -877,7 +930,7 @@ Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote 4. **Process batch**: - Update `RepoSyncIndex` with discovered repos and root events - Call `compute_actions()` - - Send `AddFilters` actions to SyncManager → `sync_computed_filters()` + - Send `AddFilters` actions to SyncManager → `recompute_new_sync_filters_for_relay()` --- -- cgit v1.2.3