From fbcf2e3896b008f15f02a0df804405a346fd3656 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 21 Jan 2026 13:20:14 +0000 Subject: fix: fall back to REQ+EOSE when negentropy retry fails When negentropy retry makes no progress (relay returns zero events), this indicates the relay's negentropy implementation is broken. Instead of marking the batch as failed, we now: 1. Mark the relay as not supporting NIP-77 so future batches skip negentropy and use REQ+EOSE directly 2. Fall back to REQ+EOSE using semantic filters (kind/author/tags) for the current batch, which may succeed where ID-based queries fail This addresses the issue where some relays (e.g., azzamo.net, snort.social) return event IDs during negentropy diff but fail to serve those events when requested by ID. --- src/sync/mod.rs | 124 ++++++++++++++++++++++++++++++++++++++----- src/sync/relay_connection.rs | 12 +++++ 2 files changed, 123 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3213dfb..bc8c428 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -904,10 +904,9 @@ impl SyncManager { // (REQ by ID) show they DO have these events. This appears to be relay- // specific behavior where the relay refuses to serve events via negentropy // retry for unknown reasons (rate limiting, negentropy implementation bugs, - // or other internal logic). We abort here to prevent infinite loops, but - // future enhancement could fall back to REQ+EOSE when retry returns zero. + // or other internal logic). When retry returns zero, fall back to REQ+EOSE. if retry_count > 0 && received_count == 0 { - tracing::error!( + tracing::info!( relay = %relay_url, batch_id = batch.batch_id, retry_count = retry_count, @@ -915,20 +914,119 @@ impl SyncManager { missing_count = missing.len(), missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::>(), "Negentropy retry made no progress - relay returned zero requested events. \ - Aborting retry to prevent infinite loop. Completing batch with partial results." - // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) + Marking relay as not supporting negentropy and falling back to REQ+EOSE." ); - // Extract and complete batch with partial results, marking as failed - let batch_idx_for_completion = batch_idx; - let mut completed_batch = batches.remove(batch_idx_for_completion); - completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition - if batches.is_empty() { - pending.remove(relay_url); + // Mark relay as not supporting negentropy so future batches skip it + if let Some(conn) = self.connections.get(relay_url) { + conn.mark_negentropy_unsupported(); } + + // Prepare for REQ+EOSE fallback using semantic filters + // (not ID-based queries which already failed) + let relay_url_for_fallback = relay_url.to_string(); + let batch_id = batch.batch_id; + let batch_repos = batch.items.repos.clone(); + let batch_root_events = batch.items.root_events.clone(); + let missing_count = missing.len(); + + // Drop the lock before async operations drop(pending); - self.confirm_batch(relay_url, completed_batch).await; - return; + + // Create REQ+EOSE subscriptions using original semantic filters + // This queries by kind/author/tags instead of by ID, which may + // succeed even when ID-based queries fail + let fallback_filters = filters::build_layer2_and_layer3_filters( + &batch_repos, + &batch_root_events, + None, + ); + + if fallback_filters.is_empty() { + tracing::warn!( + relay = %relay_url_for_fallback, + batch_id = batch_id, + repos = batch_repos.len(), + root_events = batch_root_events.len(), + "Cannot create semantic fallback filters - no repos or root_events in batch" + ); + // Fall through to ID-based fallback as last resort + } + + let mut new_sub_ids = HashSet::new(); + if let Some(conn) = self.connections.get(&relay_url_for_fallback) { + for filter in fallback_filters { + match conn.subscribe_filter(filter, true).await { + Ok(sub_id) => { + new_sub_ids.insert(sub_id); + } + Err(e) => { + tracing::error!( + relay = %relay_url_for_fallback, + batch_id = batch_id, + error = %e, + "Failed to create REQ+EOSE fallback subscription" + ); + } + } + } + } + + if !new_sub_ids.is_empty() { + // Re-acquire lock and update batch to use REQ+EOSE + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(&relay_url_for_fallback) { + if let Some(batch) = + batches.iter_mut().find(|b| b.batch_id == batch_id) + { + // Switch to REQ+EOSE sync method + batch.sync_method = SyncMethod::ReqEose; + // Clear negentropy-specific tracking + batch.requested_event_ids = None; + batch.received_event_ids = None; + // Reset retry count for REQ+EOSE flow + batch.retry_count = 0; + // Add new subscriptions to outstanding_subs + batch.outstanding_subs.extend(new_sub_ids.clone()); + + tracing::info!( + relay = %relay_url_for_fallback, + batch_id = batch_id, + fallback_subs = new_sub_ids.len(), + missing_events = missing_count, + "Switched batch to REQ+EOSE fallback, waiting for EOSE" + ); + } + } + // Early return - batch not complete yet, waiting for REQ+EOSE EOSE + return; + } else { + // Failed to create any fallback subscriptions, mark as failed + tracing::error!( + relay = %relay_url_for_fallback, + batch_id = batch_id, + missing_count = missing_count, + "Failed to create REQ+EOSE fallback subscriptions - completing batch with partial results" + ); + + // Re-acquire lock to extract the batch + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(&relay_url_for_fallback) { + if let Some(idx) = + batches.iter().position(|b| b.batch_id == batch_id) + { + let mut completed_batch = batches.remove(idx); + completed_batch.failed = true; // Mark as failed + if batches.is_empty() { + pending.remove(&relay_url_for_fallback); + } + drop(pending); + self.confirm_batch(&relay_url_for_fallback, completed_batch) + .await; + } + } + return; + } } tracing::warn!( diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 99fc4ea..82e85ee 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -478,6 +478,18 @@ impl RelayConnection { true } + /// Mark this relay as not supporting NIP-77 negentropy (for external callers) + /// + /// This is called by SyncManager when negentropy retry returns zero events, + /// indicating the relay's negentropy implementation is broken. Future batches + /// will skip negentropy and use REQ+EOSE directly. + /// + /// Note: Internal code in this struct uses direct field access instead. + pub fn mark_negentropy_unsupported(&self) { + self.nip77_supported + .store(2, std::sync::atomic::Ordering::Relaxed); + } + /// Perform a negentropy sync diff (dry run) to identify missing events /// /// This method performs NIP-77 negentropy reconciliation without downloading events. -- cgit v1.2.3 From 214fa5cbb7dacedfb3227e623e0542351c7d7956 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 21 Jan 2026 13:23:41 +0000 Subject: refactor: use mark_negentropy_unsupported() consistently Refactor internal code to use the mark_negentropy_unsupported() method instead of direct field access for improved readability. --- src/sync/relay_connection.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 82e85ee..5bc0fa3 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -282,9 +282,7 @@ impl RelayConnection { || msg.contains("negentropy"); if is_negentropy_notice { - // Mark relay as not supporting NIP-77 - self.nip77_supported - .store(2, std::sync::atomic::Ordering::Relaxed); + self.mark_negentropy_unsupported(); tracing::info!( relay = %url, @@ -478,13 +476,14 @@ impl RelayConnection { true } - /// Mark this relay as not supporting NIP-77 negentropy (for external callers) + /// Mark this relay as not supporting NIP-77 negentropy /// - /// This is called by SyncManager when negentropy retry returns zero events, - /// indicating the relay's negentropy implementation is broken. Future batches - /// will skip negentropy and use REQ+EOSE directly. + /// Called when we detect negentropy isn't working for this relay: + /// - NOTICE message contains negentropy-related error + /// - negentropy_sync_diff() fails + /// - Negentropy retry returns zero events /// - /// Note: Internal code in this struct uses direct field access instead. + /// Future batches will skip negentropy and use REQ+EOSE directly. pub fn mark_negentropy_unsupported(&self) { self.nip77_supported .store(2, std::sync::atomic::Ordering::Relaxed); @@ -576,9 +575,7 @@ impl RelayConnection { Ok(reconciliation) } Err(e) => { - // Mark relay as not supporting NIP-77 - self.nip77_supported - .store(2, std::sync::atomic::Ordering::Relaxed); + self.mark_negentropy_unsupported(); // Log warning only once per relay to avoid spam if !self -- cgit v1.2.3