From 4e5a937a4ef5288e702ba2bae3daf2a78398b690 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 14:18:05 +0000 Subject: fix docs --- docs/explanation/grasp-02-proactive-sync-v4.md | 314 +++++++++++++------------ 1 file changed, 160 insertions(+), 154 deletions(-) (limited to 'docs') diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md index 5ac92cd..dd508b3 100644 --- a/docs/explanation/grasp-02-proactive-sync-v4.md +++ b/docs/explanation/grasp-02-proactive-sync-v4.md @@ -64,8 +64,6 @@ pub struct RelayState { pub last_connected: Option, /// When we disconnected - for 15-minute state retention rule pub disconnected_at: Option, - /// The active connection - None if disconnected - pub connection: Option, } impl RelayState { @@ -79,7 +77,7 @@ impl RelayState { None => false, // Still connected or never connected } } - + /// Clear repos and root_events - called when reconnect takes > 15 minutes pub fn clear_sync_state(&mut self) { self.repos.clear(); @@ -125,7 +123,7 @@ stateDiagram-v2 Connecting --> Disconnected: failure + record in health tracker Connected --> Disconnected: connection lost Connected --> [*]: intentional disconnect via check_disconnects - + note right of Disconnected: disconnected_at set for 15min rule note right of Connected: last_connected tracked for since filter ``` @@ -148,13 +146,13 @@ flowchart TB SPAWN --> |no| CONN[spawn_connection] CONN --> HC[handle_connect_or_reconnect] SPAWN --> |yes| SUB - + subgraph handle_connect_or_reconnect - Fresh Sync HC --> CHECK_FRESH{is_fresh_sync?} CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] L1 --> RCA[recompute_actions_for_relay] end - + RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] SUB --> PB[Create PendingBatch] PB --> EOSE[Wait for EOSE] @@ -162,6 +160,7 @@ flowchart TB ``` **Key points:** + - No `since` filter on initial connect - get full history - `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` - Layer 1: `build_announcement_filter(None)` - subscribed immediately without since @@ -176,7 +175,7 @@ flowchart TB CLEAR_PEND --> WAIT[Wait for reconnection] WAIT --> RECONN[Connection restored] RECONN --> HC[handle_connect_or_reconnect] - + subgraph handle_connect_or_reconnect - Quick Reconnect HC --> CHECK{is_fresh_sync?} CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] @@ -184,7 +183,7 @@ flowchart TB L1 --> L23[rebuild_layer2_and_layer3 - with since] L23 --> RCA[recompute_actions_for_relay] end - + RCA --> AF[AddFilters for new items only] AF --> SUB[Subscribe] SUB --> PB[Create PendingBatch] @@ -193,6 +192,7 @@ flowchart TB ``` **Key points:** + - PendingSyncIndex cleared on disconnect (not reconnect) - `handle_connect_or_reconnect`: 1. `build_announcement_filter(Some(since))` - Layer 1 with since @@ -205,14 +205,14 @@ flowchart TB ```mermaid flowchart TB RECONN[Connection restored] --> HC[handle_connect_or_reconnect] - + subgraph handle_connect_or_reconnect - Stale Reconnect HC --> CHECK{is_fresh_sync?} CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] CLEAR --> L1[build_announcement_filter - no since] L1 --> RCA[recompute_actions_for_relay] end - + RCA --> CA[compute_actions with empty confirmed] CA --> AF[AddFilters for everything] AF --> SUB[Subscribe - no since filter] @@ -222,6 +222,7 @@ flowchart TB ``` **Key points:** + - `should_clear_state()` returns true → triggers fresh sync - Same path as initial connect after clearing state - Layer 1: `build_announcement_filter(None)` - full history @@ -243,6 +244,7 @@ flowchart TB ``` **Key points:** + - Consolidation checked in `handle_add_filters` BEFORE adding new filters - After closing all subscriptions, re-subscribe: 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since @@ -268,6 +270,7 @@ flowchart TB ``` **Key points:** + - Daily timer is a full fresh sync, NOT consolidation - Clears both PendingSyncIndex and confirmed state - Layer 1: `build_announcement_filter(None)` - full history @@ -288,7 +291,7 @@ fn derive_relay_targets( repo_index: &HashMap ) -> HashMap { let mut targets: HashMap = HashMap::new(); - + for (repo_ref, needs) in repo_index { for relay_url in &needs.relays { let target = targets.entry(relay_url.clone()).or_default(); @@ -296,7 +299,7 @@ fn derive_relay_targets( target.root_events.extend(needs.root_events.iter().cloned()); } } - + targets } ``` @@ -316,7 +319,7 @@ fn compute_actions( confirmed: &HashMap, ) -> Vec { let mut actions = Vec::new(); - + for (relay_url, target) in targets { // Skip disconnected relays - they will get AddFilters on reconnect if let Some(state) = confirmed.get(relay_url) { @@ -324,7 +327,7 @@ fn compute_actions( continue; } } - + // Collect all pending items for this relay let pending_repos: HashSet<_> = pending.get(relay_url) .map(|batches| batches.iter() @@ -336,7 +339,7 @@ fn compute_actions( .flat_map(|b| b.items.root_events.iter().cloned()) .collect()) .unwrap_or_default(); - + // Collect confirmed items for this relay let confirmed_repos = confirmed.get(relay_url) .map(|c| &c.repos) @@ -344,7 +347,7 @@ fn compute_actions( let confirmed_events = confirmed.get(relay_url) .map(|c| &c.root_events) .unwrap_or(&HashSet::new()); - + // New = target - pending - confirmed let new_repos: HashSet<_> = target.repos.iter() .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) @@ -354,7 +357,7 @@ fn compute_actions( .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) .cloned() .collect(); - + if !new_repos.is_empty() || !new_events.is_empty() { let filters = build_filters(&new_repos, &new_events); actions.push(AddFilters { @@ -365,7 +368,7 @@ fn compute_actions( }); } } - + actions } ``` @@ -373,6 +376,7 @@ fn compute_actions( ### 3. Filter Building Functions (Three-Layer Strategy) The filter strategy uses three layers: + - **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation - **Layer 2**: Events tagging our repos - **Layer 3**: Events tagging our root events @@ -388,7 +392,7 @@ fn build_announcement_filter(since: Option) -> Filter { Kind::Custom(30617), // Repository announcements Kind::Custom(30618), // Maintainer lists ]); - + match since { Some(ts) => filter.since(ts), None => filter, @@ -404,10 +408,10 @@ fn tagged_one_of_our_repo_event_filters( ) -> Vec { let mut filters = Vec::new(); let repo_refs: Vec<_> = repos.iter().collect(); - + for chunk in repo_refs.chunks(100) { let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); - + // Lowercase 'a' tag - standard addressable reference let mut f1 = Filter::new() .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); @@ -417,18 +421,18 @@ fn tagged_one_of_our_repo_event_filters( // Quote 'q' tag - NIP-10 quote references to addressable events let mut f3 = Filter::new() .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); - + if let Some(ts) = since { f1 = f1.since(ts); f2 = f2.since(ts); f3 = f3.since(ts); } - + filters.push(f1); filters.push(f2); filters.push(f3); } - + filters } @@ -441,10 +445,10 @@ fn tagged_one_of_our_root_event_filters( ) -> Vec { let mut filters = Vec::new(); let event_ids: Vec = root_events.iter().map(|id| id.to_hex()).collect(); - + for chunk in event_ids.chunks(100) { let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); - + // Lowercase 'e' tag - standard event reference let mut f1 = Filter::new() .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); @@ -454,18 +458,18 @@ fn tagged_one_of_our_root_event_filters( // Quote 'q' tag - NIP-10 quote references to events let mut f3 = Filter::new() .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); - + if let Some(ts) = since { f1 = f1.since(ts); f2 = f2.since(ts); f3 = f3.since(ts); } - + filters.push(f1); filters.push(f2); filters.push(f3); } - + filters } @@ -493,7 +497,7 @@ fn build_layer2_and_layer3_filters( impl SyncManager { async fn handle_add_filters(&mut self, action: AddFilters) { let AddFilters { relay_url, repos, root_events, filters } = action; - + // Auto-spawn connection if needed let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); match state { @@ -522,11 +526,11 @@ impl SyncManager { // Already connected - proceed with subscription } } - + // Subscribe and collect subscription IDs let conn = self.connections.get(&relay_url).unwrap(); let mut sub_ids = HashSet::new(); - + for filter in filters { match conn.client.subscribe(filter, None).await { Ok(output) => { @@ -539,14 +543,14 @@ impl SyncManager { } } } - + // Create pending batch let batch = PendingBatch { batch_id: self.next_batch_id(), items: PendingItems { repos, root_events }, outstanding_subs: sub_ids, }; - + // Add to pending index self.pending_sync_index.write().await .entry(relay_url) @@ -563,19 +567,19 @@ impl SyncManager { /// Called when connection to a relay is lost async fn handle_disconnect(&mut self, relay_url: &str) { let mut index = self.relay_sync_index.write().await; - + if let Some(state) = index.get_mut(relay_url) { state.connection_status = ConnectionStatus::Disconnected; state.disconnected_at = Some(Timestamp::now()); state.connection = None; } - + // Clear pending batches - these items were not confirmed self.pending_sync_index.write().await.remove(relay_url); - + // Remove from active connections map self.connections.remove(relay_url); - + // Health tracker records failure for backoff self.health_tracker.record_failure(relay_url); } @@ -599,39 +603,39 @@ impl SyncManager { Some(s) => s, None => return, // Relay was removed while disconnected }; - + // Determine if this is a fresh sync or quick reconnect let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); let last_connected = state.last_connected; - + if is_fresh_sync && state.last_connected.is_some() { // Stale reconnect (>15min) - clear state tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); state.clear_sync_state(); } - + // Update connection state state.connection_status = ConnectionStatus::Connected; state.last_connected = Some(Timestamp::now()); state.disconnected_at = None; - + // Record success in health tracker self.health_tracker.record_success(relay_url); - + drop(index); // Release lock - + let conn = match self.connections.get(relay_url) { Some(c) => c, None => return, }; - + if is_fresh_sync { // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions - + // Step 1: Subscribe Layer 1 (announcements) without since let layer1 = build_announcement_filter(None); let _ = conn.client.subscribe(layer1, None).await; - + // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) self.recompute_actions_for_relay(relay_url).await; } else { @@ -639,19 +643,19 @@ impl SyncManager { let since = last_connected .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) .unwrap_or(Timestamp::from(0)); - + // Step 1: Subscribe Layer 1 (announcements) with since let layer1 = build_announcement_filter(Some(since)); let _ = conn.client.subscribe(layer1, None).await; - + // Step 2: Rebuild Layer 2+3 for confirmed items with since self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; - + // Step 3: Check for NEW items via compute_actions self.recompute_actions_for_relay(relay_url).await; } } - + /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). /// Used by: /// - Quick reconnect: rebuild confirmed items with since filter @@ -662,46 +666,46 @@ impl SyncManager { Some(s) => s, None => return, }; - + // Build Layer 2+3 filters WITH since let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); drop(confirmed); - + // Subscribe directly - no PendingBatch for catch-up (items already confirmed) let conn = match self.connections.get(relay_url) { Some(c) => c, None => return, }; - + for filter in filters { let _ = conn.client.subscribe(filter, None).await; } } - + /// Rerun compute_actions for a specific relay and process resulting AddFilters. /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. async fn recompute_actions_for_relay(&mut self, relay_url: &str) { let repo_index = self.repo_sync_index.read().await; let targets = derive_relay_targets(&repo_index); drop(repo_index); - + // Filter to just this relay let target = match targets.get(relay_url) { Some(t) => t.clone(), None => return, // No repos reference this relay anymore }; - + let pending = self.pending_sync_index.read().await; let confirmed = self.relay_sync_index.read().await; - + let mut single_relay_targets = HashMap::new(); single_relay_targets.insert(relay_url.to_string(), target); - + let actions = compute_actions(&single_relay_targets, &pending, &confirmed); - + drop(pending); drop(confirmed); - + // Process AddFilters for action in actions { self.handle_add_filters(action).await; @@ -719,30 +723,30 @@ impl SyncManager { // Random 23-25 hours let hours = 23.0 + rand::random::() * 2.0; tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; - + let relay_urls: Vec<_> = self.relay_sync_index.read().await .keys() .cloned() .collect(); - + for relay_url in relay_urls { self.daily_sync(&relay_url).await; } } } - + /// Perform daily fresh sync for a relay async fn daily_sync(&mut self, relay_url: &str) { tracing::info!("Daily sync triggered for {}", relay_url); - + // Close all subscriptions if let Some(conn) = self.connections.get(relay_url) { conn.client.unsubscribe_all().await; } - + // Clear PendingSyncIndex self.pending_sync_index.write().await.remove(relay_url); - + // Clear confirmed state - triggers fresh sync { let mut index = self.relay_sync_index.write().await; @@ -750,7 +754,7 @@ impl SyncManager { state.clear_sync_state(); } } - + // Recompute actions - will generate AddFilters for everything self.recompute_actions_for_relay(relay_url).await; } @@ -767,33 +771,33 @@ impl SyncManager { /// Called from handle_add_filters BEFORE adding new filters. async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { let current_count = self.get_filter_count(relay_url).await; - + if current_count + new_filter_count > 70 { self.consolidate(relay_url).await; } } - + /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. /// Does NOT clear state - just reduces filter count. async fn consolidate(&mut self, relay_url: &str) { tracing::info!("Consolidating filters for {} (count > 70)", relay_url); - + // Wait for all pending batches to complete first self.wait_pending_complete(relay_url).await; - + // Close Layer 2+3 subscriptions only - Layer 1 remains active // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately // For simplicity, we close all and re-add Layer 1 if let Some(conn) = self.connections.get(relay_url) { conn.client.unsubscribe_all().await; } - + // Re-subscribe Layer 1 with since (maintains announcements stream) let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); let conn = self.connections.get(relay_url).unwrap(); let layer1 = build_announcement_filter(Some(since)); let _ = conn.client.subscribe(layer1, None).await; - + // Rebuild Layer 2+3 only self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; } @@ -806,7 +810,7 @@ impl SyncManager { impl SyncManager { async fn handle_add_filters(&mut self, action: AddFilters) { let AddFilters { relay_url, repos, root_events, filters } = action; - + // Auto-spawn connection if needed (unchanged) let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); match state { @@ -834,14 +838,14 @@ impl SyncManager { // Already connected - proceed } } - + // CHECK CONSOLIDATION BEFORE ADDING self.maybe_consolidate(&relay_url, filters.len()).await; - + // Subscribe and collect subscription IDs let conn = self.connections.get(&relay_url).unwrap(); let mut sub_ids = HashSet::new(); - + for filter in filters { match conn.client.subscribe(filter, None).await { Ok(output) => { @@ -854,14 +858,14 @@ impl SyncManager { } } } - + // Create pending batch (unchanged) let batch = PendingBatch { batch_id: self.next_batch_id(), items: PendingItems { repos, root_events }, outstanding_subs: sub_ids, }; - + self.pending_sync_index.write().await .entry(relay_url) .or_default() @@ -881,25 +885,25 @@ impl SyncManager { let confirmed = self.relay_sync_index.read().await; let relays_to_disconnect: Vec<_> = confirmed.iter() .filter(|(_, state)| { - !state.is_bootstrap && - state.repos.is_empty() && + !state.is_bootstrap && + state.repos.is_empty() && state.root_events.is_empty() }) .map(|(url, _)| url.clone()) .collect(); drop(confirmed); - + for relay_url in relays_to_disconnect { self.disconnect_relay(&relay_url).await; } } - + async fn disconnect_relay(&mut self, relay_url: &str) { tracing::info!("Disconnecting relay {} (no repos)", relay_url); - + self.relay_sync_index.write().await.remove(relay_url); self.pending_sync_index.write().await.remove(relay_url); - + if let Some(conn) = self.connections.remove(relay_url) { let _ = conn.client.disconnect().await; } @@ -917,31 +921,31 @@ flowchart TB SS[SelfSubscriber] OWN[Own Relay] end - + subgraph RepoSyncIndex - What We Want RSI[HashMap: Repo to Relays+Events] end - + subgraph Derived Target DT[derive_relay_targets fn] TGT[Per-relay: repos + events we should sync] end - + subgraph compute_actions - Decision Point CA[Three-way diff: target - pending - confirmed] end - + subgraph PendingSyncIndex - In Flight PSI[Vec PendingBatch per relay] end - + subgraph RelaySyncIndex - Confirmed State RLI[RelayState per relay] CONN[connection_status] REPOS[repos + root_events] TIMES[last_connected + disconnected_at] end - + SS -->|subscribe| OWN OWN -->|events| SS SS -->|batch fires| RSI @@ -959,10 +963,10 @@ flowchart TB AF -->|spawn if needed| CONN SUB --> PSI PSI -->|EOSE| REPOS - + CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] DISC -->|any reconnect| HC[handle_connect_or_reconnect] - + subgraph handle_connect_or_reconnect HC --> FRESH_CHECK{is_fresh_sync?} FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] @@ -977,25 +981,25 @@ flowchart TB ## Key Design Decisions -| Decision | Choice | Rationale | -|----------|--------|-----------| -| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | -| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | -| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | -| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | -| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | -| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | -| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | -| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | -| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | -| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | -| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | -| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | -| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | -| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | -| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | -| Connection spawning | Via AddFilters handler | Single path for new relay discovery | -| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | +| Decision | Choice | Rationale | +| -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | +| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | +| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | +| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | +| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | +| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | +| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | +| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | +| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | +| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | +| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | +| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | +| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | +| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | +| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | +| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | +| Connection spawning | Via AddFilters handler | Single path for new relay discovery | +| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | --- @@ -1017,16 +1021,16 @@ src/sync/ ## Comparison: v3 vs v4 -| Aspect | v3 | v4 | -|--------|----|----| -| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | -| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | -| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | -| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | -| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | -| Since filter application | Applied in handle_reconnect | build_all_filters with optional since | -| PSI clearing | On disconnect | On disconnect (confirmed) | -| Daily timer | Consolidation-style | Fresh sync (different from consolidation) | +| Aspect | v3 | v4 | +| ------------------------ | ----------------------------------------- | --------------------------------------------- | +| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | +| Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | +| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | +| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | +| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | +| Since filter application | Applied in handle_reconnect | build_all_filters with optional since | +| PSI clearing | On disconnect | On disconnect (confirmed) | +| Daily timer | Consolidation-style | Fresh sync (different from consolidation) | --- @@ -1065,28 +1069,28 @@ impl SelfSubscriber { tokio::time::sleep(Duration::from_secs(5)).await; continue; } - + // Run event loop until disconnection self.event_loop().await; - + // Loop will retry connection } } - + async fn connect_and_subscribe(&mut self) -> Result<(), Error> { let client = Client::new(Keys::generate()); client.add_relay(&self.own_relay_url).await?; client.connect().await; - + // Build filter - add since only on reconnect let filter = Filter::new().kinds([ Kind::Custom(30617), // Repository announcements Kind::GitPatch, // 1617 - Kind::Custom(1618), // PRs + Kind::Custom(1618), // PRs Kind::Custom(1619), // PR updates Kind::GitIssue, // 1621 ]); - + let filter = if let Some(ts) = self.last_connected { // Reconnection: use since filter let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer @@ -1095,10 +1099,10 @@ impl SelfSubscriber { // Initial connect: no since filter - get full history filter }; - + // Update last_connected AFTER computing since self.last_connected = Some(Timestamp::now()); - + client.subscribe(filter, None).await?; self.client = Some(client); Ok(()) @@ -1115,18 +1119,18 @@ impl SelfSubscriber { let mut pending_events: Vec = Vec::new(); let mut batch_timer: Option = None; let batch_window = Duration::from_secs(5); - + loop { let timeout = batch_timer .map(|t| batch_window.saturating_sub(t.elapsed())) .unwrap_or(Duration::from_secs(60)); - + tokio::select! { notification = client.notifications().recv() => { match notification { Ok(RelayPoolNotification::Event { event, .. }) => { pending_events.push(*event); - + // Start timer on first event - does NOT reset if batch_timer.is_none() { batch_timer = Some(Instant::now()); @@ -1147,7 +1151,7 @@ impl SelfSubscriber { } } } - + async fn process_batch(&self, events: Vec) { // 1. Update RepoSyncIndex for event in events { @@ -1157,26 +1161,26 @@ impl SelfSubscriber { _ => {} } } - + // 2. Derive targets and compute actions let repo_index = self.repo_sync_index.read().await; let targets = derive_relay_targets(&repo_index); - + let pending = self.pending_sync_index.read().await; let confirmed = self.relay_sync_index.read().await; - + let actions = compute_actions(&targets, &pending, &confirmed); - + drop(repo_index); drop(pending); drop(confirmed); - + // 3. Send actions to SyncManager for action in actions { let _ = self.action_tx.send(action).await; } } - + async fn handle_announcement(&self, event: &Event) { // Extract repo_ref from event - 30617:pubkey:identifier let d_tag = event.tags.iter() @@ -1188,9 +1192,9 @@ impl SelfSubscriber { } }) .unwrap_or_default(); - + let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); - + // Extract relay URLs from 'r' tags let relays: HashSet = event.tags.iter() .filter_map(|tag| { @@ -1201,13 +1205,13 @@ impl SelfSubscriber { } }) .collect(); - + // Update RepoSyncIndex let mut index = self.repo_sync_index.write().await; let needs = index.entry(repo_ref).or_default(); needs.relays = relays; } - + async fn handle_root_event(&self, event: &Event) { // Extract repo_ref from 'a' tag let repo_ref = event.tags.iter() @@ -1218,7 +1222,7 @@ impl SelfSubscriber { None } }); - + if let Some(repo_ref) = repo_ref { let mut index = self.repo_sync_index.write().await; let needs = index.entry(repo_ref).or_default(); @@ -1246,6 +1250,7 @@ let sync_manager = Arc::new(Mutex::new(self)); This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. **Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: + - `DashMap` for thread-safe concurrent access without external locking - Three states: `Healthy`, `Degraded`, `Dead` - Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff @@ -1254,14 +1259,14 @@ This allows background tasks (daily timer, disconnect checker) to acquire the lo ### Implementation Constants -| Constant | Value | Purpose | -|----------|-------|---------| -| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | +| Constant | Value | Purpose | +| --------------------------------- | ---------- | ------------------------------------------------ | +| `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | | `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | -| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | -| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | -| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | -| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | +| `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | +| `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | +| `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | +| `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | ### Daily Timer Randomization @@ -1308,6 +1313,7 @@ src/sync/ ``` Key differences from spec: + - No separate `state.rs` - types are defined in `mod.rs` - No separate `actions.rs` - moved to `algorithms.rs` - No separate `consolidation.rs` - consolidation logic in `mod.rs` @@ -1321,4 +1327,4 @@ Key differences from spec: 3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. -4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. \ No newline at end of file +4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. -- cgit v1.2.3