From de07e31fad60f9c68a08807cde17ff81d8371a65 Mon Sep 17 00:00:00 2001
From: DanConwayDev
Date: Fri, 12 Dec 2025 10:09:47 +0000
Subject: fix: unify sync state tracking for negentropy and REQ+EOSE paths

When negentropy (NIP-77) sync was enabled, the RelaySyncIndex was never
updated to reflect historical sync completion. This caused the three-way
diff algorithm in compute_actions() to malfunction, leading to:
- Repeated sync attempts for the same items
- Incorrect filter counting for consolidation
- Potential premature relay disconnection

This fix unifies both sync paths (REQ+EOSE and Negentropy) through a
consistent PendingBatch flow:
1. Added SyncMethod enum to distinguish between sync types
2. Updated PendingBatch struct to include sync_method field
3. Extracted confirm_batch() method for unified batch confirmation
4. Modified negentropy_sync_and_process() to:
   - Create a PendingBatch before sync
   - Add batch to pending_sync_index
   - On success: Remove batch and call confirm_batch()
   - On failure: Remove batch without confirming

The confirm_batch() method moves repos and root_events from the batch to
the RelayState.repos and RelayState.root_events, ensuring the three-way
diff works correctly regardless of sync method.

Closes: negentropy-sync-state-tracking.md --- src/sync/algorithms.rs | 4 +- src/sync/mod.rs | 242 +++++++++++++++++++++++++++++++++++++------------ 2 files changed, 188 insertions(+), 58 deletions(-) (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 3063516..5b5b520 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -11,7 +11,7 @@ use std::collections::{HashMap, HashSet}; use nostr_sdk::prelude::*; -use super::{ConnectionStatus, PendingBatch, RelayState}; +use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; // ============================================================================= // Data Structures @@ -396,6 +396,7 @@ mod tests { root_events: HashSet::new(), }, outstanding_subs: HashSet::new(), + sync_method: SyncMethod::ReqEose, }], ); @@ -504,6 +505,7 @@ mod tests { root_events: HashSet::new(), }, outstanding_subs: HashSet::new(), + sync_method: SyncMethod::ReqEose, }], ); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 50d4ae5..b69d627 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -137,15 +137,27 @@ impl RelayState { } } -/// A batch of items pending EOSE confirmation +/// Method used for synchronization +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncMethod { + /// Traditional REQ+EOSE flow - waits for EOSE on subscriptions + ReqEose, + /// NIP-77 negentropy sync - confirms immediately after sync completes + Negentropy, +} + +/// A batch of items pending confirmation #[derive(Debug, Clone)] pub struct PendingBatch { /// Unique ID for this batch - for debugging/logging pub batch_id: u64, /// The items this batch is syncing pub items: PendingItems, - /// Subscription IDs that must ALL receive EOSE before confirming + /// Subscription IDs that must ALL receive EOSE before confirming (for ReqEose) + /// Empty for Negentropy sync method pub outstanding_subs: HashSet, + /// The sync method used for this batch + pub sync_method: SyncMethod, } /// Items included 
in a pending batch @@ -397,9 +409,7 @@ impl SyncManager { /// - Finds the PendingBatch containing this subscription ID /// - Removes the subscription from outstanding_subs /// - When all subscriptions complete (outstanding_subs empty): - /// - Moves repos from pending to confirmed in RelayState - /// - Moves root_events from pending to confirmed - /// - Removes the batch from pending_sync_index + /// - Calls confirm_batch to move items to confirmed state async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { // 1. Find and update the pending batch let mut pending = self.pending_sync_index.write().await; @@ -444,46 +454,60 @@ impl SyncManager { return; } - // 2. Batch complete - extract items and remove batch + // 2. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); - let batch_id = completed_batch.batch_id; - let repos_count = completed_batch.items.repos.len(); - let events_count = completed_batch.items.root_events.len(); // Clean up empty relay entry if batches.is_empty() { pending.remove(relay_url); } - // Drop the pending lock before acquiring relay_sync_index lock + // Drop the pending lock before confirm_batch drop(pending); - // 3. Move items to confirmed state in RelayState - { - let mut relay_index = self.relay_sync_index.write().await; + // 3. Confirm the batch (moves items to RelayState) + self.confirm_batch(relay_url, completed_batch).await; + } - if let Some(state) = relay_index.get_mut(relay_url) { - // Move repos to confirmed - state.repos.extend(completed_batch.items.repos); - // Move root_events to confirmed - state.root_events.extend(completed_batch.items.root_events); + /// Confirm a completed batch by moving items to RelayState + /// + /// This method is used by both sync paths (REQ+EOSE and Negentropy) to + /// move repos and root_events from pending to confirmed state. This unified + /// flow ensures consistent state tracking regardless of sync method. 
+ /// + /// # Arguments + /// * `relay_url` - The relay URL the batch belongs to + /// * `batch` - The completed batch to confirm + async fn confirm_batch(&self, relay_url: &str, batch: PendingBatch) { + let batch_id = batch.batch_id; + let repos_count = batch.items.repos.len(); + let events_count = batch.items.root_events.len(); + let sync_method = batch.sync_method; + + let mut relay_index = self.relay_sync_index.write().await; + + if let Some(state) = relay_index.get_mut(relay_url) { + // Move repos to confirmed + state.repos.extend(batch.items.repos); + // Move root_events to confirmed + state.root_events.extend(batch.items.root_events); - tracing::info!( - relay = %relay_url, - batch_id = batch_id, - repos_confirmed = repos_count, - root_events_confirmed = events_count, - total_repos = state.repos.len(), - total_root_events = state.root_events.len(), - "Batch confirmed - items moved from pending to confirmed" - ); - } else { - tracing::warn!( - relay = %relay_url, - batch_id = batch_id, - "Batch completed but no RelayState found for relay" - ); - } + tracing::info!( + relay = %relay_url, + batch_id = batch_id, + sync_method = ?sync_method, + repos_confirmed = repos_count, + root_events_confirmed = events_count, + total_repos = state.repos.len(), + total_root_events = state.root_events.len(), + "Batch confirmed - items moved from pending to confirmed" + ); + } else { + tracing::warn!( + relay = %relay_url, + batch_id = batch_id, + "Batch completed but no RelayState found for relay" + ); } } @@ -850,6 +874,7 @@ impl SyncManager { root_events: action.root_events.clone(), }, outstanding_subs: subscription_ids.into_iter().collect(), + sync_method: SyncMethod::ReqEose, }; // Step 5: Add to pending_sync_index @@ -1539,10 +1564,14 @@ impl SyncManager { /// Perform negentropy sync for a filter and process received events /// - /// This method: - /// 1. Performs negentropy reconciliation with the remote relay - /// 2. 
Processes all received events (dedup, policy check, save, broadcast) - /// 3. Returns the number of events received and processed + /// This method uses the unified PendingBatch flow: + /// 1. Creates a PendingBatch with targets for this relay + /// 2. Performs negentropy reconciliation with the remote relay + /// 3. On success, confirms the batch (moves items to RelayState) + /// 4. On failure, removes the batch without confirming + /// + /// This ensures consistent state tracking across both sync paths + /// (REQ+EOSE and Negentropy). /// /// # Arguments /// * `relay_url` - The relay URL to sync with @@ -1552,24 +1581,95 @@ impl SyncManager { /// # Returns /// Number of events received from negentropy sync async fn negentropy_sync_and_process( - &self, + &mut self, relay_url: &str, filter: Filter, layer_name: &str, ) -> usize { - let connection = match self.connections.get(relay_url) { - Some(conn) => conn, - None => { - tracing::warn!( - relay = %relay_url, - layer = layer_name, - "No connection found for negentropy sync" - ); - return 0; + use crate::sync::algorithms::derive_relay_targets; + + // Check connection exists first (borrow ends immediately) + if !self.connections.contains_key(relay_url) { + tracing::warn!( + relay = %relay_url, + layer = layer_name, + "No connection found for negentropy sync" + ); + return 0; + } + + // Step 1: Get targets for this relay and create PendingBatch + // Get batch_id first (requires mutable borrow of self) + let batch_id = self.next_batch_id(); + + let pending_items = { + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + + let relay_targets = match targets.get(relay_url) { + Some(t) => t, + None => { + tracing::debug!( + relay = %relay_url, + layer = layer_name, + "No targets found for relay, skipping negentropy sync" + ); + return 0; + } + }; + + PendingItems { + repos: relay_targets.repos.clone(), + root_events: relay_targets.root_events.clone(), } }; - // Perform 
negentropy sync + // Create PendingBatch for negentropy sync (empty outstanding_subs) + let batch = PendingBatch { + batch_id, + items: pending_items.clone(), + outstanding_subs: HashSet::new(), // Negentropy doesn't use subscriptions + sync_method: SyncMethod::Negentropy, + }; + + // Add batch to pending_sync_index before starting sync + { + let mut pending = self.pending_sync_index.write().await; + pending + .entry(relay_url.to_string()) + .or_insert_with(Vec::new) + .push(batch); + } + + tracing::debug!( + relay = %relay_url, + layer = layer_name, + batch_id = batch_id, + repos = pending_items.repos.len(), + root_events = pending_items.root_events.len(), + "Created pending batch for negentropy sync" + ); + + // Step 2: Perform negentropy sync + // Get connection reference here (borrows self.connections briefly) + let Some(connection) = self.connections.get(relay_url) else { + // Connection was removed between check and use (race condition) + // Remove the pending batch we just added + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(relay_url) { + batches.retain(|b| b.batch_id != batch_id); + if batches.is_empty() { + pending.remove(relay_url); + } + } + tracing::warn!( + relay = %relay_url, + layer = layer_name, + "Connection disappeared before negentropy sync could start" + ); + return 0; + }; + match connection.negentropy_sync_filter(filter).await { Ok(result) => { let event_count = result.received.len(); @@ -1584,14 +1684,6 @@ impl SyncManager { layer_name ); - // Note: nostr-sdk's sync() handles fetching events automatically. - // The result.received contains EventIds that were fetched during sync. - // Events are stored in nostr-sdk's client database. - // For production use, we would need to either: - // 1. Configure nostr-sdk client to use our SharedDatabase - // 2. 
Fetch events by ID from nostr-sdk's database to process them - // For now, we just log the count - the sync operation itself ensures - // the relay has the events available. tracing::debug!( relay = %relay_url, layer = layer_name, @@ -1606,6 +1698,30 @@ impl SyncManager { } } + // Step 3: Remove batch from pending and confirm it + let completed_batch = { + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(relay_url) { + let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); + if let Some(idx) = batch_idx { + let batch = batches.remove(idx); + if batches.is_empty() { + pending.remove(relay_url); + } + Some(batch) + } else { + None + } + } else { + None + } + }; + + // Confirm the batch using unified confirm_batch method + if let Some(batch) = completed_batch { + self.confirm_batch(relay_url, batch).await; + } + event_count } Err(e) => { @@ -1616,6 +1732,18 @@ impl SyncManager { "Negentropy sync failed for {}, will fall back to REQ+EOSE", layer_name ); + + // Remove the batch without confirming on failure + { + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(relay_url) { + batches.retain(|b| b.batch_id != batch_id); + if batches.is_empty() { + pending.remove(relay_url); + } + } + } + 0 } } -- cgit v1.2.3