From 3f19ab476799071d11e5f61074b60e31511f68a2 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Sat, 10 Jan 2026 01:34:11 +0000 Subject: fix: implement negentropy fallback to REQ+EOSE when negentropy fails When negentropy sync fails (one or more filters fail during diff), the code previously left a pending batch and returned early, preventing any sync from happening. This caused the "No sync targets found" issue. Changes: - Track negentropy success with a boolean flag - On negentropy failure: clean up pending batch and fall through to REQ+EOSE - Log the fallback at info level for visibility - Restructure control flow so REQ+EOSE path executes after negentropy failure This ensures sync always completes using traditional REQ+EOSE when NIP-77 negentropy is unavailable or fails. --- src/sync/mod.rs | 178 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 103 insertions(+), 75 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8b51fac..d8c2d4f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -3146,6 +3146,9 @@ impl SyncManager { // Generate batch ID let batch_id = self.next_batch_id(); + // Track whether negentropy succeeded (for fallback logic) + let mut negentropy_succeeded = false; + if use_negentropy && !filters_with_since.is_empty() { // NIP-77 negentropy path tracing::debug!( @@ -3238,106 +3241,131 @@ impl SyncManager { // Require ALL filters to succeed to confirm the batch if failed_count > 0 { - // Leave pending batch so it doesnt appear as synced. we can try again later. - tracing::warn!( - relay = %relay_url, - batch_id = batch_id, - failed_count = failed_count, - total_filters = filters_with_since.len(), - "historic_sync (negentropy) failed - not all filters succeeded" - ); - return None; - } else if all_remote_ids.is_empty() { - // Remove batch from pending and confirm it (no items to download) - let completed_batch = { + // Remove failed negentropy batch and fall back to REQ+EOSE + { let mut pending = self.pending_sync_index.write().await; if let Some(batches) = pending.get_mut(relay_url) { let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); if let Some(idx) = batch_idx { - let batch = batches.remove(idx); + batches.remove(idx); if batches.is_empty() { pending.remove(relay_url); } - Some(batch) - } else { - None } - } else { - None } - }; - - if let Some(batch) = completed_batch { - self.confirm_batch(relay_url, batch).await; } tracing::info!( relay = %relay_url, batch_id = batch_id, - total_received = 0, - "historic_sync (negentropy) completed - already up-to-date" + failed_count = failed_count, + total_filters = filters_with_since.len(), + "historic_sync (negentropy) failed - falling back to REQ+EOSE" ); - // Batch already confirmed, nothing more to do - return Some(batch_id); - } + // Fall through to REQ+EOSE path below + } else { + // Negentropy succeeded - mark success and process results + negentropy_succeeded = true; - // launch subscriptions to fetch missing events by id - let ids_filters: Vec<_> = all_remote_ids - .chunks(300) - .map(|c| Filter::new().ids(c.iter().copied())) - .collect(); + if all_remote_ids.is_empty() { + // Remove batch from pending and confirm it (no items to download) + let completed_batch = { + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(relay_url) { + let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); + if let Some(idx) = batch_idx { + let batch = batches.remove(idx); + if batches.is_empty() { + pending.remove(relay_url); + } + Some(batch) + } else { + None + } + } else { + None + } + }; - // DEBUG TRACING: Log that we're requesting events by ID - tracing::info!( - relay = %relay_url, - batch_id = batch_id, - total_event_ids = all_remote_ids.len(), - filter_chunks = ids_filters.len(), - event_ids = ?all_remote_ids, - "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID", - ids_filters.len(), - all_remote_ids.len() - ); + if let Some(batch) = completed_batch { + self.confirm_batch(relay_url, batch).await; + } - let mut subscription_ids = HashSet::new(); - for (idx, filter) in ids_filters.iter().enumerate() { - if let Some(conn) = self.connections.get(relay_url) { - match conn.subscribe_filter(filter.clone(), true).await { - Ok(sub_id) => { - subscription_ids.insert(sub_id); - } - Err(e) => { - tracing::error!( - relay = %relay_url, - batch_id = batch_id, - chunk_idx = idx, - error = %e, - "Failed to subscribe to ID filter chunk" - ); + tracing::info!( + relay = %relay_url, + batch_id = batch_id, + total_received = 0, + "historic_sync (negentropy) completed - already up-to-date" + ); + + // Batch already confirmed, nothing more to do + return Some(batch_id); + } + + // launch subscriptions to fetch missing events by id + let ids_filters: Vec<_> = all_remote_ids + .chunks(300) + .map(|c| Filter::new().ids(c.iter().copied())) + .collect(); + + // DEBUG TRACING: Log that we're requesting events by ID + tracing::info!( + relay = %relay_url, + batch_id = batch_id, + total_event_ids = all_remote_ids.len(), + filter_chunks = ids_filters.len(), + event_ids = ?all_remote_ids, + "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID", + ids_filters.len(), + all_remote_ids.len() + ); + + let mut subscription_ids = HashSet::new(); + for (idx, filter) in ids_filters.iter().enumerate() { + if let Some(conn) = self.connections.get(relay_url) { + match conn.subscribe_filter(filter.clone(), true).await { + Ok(sub_id) => { + subscription_ids.insert(sub_id); + } + Err(e) => { + tracing::error!( + relay = %relay_url, + batch_id = batch_id, + chunk_idx = idx, + error = %e, + "Failed to subscribe to ID filter chunk" + ); + } } } } - } - { - let mut pending = self.pending_sync_index.write().await; - if let Some(relay_batches) = pending.get_mut(relay_url) { - if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { - batch.outstanding_subs.extend(subscription_ids.clone()); - // Store requested event IDs for validation after EOSE - batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect()); - batch.received_event_ids = Some(HashSet::new()); + { + let mut pending = self.pending_sync_index.write().await; + if let Some(relay_batches) = pending.get_mut(relay_url) { + if let Some(batch) = + relay_batches.iter_mut().find(|b| b.batch_id == batch_id) + { + batch.outstanding_subs.extend(subscription_ids.clone()); + // Store requested event IDs for validation after EOSE + batch.requested_event_ids = + Some(all_remote_ids.iter().cloned().collect()); + batch.received_event_ids = Some(HashSet::new()); + } } } + tracing::debug!( + relay = %relay_url, + batch_id = batch_id, + subscription_ids = subscription_ids.len(), + events = all_remote_ids.len(), + "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE" + ); } - tracing::debug!( - relay = %relay_url, - batch_id = batch_id, - subscription_ids = subscription_ids.len(), - events = all_remote_ids.len(), - "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE" - ); - } else { + } + + // Use REQ+EOSE if negentropy was not attempted or failed + if !negentropy_succeeded { // Traditional REQ+EOSE path tracing::debug!( relay = %relay_url, -- cgit v1.2.3