From 7821b107190cc116a30a4c339f935bc16a1d5197 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 16 Dec 2025 15:26:55 +0000 Subject: proactive sync prep - some helper functions written but not enabled --- src/sync/algorithms.rs | 33 ++- src/sync/mod.rs | 700 +++++++++++++++++++++++++++++++++++++++++--- src/sync/self_subscriber.rs | 8 +- 3 files changed, 688 insertions(+), 53 deletions(-) (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 5b5b520..84248b1 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -11,7 +11,9 @@ use std::collections::{HashMap, HashSet}; use nostr_sdk::prelude::*; -use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; +use crate::sync::PendingItems; + +use super::{ConnectionStatus, PendingBatch, RelayState}; // ============================================================================= // Data Structures @@ -36,10 +38,8 @@ pub struct RelaySyncNeeds { pub struct AddFilters { /// The relay URL to add filters to pub relay_url: String, - /// Repos being synced in this action - pub repos: HashSet, - /// Root events being tracked in this action - pub root_events: HashSet, + /// pending items - repos and root events + pub items: PendingItems, /// The actual filters to subscribe with pub filters: Vec, } @@ -161,8 +161,10 @@ pub fn compute_actions( actions.push(AddFilters { relay_url: relay_url.clone(), - repos: new_repos, - root_events: new_events, + items: PendingItems { + repos: new_repos, + root_events: new_events, + }, filters, }); } @@ -175,6 +177,7 @@ pub fn compute_actions( mod tests { use super::*; use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; + use crate::sync::SyncMethod; // ========================================================================= // derive_relay_targets tests @@ -371,7 +374,7 @@ mod tests { assert_eq!(actions.len(), 1); let action = &actions[0]; assert_eq!(action.relay_url, "wss://relay1.com"); - assert!(action.repos.contains("repo1")); + assert!(action.items.repos.contains("repo1")); assert!(!action.filters.is_empty()); } @@ -528,10 +531,10 @@ mod tests { assert_eq!(actions.len(), 1); let action = &actions[0]; // Only repo3 should be in the action (repo1 pending, repo2 confirmed) - assert_eq!(action.repos.len(), 1); - assert!(action.repos.contains("repo3")); - assert!(!action.repos.contains("repo1")); - assert!(!action.repos.contains("repo2")); + assert_eq!(action.items.repos.len(), 1); + assert!(action.items.repos.contains("repo3")); + assert!(!action.items.repos.contains("repo1")); + assert!(!action.items.repos.contains("repo2")); } #[test] @@ -554,9 +557,9 @@ mod tests { assert_eq!(actions.len(), 1); let action = &actions[0]; - assert!(action.repos.is_empty()); - assert_eq!(action.root_events.len(), 1); - assert!(action.root_events.contains(&event_id)); + assert!(action.items.repos.is_empty()); + assert_eq!(action.items.root_events.len(), 1); + assert!(action.items.root_events.contains(&event_id)); // Should have 3 filters for the root event (e, E, q tags) assert_eq!(action.filters.len(), 3); } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 41586a4..401cf21 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -591,7 +591,7 @@ impl SyncManager { } // Recompute actions for Layer 2+3 based on synced events - self.recompute_actions_for_relay(relay_url).await; + self.recompute_new_sync_filters_for_relay(relay_url).await; } else { // NIP-77 not supported - fall back to REQ+EOSE tracing::info!( @@ -612,7 +612,7 @@ impl SyncManager { } // Recompute actions for Layer 2+3 - will discover all repos/events again - self.recompute_actions_for_relay(relay_url).await; + self.recompute_new_sync_filters_for_relay(relay_url).await; } if let Some(ref metrics) = self.metrics { @@ -709,7 +709,7 @@ impl SyncManager { Some(add_filters) => { // Process AddFilters action directly let mut manager = sync_manager.lock().await; - manager.handle_add_filters(add_filters).await; + manager.handle_new_sync_filters(add_filters).await; } None => break, } @@ -763,13 +763,13 @@ impl SyncManager { /// - For new relays: creates entry with Connecting status, spawns connection /// - For existing connected relays: subscribes to filters, creates PendingBatch /// - For disconnected/connecting relays: returns (will be handled on connection) - async fn handle_add_filters(&mut self, action: AddFilters) { + async fn handle_new_sync_filters(&mut self, action: AddFilters) { tracing::info!( relay = %action.relay_url, - repo_count = action.repos.len(), - root_event_count = action.root_events.len(), + repo_count = action.items.repos.len(), + root_event_count = action.items.root_events.len(), filter_count = action.filters.len(), - "[DIAG] handle_add_filters called" + "[DIAG] handle_new_sync_filters called" ); // Step 1: Check if relay exists in relay_sync_index @@ -801,7 +801,7 @@ impl SyncManager { tracing::info!( relay = %action.relay_url, - repos = action.repos.len(), + repos = action.items.repos.len(), "Spawning connection for new relay" ); @@ -827,7 +827,7 @@ impl SyncManager { // Step 2: Check if consolidation is needed BEFORE adding new filters self.maybe_consolidate(&action.relay_url, action.filters.len()) .await; - + /// DELETE this bit // Step 3: Get connection and subscribe to all filters let connection = match self.connections.get(&action.relay_url) { Some(conn) => conn, @@ -870,8 +870,8 @@ impl SyncManager { let batch = PendingBatch { batch_id, items: PendingItems { - repos: action.repos.clone(), - root_events: action.root_events.clone(), + repos: action.items.repos.clone(), + root_events: action.items.root_events.clone(), }, outstanding_subs: subscription_ids.into_iter().collect(), sync_method: SyncMethod::ReqEose, @@ -889,33 +889,84 @@ impl SyncManager { tracing::debug!( relay = %action.relay_url, batch_id = batch_id, - repos = action.repos.len(), - root_events = action.root_events.len(), + repos = action.items.repos.len(), + root_events = action.items.root_events.len(), filters = action.filters.len(), "Created pending batch for filter subscriptions" ); + // REPLACE WITH THIS: + // // Subscribe to each filter and collect subscription IDs + // self.sync_live(&action.relay_url, &action.filters).await; + // // TODO need to do actions.repos + // self.historic_sync(&action.relay_url, action.filters, action.items, None) + // .await; } /// Handle a connection success (called when a relay connects or reconnects) /// - /// This method implements smart reconnection logic: - /// - Fresh sync if never connected or >15 min since last connection - /// - Quick reconnect with since filter if <15 min since last connection - /// - /// For fresh sync (with NIP-77 negentropy if supported): - /// - Clears any stale state - /// - Uses negentropy sync for Layer 1 (if NIP-77 supported) - /// - Falls back to REQ+EOSE if NIP-77 not supported - /// - Recomputes actions for new items - /// - /// For quick reconnect: - /// - Preserves existing state - /// - Subscribes to Layer 1 with since filter - /// - Rebuilds Layer 2 and Layer 3 with since filter - /// - Recomputes actions for new items + /// This method dispatches to the appropriate reconnection strategy: + /// - `fresh_start()` if never connected before + /// - `quick_reconnect()` if disconnected < 15 minutes + /// - `long_reconnect()` if disconnected > 15 minutes async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { let now = Timestamp::now(); + // // Get the relay state to determine reconnect type + // let (last_connected, disconnected_at) = { + // let index = self.relay_sync_index.read().await; + // if let Some(state) = index.get(relay_url) { + // (state.last_connected, state.disconnected_at) + // } else { + // (None, None) // No state found + // } + // }; + + // // Determine which reconnection strategy to use + // match (last_connected, disconnected_at) { + // (None, _) => { + // // Never connected before - fresh start + // tracing::info!( + // relay = %relay_url, + // "First connection - initiating fresh_start" + // ); + // self.fresh_start(relay_url).await; + // } + // (Some(last), Some(disconnected)) => { + // // Was connected before, check how long disconnected + // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs()); + + // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS { + // // Disconnected < 15 minutes - quick reconnect + // // Use last_connected minus buffer as since timestamp + // let since = + // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); + // tracing::info!( + // relay = %relay_url, + // disconnect_secs = disconnect_duration, + // since = %since, + // "Short disconnection - initiating quick_reconnect" + // ); + // self.quick_reconnect(relay_url, since).await; + // } else { + // // Disconnected > 15 minutes - long reconnect + // tracing::info!( + // relay = %relay_url, + // disconnect_secs = disconnect_duration, + // "Long disconnection - initiating long_reconnect" + // ); + // self.long_reconnect(relay_url).await; + // } + // } + // (Some(_last), None) => { + // // Was connected but no disconnected_at - shouldn't happen normally + // // Treat as long reconnect to be safe + // tracing::warn!( + // relay = %relay_url, + // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect" + // ); + // self.long_reconnect(relay_url).await; + // } + // } // Get the relay state to determine reconnect type let (is_fresh_sync, last_connected, is_bootstrap) = { let index = self.relay_sync_index.read().await; @@ -998,7 +1049,7 @@ impl SyncManager { // After negentropy sync, recompute Layer 2+3 actions // Layer 1 events are now in sync, so we can proceed with Layer 2+3 - self.recompute_actions_for_relay(relay_url).await; + self.recompute_new_sync_filters_for_relay(relay_url).await; // Set up live subscription for new events (since=now) let live_filter = filters::build_announcement_filter(Some(now)); @@ -1021,7 +1072,7 @@ impl SyncManager { // during connect_and_subscribe() in handle_add_filters(). That call subscribes // to kinds 30617+30618 for the full history. Here we only need to recompute // Layer 2+3 actions based on the repos we're tracking. - self.recompute_actions_for_relay(relay_url).await; + self.recompute_new_sync_filters_for_relay(relay_url).await; } } else { // Quick reconnect: use since filter (no negentropy needed) @@ -1055,7 +1106,7 @@ impl SyncManager { .await; // Recompute actions for any new items discovered while disconnected - self.recompute_actions_for_relay(relay_url).await; + self.recompute_new_sync_filters_for_relay(relay_url).await; if let Some(ref metrics) = self.metrics { metrics.record_event(event_source::RECONNECT); @@ -1063,6 +1114,225 @@ impl SyncManager { } } + /// Fresh start - clears state and does full sync + /// + /// Called by: initial connect, long_reconnect, daily_sync + /// + /// Flow: + /// 1. Clear PendingSyncIndex for this relay + /// 2. Clear RelaySyncIndex sync state (repos/root_events) + /// 3. Update connection state to Connected + /// 4. L1 live + L1 historic (negentropy if available) + /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 + async fn fresh_start(&mut self, relay_url: &str) { + let now = Timestamp::now(); + + tracing::info!(relay = %relay_url, "Starting fresh_start"); + + // Step 1: Clear PendingSyncIndex for this relay + { + let mut pending = self.pending_sync_index.write().await; + if pending.remove(relay_url).is_some() { + tracing::debug!( + relay = %relay_url, + "Cleared pending batches in fresh_start" + ); + } + } + + // Step 2: Clear RelaySyncIndex sync state (but preserve connection metadata) + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + let repos_cleared = state.repos.len(); + let events_cleared = state.root_events.len(); + state.clear_sync_state(); + if repos_cleared > 0 || events_cleared > 0 { + tracing::debug!( + relay = %relay_url, + repos_cleared = repos_cleared, + events_cleared = events_cleared, + "Cleared sync state in fresh_start" + ); + } + } + } + + // Step 3: Update connection state + { + let mut index = self.relay_sync_index.write().await; + let state = index.entry(relay_url.to_string()).or_default(); + state.connection_status = ConnectionStatus::Connected; + state.last_connected = Some(now); + state.disconnected_at = None; + } + + // Record success in health tracker + self.health_tracker.record_success(relay_url); + + // Update metrics + if let Some(ref metrics) = self.metrics { + metrics.set_relay_connected(relay_url, true); + metrics.inc_connected_count(); + metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + } + + // Step 4: L1 sync - check negentropy support + let use_negentropy = if self.config.sync_disable_negentropy { + tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); + false + } else if let Some(connection) = self.connections.get(relay_url) { + connection.supports_negentropy().await + } else { + false + }; + + if use_negentropy { + // NIP-77 supported - use negentropy for L1 historical sync + tracing::info!( + relay = %relay_url, + "Using NIP-77 negentropy for L1 historical sync" + ); + + // L1 historic sync (no since - full sync) + let layer1_filter = filters::build_announcement_filter(None); + self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)") + .await; + + // L1 live subscription (since=now for ongoing events) + let live_filter = filters::build_announcement_filter(Some(now)); + if let Some(connection) = self.connections.get(relay_url) { + if let Err(e) = connection.subscribe_filter(live_filter).await { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to set up L1 live subscription in fresh_start" + ); + } + } + } else { + // NIP-77 not supported - REQ+EOSE + // Note: Layer 1 subscription (without since) was already established + // during connect_and_subscribe() in spawn_relay_connection + tracing::info!( + relay = %relay_url, + "Using REQ+EOSE for L1 sync (negentropy not available)" + ); + } + + // Step 5: compute_actions → AddFilters for L2+L3 + // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters + // for ALL repos that should be synced from this relay + self.recompute_new_sync_filters_for_relay(relay_url).await; + + tracing::info!(relay = %relay_url, "fresh_start complete"); + } + + /// Quick reconnect - for disconnections < 15 minutes + /// + /// Flow: + /// 1. Clear PendingSyncIndex for this relay + /// 2. Update connection state to Connected + /// 3. L1 live + L1 historic(since) + /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since) + /// 5. compute_actions for any new items discovered during catchup + async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { + let now = Timestamp::now(); + + tracing::info!( + relay = %relay_url, + since = %since, + "Starting quick_reconnect" + ); + + // Step 1: Clear PendingSyncIndex for this relay + // Old subscriptions are dead after disconnect + { + let mut pending = self.pending_sync_index.write().await; + if pending.remove(relay_url).is_some() { + tracing::debug!( + relay = %relay_url, + "Cleared pending batches in quick_reconnect" + ); + } + } + + // Step 2: Update connection state (preserve repos/root_events - that's the point!) + { + let mut index = self.relay_sync_index.write().await; + let state = index.entry(relay_url.to_string()).or_default(); + state.connection_status = ConnectionStatus::Connected; + state.last_connected = Some(now); + state.disconnected_at = None; + } + + // Record success in health tracker + self.health_tracker.record_success(relay_url); + + // Update metrics + if let Some(ref metrics) = self.metrics { + metrics.set_relay_connected(relay_url, true); + metrics.inc_connected_count(); + metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + metrics.record_event(event_source::RECONNECT); + } + + // Step 3: L1 live + L1 historic with since filter + // L1 live subscription (since=now for ongoing events) + let live_filter = filters::build_announcement_filter(Some(now)); + if let Some(connection) = self.connections.get(relay_url) { + if let Err(e) = connection.subscribe_filter(live_filter).await { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to set up L1 live subscription in quick_reconnect" + ); + } + } + + // L1 historic with since filter (catch up on missed announcements) + let layer1_filter = filters::build_announcement_filter(Some(since)); + if let Some(connection) = self.connections.get(relay_url) { + if let Err(e) = connection.subscribe_filter(layer1_filter).await { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to subscribe to L1 historic filter in quick_reconnect" + ); + } + } + + // Step 4: Rebuild L2+L3 from confirmed state with since filter + // This uses the preserved repos/root_events from RelaySyncIndex + self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; + + // Step 5: compute_actions for any NEW items discovered while disconnected + self.recompute_new_sync_filters_for_relay(relay_url).await; + + tracing::info!(relay = %relay_url, "quick_reconnect complete"); + } + + /// Long reconnect - for disconnections > 15 minutes + /// + /// Flow: + /// 1. Record disconnect/reconnect metric + /// 2. Delegate to fresh_start() + async fn long_reconnect(&mut self, relay_url: &str) { + tracing::info!(relay = %relay_url, "Starting long_reconnect"); + + // Step 1: Record disconnect/reconnect metric + // This distinguishes intentional daily refresh from failure recovery + if let Some(ref metrics) = self.metrics { + metrics.record_event(event_source::RECONNECT); + } + + // Step 2: Delegate to fresh_start + // State is too stale to trust, start fresh + self.fresh_start(relay_url).await; + + tracing::info!(relay = %relay_url, "long_reconnect complete"); + } + /// Rebuild Layer 2 and Layer 3 subscriptions for a relay /// /// Uses the confirmed repos and root_events from RelayState to build filters. @@ -1129,7 +1399,7 @@ impl SyncManager { /// /// Uses derive_relay_targets and compute_actions to find new items /// that need to be synced. Processes AddFilters actions for new items. - async fn recompute_actions_for_relay(&mut self, relay_url: &str) { + async fn recompute_new_sync_filters_for_relay(&mut self, relay_url: &str) { use crate::sync::algorithms::{compute_actions, derive_relay_targets}; // Get current state from indexes (need to collect to avoid holding locks) @@ -1173,12 +1443,12 @@ impl SyncManager { for action in actions { tracing::info!( relay = %action.relay_url, - new_repos = action.repos.len(), - new_root_events = action.root_events.len(), + new_repos = action.items.repos.len(), + new_root_events = action.items.root_events.len(), filters = action.filters.len(), "Processing AddFilters for new items" ); - self.handle_add_filters(action).await; + self.handle_new_sync_filters(action).await; } } @@ -2095,6 +2365,366 @@ impl SyncManager { } } + // ========================================================================= + // Sync Primitives (Phase 1 of GRASP-02 refactoring) + // These methods are new primitives that will be used in subsequent phases. + // ========================================================================= + + /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex + #[allow(dead_code)] // Will be used in Phase 2+ + /// + /// This method subscribes to filters with `limit: 0` for receiving ongoing events. + /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have + /// a definite "completion" - they stay open indefinitely. + /// + /// Used for: + /// - Layer 1 live subscription (new announcements after initial sync) + /// - Layer 2+3 live subscriptions (new events after initial sync) + /// + /// # Arguments + /// * `relay_url` - The relay URL to subscribe on + /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) + /// + /// # Returns + /// Vec of subscription IDs for the live subscriptions, or empty if connection not found + async fn sync_live(&self, relay_url: &str, filters: &[Filter]) -> Vec { + if filters.is_empty() { + return vec![]; + } + + let connection = match self.connections.get(relay_url) { + Some(conn) => conn, + None => { + tracing::warn!( + relay = %relay_url, + "No connection found for sync_live" + ); + return vec![]; + } + }; + + let mut sub_ids = Vec::new(); + + for filter in filters { + // Apply limit: 0 to make this a live subscription + // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", + // but omitting limit means "no limit" which is what we want for live. + // The filter passed in should already NOT have a limit set. + match connection.subscribe_filter(filter.clone()).await { + Ok(sub_id) => { + tracing::trace!( + relay = %relay_url, + sub_id = %sub_id, + "Live subscription created" + ); + sub_ids.push(sub_id); + } + Err(e) => { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to create live subscription" + ); + } + } + } + + tracing::debug!( + relay = %relay_url, + filter_count = filters.len(), + sub_count = sub_ids.len(), + "sync_live completed" + ); + + sub_ids + } + + /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) + /// + /// Returns raw Vec for L1+L2+L3. + /// Used by: quick_reconnect, consolidate + /// Does NOT include pending items - those flow through AddFilters path. + /// + /// # Arguments + /// * `relay_url` - The relay URL to reconstruct filters for + /// + /// # Returns + /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags) + #[allow(dead_code)] // Will be used in Phase 3+ + async fn reconstruct_filters(&self, relay_url: &str) -> Vec { + // Get confirmed state from relay_sync_index + let (repos, root_events) = { + let index = self.relay_sync_index.read().await; + match index.get(relay_url) { + Some(state) => (state.repos.clone(), state.root_events.clone()), + None => { + tracing::warn!( + relay = %relay_url, + "No RelayState found for reconstruct_filters" + ); + return vec![]; + } + } + }; + + let mut all_filters = Vec::new(); + + // Layer 1: Announcements (always included) + // Note: No `since` filter - this returns raw filters for live subscriptions + all_filters.push(filters::build_announcement_filter(None)); + + // Layer 2 + Layer 3: Repo and root event tag filters + if !repos.is_empty() || !root_events.is_empty() { + let l2_l3_filters = + filters::build_layer2_and_layer3_filters(&repos, &root_events, None); + all_filters.extend(l2_l3_filters); + } + + tracing::debug!( + relay = %relay_url, + total_filters = all_filters.len(), + repos_count = repos.len(), + root_events_count = root_events.len(), + "Reconstructed filters from confirmed state" + ); + + all_filters + } + + /// Sync historical events and track in PendingSyncIndex + #[allow(dead_code)] // Will be used in Phase 3+ + /// + /// This method handles historical synchronization for a set of filters, + /// creating a PendingBatch to track completion. It dispatches to either + /// negentropy sync or traditional REQ+EOSE based on relay capability and config. + /// + /// Used for: + /// - Initial sync (no since filter) + /// - Reconnect sync (with since filter) + /// - Daily sync (no since filter, full re-sync) + /// + /// # Arguments + /// * `relay_url` - The relay URL to sync from + /// * `filters` - Filters to sync (will have `since` applied if provided) + /// * `items` - Items being synced (for tracking in PendingBatch) + /// * `since` - Optional timestamp for incremental sync + /// + /// # Returns + /// * `Some(batch_id)` - Batch was created and sync initiated + /// * `None` - No connection or sync failed to start + async fn historic_sync( + &mut self, + relay_url: &str, + filters: Vec, + items: PendingItems, + since: Option, + ) -> Option { + if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { + tracing::debug!( + relay = %relay_url, + "historic_sync called with empty filters and items, skipping" + ); + return None; + } + + // Check connection exists + let connection = match self.connections.get(relay_url) { + Some(conn) => conn, + None => { + tracing::warn!( + relay = %relay_url, + "No connection found for historic_sync" + ); + return None; + } + }; + + // Apply since filter if provided + let filters_with_since: Vec = if let Some(ts) = since { + filters.into_iter().map(|f| f.since(ts)).collect() + } else { + filters + }; + + // Check if we should use negentropy + let use_negentropy = + !self.config.sync_disable_negentropy && connection.supports_negentropy().await; + + // Generate batch ID + let batch_id = self.next_batch_id(); + + if use_negentropy && !filters_with_since.is_empty() { + // NIP-77 negentropy path + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + filter_count = filters_with_since.len(), + repos = items.repos.len(), + root_events = items.root_events.len(), + "Starting historic_sync with negentropy" + ); + + // Create PendingBatch for negentropy (empty outstanding_subs) + let batch = PendingBatch { + batch_id, + items: items.clone(), + outstanding_subs: HashSet::new(), + sync_method: SyncMethod::Negentropy, + }; + + // Add to pending_sync_index + { + let mut pending = self.pending_sync_index.write().await; + pending + .entry(relay_url.to_string()) + .or_insert_with(Vec::new) + .push(batch); + } + + // Perform negentropy sync for each filter + // Note: We sync each filter separately because negentropy works on a single filter + let mut total_received = 0; + let mut any_success = false; + + for filter in &filters_with_since { + if let Some(conn) = self.connections.get(relay_url) { + match conn.negentropy_sync_filter(filter.clone()).await { + Ok(result) => { + total_received += result.received.len(); + any_success = true; + + // Record metrics for received events + if let Some(ref metrics) = self.metrics { + for _ in 0..result.received.len() { + metrics.record_event(event_source::STARTUP); + } + } + } + Err(e) => { + tracing::warn!( + relay = %relay_url, + error = %e, + "Negentropy sync failed for filter in historic_sync" + ); + } + } + } + } + + if any_success { + // Remove batch from pending and confirm it + let completed_batch = { + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(relay_url) { + let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); + if let Some(idx) = batch_idx { + let batch = batches.remove(idx); + if batches.is_empty() { + pending.remove(relay_url); + } + Some(batch) + } else { + None + } + } else { + None + } + }; + + if let Some(batch) = completed_batch { + self.confirm_batch(relay_url, batch).await; + } + + tracing::info!( + relay = %relay_url, + batch_id = batch_id, + total_received = total_received, + "historic_sync (negentropy) completed" + ); + } else { + // All negentropy syncs failed - remove the pending batch + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(relay_url) { + batches.retain(|b| b.batch_id != batch_id); + if batches.is_empty() { + pending.remove(relay_url); + } + } + + tracing::warn!( + relay = %relay_url, + batch_id = batch_id, + "historic_sync (negentropy) failed for all filters" + ); + return None; + } + } else { + // Traditional REQ+EOSE path + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + filter_count = filters_with_since.len(), + repos = items.repos.len(), + root_events = items.root_events.len(), + use_negentropy = use_negentropy, + "Starting historic_sync with REQ+EOSE" + ); + + // Subscribe to each filter and collect subscription IDs + let mut subscription_ids = HashSet::new(); + + for filter in &filters_with_since { + if let Some(conn) = self.connections.get(relay_url) { + match conn.subscribe_filter(filter.clone()).await { + Ok(sub_id) => { + subscription_ids.insert(sub_id); + } + Err(e) => { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to subscribe to filter in historic_sync" + ); + } + } + } + } + + if subscription_ids.is_empty() && !filters_with_since.is_empty() { + tracing::warn!( + relay = %relay_url, + "All filter subscriptions failed in historic_sync" + ); + return None; + } + + // Create PendingBatch for REQ+EOSE + let batch = PendingBatch { + batch_id, + items, + outstanding_subs: subscription_ids, + sync_method: SyncMethod::ReqEose, + }; + + // Add to pending_sync_index + { + let mut pending = self.pending_sync_index.write().await; + pending + .entry(relay_url.to_string()) + .or_insert_with(Vec::new) + .push(batch); + } + + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + "historic_sync (REQ+EOSE) batch created, awaiting EOSE" + ); + } + + Some(batch_id) + } + /// Gracefully shutdown the SyncManager /// /// This method: diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 0379fe4..9643fc0 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -499,7 +499,7 @@ impl SelfSubscriber { drop(index); // Release lock before async operations // For each relay, send AddFilters action directly - // SyncManager's handle_add_filters auto-spawns connection for unknown relays + // SyncManager's handle_new_sync_filters auto-spawns connection for unknown relays for (relay_url, needs) in targets { // Skip our own relay URL (we're subscribed to ourselves via self-subscription) if relay_url.contains(&self.relay_domain) { @@ -519,8 +519,10 @@ impl SelfSubscriber { let action = AddFilters { relay_url: relay_url.clone(), - repos: needs.repos, - root_events: needs.root_events, + items: crate::sync::PendingItems { + repos: needs.repos, + root_events: needs.root_events, + }, filters, }; -- cgit v1.2.3