From 0228fa1e2fac86cfd2543444eef0784faa7a9715 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 10:35:48 +0000 Subject: sync: negentropy fixes --- src/sync/mod.rs | 47 ++++++++++++++++------------------------------- 1 file changed, 16 insertions(+), 31 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6adfc55..e66611c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1107,7 +1107,10 @@ impl SyncManager { /// /// Uses the confirmed repos and root_events from RelayState to build filters. /// If since is provided, applies it to all filters for incremental sync. - async fn rebuild_layer2_and_layer3(&self, relay_url: &str, since: Option) { + /// + /// CRITICAL: This method now creates a PendingBatch to track subscriptions, + /// ensuring EOSE handling works correctly for live sync scenarios. + async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option) { use crate::sync::filters::build_layer2_and_layer3_filters; // Get confirmed state from relay_sync_index @@ -1137,36 +1140,15 @@ impl SyncManager { // Build Layer 2 and Layer 3 filters let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); - // DEBUG TRACING: Log detailed filter information - tracing::debug!( - relay = %relay_url, - filter_count = filters.len(), - repos_count = repos.len(), - root_events_count = root_events.len(), - repos = ?repos, - root_events = ?root_events.iter().map(|id| id.to_hex()).collect::>(), - filters = ?filters, - since = ?since, - "Rebuilding Layer 2/3 filters" - ); - - // Subscribe to filters on the relay connection - if let Some(connection) = self.connections.get(relay_url) { - for filter in filters { - if let Err(e) = connection.subscribe_filter(filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to subscribe to Layer 2/3 filter during rebuild" - ); - } - } - } else { - tracing::warn!( + if filters.is_empty() { + tracing::debug!( relay = %relay_url, - "No active connection found for Layer 2/3 rebuild" + "No filters generated for Layer 2/3 rebuild" ); + return; } + // TODO do we add since instead of limit to live sync or do a historic sync of filters? + self.sync_live(relay_url, &filters).await; } /// Register a relay for managed connection/reconnection @@ -1950,7 +1932,6 @@ impl SyncManager { } /// Sync historical events and track in PendingSyncIndex - #[allow(dead_code)] // Will be used in Phase 3+ /// /// This method handles historical synchronization for a set of filters, /// creating a PendingBatch to track completion. It dispatches to either @@ -2017,8 +1998,8 @@ impl SyncManager { // Check if we should use negentropy // TODO once we have setup our new tests we will re-enable this and fix our implementation - let use_negentropy = false; - // !self.config.sync_disable_negentropy && connection.supports_negentropy().await; + let use_negentropy = + !self.config.sync_disable_negentropy && connection.supports_negentropy().await; // Generate batch ID let batch_id = self.next_batch_id(); @@ -2136,7 +2117,11 @@ impl SyncManager { total_received = 0, "historic_sync (negentropy) completed - already up-to-date" ); + + // Batch already confirmed, nothing more to do + return Some(batch_id); } + // launch subscriptions to fetch missing events by id let ids_filters: Vec<_> = all_remote_ids .chunks(300) -- cgit v1.2.3