From 03f074d0d0840b946a356badde75551d61c0f84c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 18 Dec 2025 10:12:11 +0000 Subject: sync removing dead code --- src/sync/filters.rs | 21 +- src/sync/mod.rs | 812 ++++++++++--------------------------------- src/sync/relay_connection.rs | 87 +++-- 3 files changed, 237 insertions(+), 683 deletions(-) (limited to 'src/sync') diff --git a/src/sync/filters.rs b/src/sync/filters.rs index e508eb2..963fa02 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs @@ -103,10 +103,18 @@ pub fn tagged_one_of_our_root_event_filters( return vec![]; } + // DEBUG TRACING: Log the root events we're creating Layer 3 filters for + tracing::debug!( + root_event_count = root_events.len(), + root_event_ids = ?root_events.iter().map(|id| id.to_hex()).collect::>(), + since = ?since, + "Building Layer 3 filters for root events" + ); + let mut filters = Vec::new(); let event_ids: Vec = root_events.iter().map(|id| id.to_hex()).collect(); - for chunk in event_ids.chunks(100) { + for (chunk_idx, chunk) in event_ids.chunks(100).enumerate() { // Lowercase 'e' tag - standard event reference let mut f1 = Filter::new(); for event_id in chunk { @@ -131,6 +139,17 @@ pub fn tagged_one_of_our_root_event_filters( f3 = f3.since(ts); } + // DEBUG TRACING: Log the filters being created + tracing::debug!( + chunk_idx = chunk_idx, + chunk_size = chunk.len(), + event_ids_in_chunk = ?chunk, + filter_e = ?f1, + filter_E = ?f2, + filter_q = ?f3, + "Created Layer 3 filter chunk" + ); + filters.push(f1); filters.push(f2); filters.push(f3); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 401cf21..3c50387 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -490,16 +490,19 @@ impl SyncManager { // Move repos to confirmed state.repos.extend(batch.items.repos); // Move root_events to confirmed - state.root_events.extend(batch.items.root_events); + state.root_events.extend(batch.items.root_events.clone()); + // DEBUG TRACING: Log the root events being confirmed tracing::info!( relay = %relay_url, batch_id = batch_id, sync_method = ?sync_method, repos_confirmed = repos_count, root_events_confirmed = events_count, + root_events_ids = ?batch.items.root_events.iter().map(|id| id.to_hex()).collect::>(), total_repos = state.repos.len(), total_root_events = state.root_events.len(), + all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::>(), "Batch confirmed - items moved from pending to confirmed" ); } else { @@ -535,10 +538,6 @@ impl SyncManager { } }; - // Check if relay supports NIP-77 negentropy AND negentropy is not disabled - let use_negentropy = - !self.config.sync_disable_negentropy && connection.supports_negentropy().await; - // Unsubscribe all current subscriptions connection.unsubscribe_all().await; @@ -564,62 +563,14 @@ impl SyncManager { } } - let now = Timestamp::now(); - - if use_negentropy { - // NIP-77 supported - use negentropy for efficient reconciliation - tracing::info!( - relay = %relay_url, - "Using NIP-77 negentropy for daily sync" - ); - - // Perform negentropy sync for Layer 1 (announcements) - let layer1_filter = filters::build_announcement_filter(None); - self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (daily)") - .await; - - // After negentropy sync, set up live subscription for new events - let live_filter = filters::build_announcement_filter(Some(now)); - if let Some(conn) = self.connections.get(relay_url) { - if let Err(e) = conn.subscribe_filter(live_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to set up live Layer 1 subscription after negentropy daily sync" - ); - } - } - - // Recompute actions for Layer 2+3 based on synced events - self.recompute_new_sync_filters_for_relay(relay_url).await; - } else { - // NIP-77 not supported - fall back to REQ+EOSE - tracing::info!( - relay = %relay_url, - "NIP-77 not supported, using REQ+EOSE for daily sync" - ); - - // Re-subscribe to Layer 1 (announcements) without since filter for full discovery - let layer1_filter = filters::build_announcement_filter(None); - if let Some(conn) = self.connections.get(relay_url) { - if let Err(e) = conn.subscribe_filter(layer1_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to re-subscribe to Layer 1 during daily sync" - ); - } - } - - // Recompute actions for Layer 2+3 - will discover all repos/events again - self.recompute_new_sync_filters_for_relay(relay_url).await; - } + // maybe we just run start fresh with a daily flag? make sture so start layer 1 filters + self.fresh_start(relay_url).await; - if let Some(ref metrics) = self.metrics { - metrics.record_event(event_source::DAILY); - } + // if let Some(ref metrics) = self.metrics { + // metrics.record_event(event_source::DAILY); + // } - tracing::info!(relay = %relay_url, "Daily sync complete"); + // tracing::info!(relay = %relay_url, "Daily sync complete"); } /// Run the sync manager @@ -827,79 +778,19 @@ impl SyncManager { // Step 2: Check if consolidation is needed BEFORE adding new filters self.maybe_consolidate(&action.relay_url, action.filters.len()) .await; - /// DELETE this bit - // Step 3: Get connection and subscribe to all filters - let connection = match self.connections.get(&action.relay_url) { - Some(conn) => conn, - None => { - tracing::warn!( - relay = %action.relay_url, - "No connection for relay, cannot subscribe" - ); - return; - } - }; // Subscribe to each filter and collect subscription IDs - let mut subscription_ids = Vec::new(); - for filter in &action.filters { - match connection.subscribe_filter(filter.clone()).await { - Ok(sub_id) => { - subscription_ids.push(sub_id); - } - Err(e) => { - tracing::error!( - relay = %action.relay_url, - error = %e, - "Failed to subscribe to filter" - ); - } - } - } - - if subscription_ids.is_empty() && !action.filters.is_empty() { - tracing::warn!( - relay = %action.relay_url, - "All filter subscriptions failed, not creating batch" - ); - return; - } - - // Step 4: Create PendingBatch - let batch_id = self.next_batch_id(); - let batch = PendingBatch { - batch_id, - items: PendingItems { - repos: action.items.repos.clone(), - root_events: action.items.root_events.clone(), - }, - outstanding_subs: subscription_ids.into_iter().collect(), - sync_method: SyncMethod::ReqEose, - }; - - // Step 5: Add to pending_sync_index - { - let mut pending = self.pending_sync_index.write().await; - pending - .entry(action.relay_url.clone()) - .or_insert_with(Vec::new) - .push(batch); - } - - tracing::debug!( + tracing::info!( relay = %action.relay_url, - batch_id = batch_id, - repos = action.items.repos.len(), - root_events = action.items.root_events.len(), - filters = action.filters.len(), - "Created pending batch for filter subscriptions" + filter_count = action.filters.len(), + repo_count = action.items.repos.len(), + root_event_count = action.items.root_events.len(), + "handle_add_filters: calling sync_live and historic_sync" ); - // REPLACE WITH THIS: - // // Subscribe to each filter and collect subscription IDs - // self.sync_live(&action.relay_url, &action.filters).await; - // // TODO need to do actions.repos - // self.historic_sync(&action.relay_url, action.filters, action.items, None) - // .await; + + self.sync_live(&action.relay_url, &action.filters).await; + self.historic_sync(&action.relay_url, action.filters, action.items, None) + .await; } /// Handle a connection success (called when a relay connects or reconnects) @@ -909,209 +800,39 @@ impl SyncManager { /// - `quick_reconnect()` if disconnected < 15 minutes /// - `long_reconnect()` if disconnected > 15 minutes async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { - let now = Timestamp::now(); - - // // Get the relay state to determine reconnect type - // let (last_connected, disconnected_at) = { - // let index = self.relay_sync_index.read().await; - // if let Some(state) = index.get(relay_url) { - // (state.last_connected, state.disconnected_at) - // } else { - // (None, None) // No state found - // } - // }; - - // // Determine which reconnection strategy to use - // match (last_connected, disconnected_at) { - // (None, _) => { - // // Never connected before - fresh start - // tracing::info!( - // relay = %relay_url, - // "First connection - initiating fresh_start" - // ); - // self.fresh_start(relay_url).await; - // } - // (Some(last), Some(disconnected)) => { - // // Was connected before, check how long disconnected - // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs()); - - // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS { - // // Disconnected < 15 minutes - quick reconnect - // // Use last_connected minus buffer as since timestamp - // let since = - // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); - // tracing::info!( - // relay = %relay_url, - // disconnect_secs = disconnect_duration, - // since = %since, - // "Short disconnection - initiating quick_reconnect" - // ); - // self.quick_reconnect(relay_url, since).await; - // } else { - // // Disconnected > 15 minutes - long reconnect - // tracing::info!( - // relay = %relay_url, - // disconnect_secs = disconnect_duration, - // "Long disconnection - initiating long_reconnect" - // ); - // self.long_reconnect(relay_url).await; - // } - // } - // (Some(_last), None) => { - // // Was connected but no disconnected_at - shouldn't happen normally - // // Treat as long reconnect to be safe - // tracing::warn!( - // relay = %relay_url, - // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect" - // ); - // self.long_reconnect(relay_url).await; - // } - // } - // Get the relay state to determine reconnect type - let (is_fresh_sync, last_connected, is_bootstrap) = { + let last_connected = { let index = self.relay_sync_index.read().await; - if let Some(state) = index.get(relay_url) { - let last_conn = state.last_connected; - let is_fresh = match last_conn { - None => true, // Never connected before - Some(last) => { - let elapsed = now.as_secs().saturating_sub(last.as_secs()); - elapsed > QUICK_RECONNECT_WINDOW_SECS // Stale if > 15 min - } - }; - (is_fresh, last_conn, state.is_bootstrap) - } else { - (true, None, false) // No state found, treat as fresh - } + index.get(relay_url).and_then(|s| s.last_connected) }; - // If stale reconnect, clear state - if is_fresh_sync && last_connected.is_some() { - let mut index = self.relay_sync_index.write().await; - if let Some(state) = index.get_mut(relay_url) { - state.clear_sync_state(); + if let Some(last) = last_connected { + let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); + if elapsed < QUICK_RECONNECT_WINDOW_SECS { + // short disconnect tracing::info!( relay = %relay_url, - "Cleared stale sync state (was disconnected > 15 min)" + disconnect_secs = elapsed, + "Short disconnection - initiating quick_reconnect" ); - } - } - - // Update connection state - { - let mut index = self.relay_sync_index.write().await; - let state = index.entry(relay_url.to_string()).or_default(); - state.connection_status = ConnectionStatus::Connected; - state.last_connected = Some(now); - state.disconnected_at = None; - } - - // Record success in health tracker - self.health_tracker.record_success(relay_url); - - // Update metrics - if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, true); - metrics.inc_connected_count(); - metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); - } - - // Subscribe based on reconnect type - if is_fresh_sync { - tracing::info!( - relay = %relay_url, - is_bootstrap = is_bootstrap, - "Fresh sync - checking NIP-77 negentropy support" - ); - - // Check if relay supports NIP-77 negentropy for efficient sync - // Respect the sync_disable_negentropy config option - let use_negentropy = if self.config.sync_disable_negentropy { - tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); - false - } else if let Some(connection) = self.connections.get(relay_url) { - connection.supports_negentropy().await - } else { - false - }; - - if use_negentropy { - // NIP-77 supported - use negentropy for historical sync - tracing::info!( - relay = %relay_url, - "Using NIP-77 negentropy for fresh sync" - ); - - // Perform negentropy sync for Layer 1 (announcements) - let layer1_filter = filters::build_announcement_filter(None); - self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1") + self.quick_reconnect(relay_url, Timestamp::from(elapsed)) .await; - - // After negentropy sync, recompute Layer 2+3 actions - // Layer 1 events are now in sync, so we can proceed with Layer 2+3 - self.recompute_new_sync_filters_for_relay(relay_url).await; - - // Set up live subscription for new events (since=now) - let live_filter = filters::build_announcement_filter(Some(now)); - if let Some(connection) = self.connections.get(relay_url) { - if let Err(e) = connection.subscribe_filter(live_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to set up live Layer 1 subscription after negentropy sync" - ); - } - } } else { - // NIP-77 not supported - fall back to REQ+EOSE + // long disconnect tracing::info!( relay = %relay_url, - "NIP-77 not supported, using REQ+EOSE for fresh sync" + disconnect_secs = elapsed, + "Long disconnection - initiating fresh_start" ); - // Fresh sync: Layer 1 subscription (without since) was already established - // during connect_and_subscribe() in handle_add_filters(). That call subscribes - // to kinds 30617+30618 for the full history. Here we only need to recompute - // Layer 2+3 actions based on the repos we're tracking. - self.recompute_new_sync_filters_for_relay(relay_url).await; + self.fresh_start(relay_url).await; } } else { - // Quick reconnect: use since filter (no negentropy needed) - let since_ts = Timestamp::from( - last_connected - .unwrap() - .as_secs() - .saturating_sub(QUICK_RECONNECT_WINDOW_SECS), - ); - + // not successfully connected before (since launching binary) tracing::info!( relay = %relay_url, - since = %since_ts, - "Quick reconnect - using since filter for incremental sync" + "First connection - initiating fresh_start" ); - - // Subscribe to Layer 1 (announcements) with since filter to catch new repos - let layer1_filter = filters::build_announcement_filter(Some(since_ts)); - if let Some(connection) = self.connections.get(relay_url) { - if let Err(e) = connection.subscribe_filter(layer1_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to subscribe to Layer 1 filter on quick reconnect" - ); - } - } - - // Rebuild Layer 2 and Layer 3 with since filter - self.rebuild_layer2_and_layer3(relay_url, Some(since_ts)) - .await; - - // Recompute actions for any new items discovered while disconnected - self.recompute_new_sync_filters_for_relay(relay_url).await; - - if let Some(ref metrics) = self.metrics { - metrics.record_event(event_source::RECONNECT); - } - } + self.fresh_start(relay_url).await; + }; } /// Fresh start - clears state and does full sync @@ -1155,77 +876,16 @@ impl SyncManager { "Cleared sync state in fresh_start" ); } - } - } - - // Step 3: Update connection state - { - let mut index = self.relay_sync_index.write().await; - let state = index.entry(relay_url.to_string()).or_default(); - state.connection_status = ConnectionStatus::Connected; - state.last_connected = Some(now); - state.disconnected_at = None; - } - - // Record success in health tracker - self.health_tracker.record_success(relay_url); - - // Update metrics - if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, true); - metrics.inc_connected_count(); - metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); - } - - // Step 4: L1 sync - check negentropy support - let use_negentropy = if self.config.sync_disable_negentropy { - tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); - false - } else if let Some(connection) = self.connections.get(relay_url) { - connection.supports_negentropy().await - } else { - false - }; - - if use_negentropy { - // NIP-77 supported - use negentropy for L1 historical sync - tracing::info!( - relay = %relay_url, - "Using NIP-77 negentropy for L1 historical sync" - ); - - // L1 historic sync (no since - full sync) - let layer1_filter = filters::build_announcement_filter(None); - self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)") - .await; - - // L1 live subscription (since=now for ongoing events) - let live_filter = filters::build_announcement_filter(Some(now)); - if let Some(connection) = self.connections.get(relay_url) { - if let Err(e) = connection.subscribe_filter(live_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to set up L1 live subscription in fresh_start" - ); + if state.connection_status == ConnectionStatus::Connected { + // TODO start layer 1 + drop(index); + self.recompute_new_sync_filters_for_relay(relay_url).await; } + } else { + drop(index); + return self.spawn_relay_connection(relay_url.to_string()).await; } - } else { - // NIP-77 not supported - REQ+EOSE - // Note: Layer 1 subscription (without since) was already established - // during connect_and_subscribe() in spawn_relay_connection - tracing::info!( - relay = %relay_url, - "Using REQ+EOSE for L1 sync (negentropy not available)" - ); } - - // Step 5: compute_actions → AddFilters for L2+L3 - // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters - // for ALL repos that should be synced from this relay - self.recompute_new_sync_filters_for_relay(relay_url).await; - - tracing::info!(relay = %relay_url, "fresh_start complete"); } /// Quick reconnect - for disconnections < 15 minutes @@ -1312,27 +972,6 @@ impl SyncManager { tracing::info!(relay = %relay_url, "quick_reconnect complete"); } - /// Long reconnect - for disconnections > 15 minutes - /// - /// Flow: - /// 1. Record disconnect/reconnect metric - /// 2. Delegate to fresh_start() - async fn long_reconnect(&mut self, relay_url: &str) { - tracing::info!(relay = %relay_url, "Starting long_reconnect"); - - // Step 1: Record disconnect/reconnect metric - // This distinguishes intentional daily refresh from failure recovery - if let Some(ref metrics) = self.metrics { - metrics.record_event(event_source::RECONNECT); - } - - // Step 2: Delegate to fresh_start - // State is too stale to trust, start fresh - self.fresh_start(relay_url).await; - - tracing::info!(relay = %relay_url, "long_reconnect complete"); - } - /// Rebuild Layer 2 and Layer 3 subscriptions for a relay /// /// Uses the confirmed repos and root_events from RelayState to build filters. @@ -1367,11 +1006,15 @@ impl SyncManager { // Build Layer 2 and Layer 3 filters let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); + // DEBUG TRACING: Log detailed filter information tracing::debug!( relay = %relay_url, filter_count = filters.len(), repos_count = repos.len(), root_events_count = root_events.len(), + repos = ?repos, + root_events = ?root_events.iter().map(|id| id.to_hex()).collect::>(), + filters = ?filters, since = ?since, "Rebuilding Layer 2/3 filters" ); @@ -1828,197 +1471,6 @@ impl SyncManager { } } - // ========================================================================= - // NIP-77 Negentropy Sync Helpers - // ========================================================================= - - /// Perform negentropy sync for a filter and process received events - /// - /// This method uses the unified PendingBatch flow: - /// 1. Creates a PendingBatch with targets for this relay - /// 2. Performs negentropy reconciliation with the remote relay - /// 3. On success, confirms the batch (moves items to RelayState) - /// 4. On failure, removes the batch without confirming - /// - /// This ensures consistent state tracking across both sync paths - /// (REQ+EOSE and Negentropy). - /// - /// # Arguments - /// * `relay_url` - The relay URL to sync with - /// * `filter` - The filter defining which events to sync - /// * `layer_name` - Human-readable layer name for logging (e.g., "Layer 1") - /// - /// # Returns - /// Number of events received from negentropy sync - async fn negentropy_sync_and_process( - &mut self, - relay_url: &str, - filter: Filter, - layer_name: &str, - ) -> usize { - use crate::sync::algorithms::derive_relay_targets; - - // Check connection exists first (borrow ends immediately) - if !self.connections.contains_key(relay_url) { - tracing::warn!( - relay = %relay_url, - layer = layer_name, - "No connection found for negentropy sync" - ); - return 0; - } - - // Step 1: Get targets for this relay and create PendingBatch - // Get batch_id first (requires mutable borrow of self) - let batch_id = self.next_batch_id(); - - let pending_items = { - let repo_index = self.repo_sync_index.read().await; - let targets = derive_relay_targets(&repo_index); - - let relay_targets = match targets.get(relay_url) { - Some(t) => t, - None => { - tracing::debug!( - relay = %relay_url, - layer = layer_name, - "No targets found for relay, skipping negentropy sync" - ); - return 0; - } - }; - - PendingItems { - repos: relay_targets.repos.clone(), - root_events: relay_targets.root_events.clone(), - } - }; - - // Create PendingBatch for negentropy sync (empty outstanding_subs) - let batch = PendingBatch { - batch_id, - items: pending_items.clone(), - outstanding_subs: HashSet::new(), // Negentropy doesn't use subscriptions - sync_method: SyncMethod::Negentropy, - }; - - // Add batch to pending_sync_index before starting sync - { - let mut pending = self.pending_sync_index.write().await; - pending - .entry(relay_url.to_string()) - .or_insert_with(Vec::new) - .push(batch); - } - - tracing::debug!( - relay = %relay_url, - layer = layer_name, - batch_id = batch_id, - repos = pending_items.repos.len(), - root_events = pending_items.root_events.len(), - "Created pending batch for negentropy sync" - ); - - // Step 2: Perform negentropy sync - // Get connection reference here (borrows self.connections briefly) - let Some(connection) = self.connections.get(relay_url) else { - // Connection was removed between check and use (race condition) - // Remove the pending batch we just added - let mut pending = self.pending_sync_index.write().await; - if let Some(batches) = pending.get_mut(relay_url) { - batches.retain(|b| b.batch_id != batch_id); - if batches.is_empty() { - pending.remove(relay_url); - } - } - tracing::warn!( - relay = %relay_url, - layer = layer_name, - "Connection disappeared before negentropy sync could start" - ); - return 0; - }; - - match connection.negentropy_sync_filter(filter).await { - Ok(result) => { - let event_count = result.received.len(); - - tracing::info!( - relay = %relay_url, - layer = layer_name, - received = event_count, - remote_only = result.remote_only.len(), - local_only = result.local_only.len(), - "Negentropy sync completed for {}", - layer_name - ); - - tracing::debug!( - relay = %relay_url, - layer = layer_name, - event_ids = ?result.received.iter().take(5).collect::>(), - "Received event IDs via negentropy (first 5 shown)" - ); - - // Record metrics for negentropy events - if let Some(ref metrics) = self.metrics { - for _ in 0..event_count { - metrics.record_event(event_source::STARTUP); - } - } - - // Step 3: Remove batch from pending and confirm it - let completed_batch = { - let mut pending = self.pending_sync_index.write().await; - if let Some(batches) = pending.get_mut(relay_url) { - let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); - if let Some(idx) = batch_idx { - let batch = batches.remove(idx); - if batches.is_empty() { - pending.remove(relay_url); - } - Some(batch) - } else { - None - } - } else { - None - } - }; - - // Confirm the batch using unified confirm_batch method - if let Some(batch) = completed_batch { - self.confirm_batch(relay_url, batch).await; - } - - event_count - } - Err(e) => { - tracing::warn!( - relay = %relay_url, - layer = layer_name, - error = %e, - "Negentropy sync failed for {}", - layer_name - ); - - // Remove the batch without confirming on failure - { - let mut pending = self.pending_sync_index.write().await; - if let Some(batches) = pending.get_mut(relay_url) { - batches.retain(|b| b.batch_id != batch_id); - if batches.is_empty() { - pending.remove(relay_url); - } - } - } - - 0 - } - } - } - // ========================================================================= // Consolidation System // ========================================================================= @@ -2410,7 +1862,7 @@ impl SyncManager { // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", // but omitting limit means "no limit" which is what we want for live. // The filter passed in should already NOT have a limit set. - match connection.subscribe_filter(filter.clone()).await { + match connection.subscribe_filter(filter.clone().limit(1)).await { Ok(sub_id) => { tracing::trace!( relay = %relay_url, @@ -2519,6 +1971,17 @@ impl SyncManager { items: PendingItems, since: Option, ) -> Option { + // DEBUG TRACING: Log all filters being passed to historic_sync + tracing::debug!( + relay = %relay_url, + filter_count = filters.len(), + filters = ?filters, + repos_count = items.repos.len(), + root_events_count = items.root_events.len(), + since = ?since, + "historic_sync called" + ); + if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { tracing::debug!( relay = %relay_url, @@ -2527,9 +1990,9 @@ impl SyncManager { return None; } - // Check connection exists + // Check connection exists and clone for async usage let connection = match self.connections.get(relay_url) { - Some(conn) => conn, + Some(conn) => conn.clone(), None => { tracing::warn!( relay = %relay_url, @@ -2581,38 +2044,63 @@ impl SyncManager { .push(batch); } - // Perform negentropy sync for each filter + // Perform negentropy sync for all filters concurrently // Note: We sync each filter separately because negentropy works on a single filter - let mut total_received = 0; - let mut any_success = false; + let diff_futures: Vec<_> = filters_with_since + .iter() + .enumerate() + .map(|(idx, filter)| { + let filter = filter.clone(); + let conn = connection.clone(); + async move { (idx, conn.negentropy_sync_diff(filter).await) } + }) + .collect(); - for filter in &filters_with_since { - if let Some(conn) = self.connections.get(relay_url) { - match conn.negentropy_sync_filter(filter.clone()).await { - Ok(result) => { - total_received += result.received.len(); - any_success = true; - - // Record metrics for received events - if let Some(ref metrics) = self.metrics { - for _ in 0..result.received.len() { - metrics.record_event(event_source::STARTUP); - } - } - } - Err(e) => { - tracing::warn!( + let diff_results = futures_util::future::join_all(diff_futures).await; + + // Process results - collect all event IDs we need to fetch + let mut all_remote_ids = Vec::new(); + let mut failed_count = 0; + + for (idx, result) in diff_results { + match result { + Ok(reconciliation) => { + let remote_count = reconciliation.remote.len(); + if remote_count > 0 { + tracing::debug!( relay = %relay_url, - error = %e, - "Negentropy sync failed for filter in historic_sync" + filter_idx = idx, + remote_count = remote_count, + "Negentropy diff identified missing events" ); + all_remote_ids.extend(reconciliation.remote.into_iter()); } } + Err(e) => { + failed_count += 1; + tracing::warn!( + relay = %relay_url, + filter_idx = idx, + error = %e, + "Negentropy diff failed for filter in historic_sync" + ); + } } } - if any_success { - // Remove batch from pending and confirm it + // Require ALL filters to succeed to confirm the batch + if failed_count > 0 { + // Leave pending batch so it doesnt appear as synced. we can try again later. + tracing::warn!( + relay = %relay_url, + batch_id = batch_id, + failed_count = failed_count, + total_filters = filters_with_since.len(), + "historic_sync (negentropy) failed - not all filters succeeded" + ); + return None; + } else if all_remote_ids.is_empty() { + // Remove batch from pending and confirm it (no items to download) let completed_batch = { let mut pending = self.pending_sync_index.write().await; if let Some(batches) = pending.get_mut(relay_url) { @@ -2638,26 +2126,67 @@ impl SyncManager { tracing::info!( relay = %relay_url, batch_id = batch_id, - total_received = total_received, - "historic_sync (negentropy) completed" + total_received = 0, + "historic_sync (negentropy) completed - already up-to-date" ); - } else { - // All negentropy syncs failed - remove the pending batch + } + // launch subscriptions to fetch missing events by id + let ids_filters: Vec<_> = all_remote_ids + .chunks(300) + .map(|c| Filter::new().ids(c.iter().copied())) + .collect(); + + // DEBUG TRACING: Log that we're requesting events by ID + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + total_event_ids = all_remote_ids.len(), + filter_chunks = ids_filters.len(), + event_ids = ?all_remote_ids, + "Creating subscriptions to fetch missing events by ID (negentropy path)" + ); + + let mut subscription_ids = HashSet::new(); + for (idx, filter) in ids_filters.iter().enumerate() { + if let Some(conn) = self.connections.get(relay_url) { + // DEBUG TRACING: Log each filter being subscribed + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + chunk_idx = idx, + filter = ?filter, + "Subscribing to ID filter chunk" + ); + + match conn.subscribe_filter(filter.clone()).await { + Ok(sub_id) => { + subscription_ids.insert(sub_id); + } + Err(e) => { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to subscribe to filter in historic_sync" + ); + } + } + } + } + { let mut pending = self.pending_sync_index.write().await; - if let Some(batches) = pending.get_mut(relay_url) { - batches.retain(|b| b.batch_id != batch_id); - if batches.is_empty() { - pending.remove(relay_url); + if let Some(relay_batches) = pending.get_mut(relay_url) { + if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { + batch.outstanding_subs.extend(subscription_ids.clone()); } } - - tracing::warn!( - relay = %relay_url, - batch_id = batch_id, - "historic_sync (negentropy) failed for all filters" - ); - return None; } + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + subscription_ids = subscription_ids.len(), + events = all_remote_ids.len(), + "historic_sync (Negentropy) created subscritions to fetch missing events by id, awaiting EOSE" + ); } else { // Traditional REQ+EOSE path tracing::debug!( @@ -2673,7 +2202,16 @@ impl SyncManager { // Subscribe to each filter and collect subscription IDs let mut subscription_ids = HashSet::new(); - for filter in &filters_with_since { + // DEBUG TRACING: Log each filter in REQ+EOSE path + for (idx, filter) in filters_with_since.iter().enumerate() { + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + filter_idx = idx, + filter = ?filter, + "Subscribing to filter in REQ+EOSE path" + ); + if let Some(conn) = self.connections.get(relay_url) { match conn.subscribe_filter(filter.clone()).await { Ok(sub_id) => { diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 4167a0c..bc4b59e 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -325,11 +325,25 @@ impl RelayConnection { /// * `Ok(SubscriptionId)` - The subscription ID on success /// * `Err(String)` - Error description on failure pub async fn subscribe_filter(&self, filter: Filter) -> Result { + // DEBUG TRACING: Log the filter being subscribed to + tracing::debug!( + relay = %self.url, + filter = ?filter, + "subscribe_filter called with filter" + ); + let output = self .client .subscribe(filter, None) .await .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; + + tracing::debug!( + relay = %self.url, + subscription_id = %output.val, + "subscribe_filter succeeded" + ); + Ok(output.val) } @@ -407,31 +421,36 @@ impl RelayConnection { true } - /// Perform negentropy synchronization for a filter + /// Perform a negentropy sync diff (dry run) to identify missing events /// - /// Uses NIP-77 negentropy protocol to efficiently reconcile events matching - /// the filter between local database and remote relay. This is much more - /// efficient than REQ+EOSE for relays with overlapping event sets. + /// This method performs NIP-77 negentropy reconciliation without downloading events. + /// It returns the list of event IDs that need to be fetched. The caller should then + /// manually fetch these events and pass them through the write policy for validation. /// /// # Arguments - /// * `filter` - The filter defining which events to sync + /// * `filter` - The filter to sync /// /// # Returns - /// * `Ok(NegentropySyncResult)` - Sync completed successfully with reconciliation info + /// * `Ok(Reconciliation)` - Reconciliation result with remote/local/sent event IDs /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error) /// - /// # Fallback Behavior - /// If this method fails, the caller should fall back to traditional REQ+EOSE sync. - /// Failure reasons include: - /// - Relay doesn't actually support NIP-77 (despite claiming to) - /// - Network errors during reconciliation - /// - Timeout during sync - pub async fn negentropy_sync_filter( - &self, - filter: Filter, - ) -> Result { - // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange - let sync_opts = SyncOptions::default(); + /// # Usage Pattern + /// ```ignore + /// // 1. Get the diff + /// let reconciliation = conn.negentropy_sync_diff(filter).await?; + /// + /// // 2. Fetch missing events by ID + /// if !reconciliation.remote.is_empty() { + /// let ids: Vec = reconciliation.remote.into_iter().collect(); + /// let filter = Filter::new().ids(ids); + /// conn.subscribe_filter(filter, tx).await?; + /// } + /// + /// // 3. Events come through normal flow and get validated via process_event_static + /// ``` + pub async fn negentropy_sync_diff(&self, filter: Filter) -> Result { + // Use dry_run to only identify differences without downloading events + let sync_opts = SyncOptions::default().dry_run(); match self.client.sync(filter.clone(), &sync_opts).await { Ok(output) => { @@ -441,9 +460,7 @@ impl RelayConnection { relay = %self.url, local_count = reconciliation.local.len(), remote_count = reconciliation.remote.len(), - sent_count = reconciliation.sent.len(), - received_count = reconciliation.received.len(), - "Negentropy sync completed" + "Negentropy diff completed (dry run)" ); // Check for any failures @@ -451,15 +468,11 @@ impl RelayConnection { tracing::warn!( relay = %self.url, failures = ?output.failed, - "Some relays failed during negentropy sync" + "Some relays failed during negentropy diff" ); } - Ok(NegentropySyncResult { - remote_only: reconciliation.remote.into_iter().collect(), - local_only: reconciliation.local.into_iter().collect(), - received: reconciliation.received.into_iter().collect(), - }) + Ok(reconciliation) } Err(e) => { // Log warning only once per relay to avoid spam @@ -470,30 +483,14 @@ impl RelayConnection { tracing::warn!( relay = %self.url, error = %e, - "Negentropy sync failed, will fall back to REQ+EOSE" + "Negentropy diff failed, will fall back to REQ+EOSE" ); } - Err(format!("Negentropy sync failed: {}", e)) + Err(format!("Negentropy diff failed: {}", e)) } } } - /// Perform negentropy sync and return received event IDs - /// - /// Convenience method that performs negentropy sync and returns the event IDs - /// that were received (i.e., events that exist on remote but not locally). - /// - /// # Arguments - /// * `filter` - The filter defining which events to sync - /// - /// # Returns - /// * `Ok(Vec)` - Event IDs received from remote relay - /// * `Err(String)` - Sync failed - pub async fn negentropy_sync_and_fetch(&self, filter: Filter) -> Result, String> { - let result = self.negentropy_sync_filter(filter).await?; - Ok(result.received) - } - /// Check if this connection has a database configured for negentropy pub fn has_database(&self) -> bool { self.database.is_some() -- cgit v1.2.3