diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-21 13:20:14 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-21 13:45:30 +0000 |
| commit | fbcf2e3896b008f15f02a0df804405a346fd3656 (patch) | |
| tree | 1ebe746658ed930c468361e88b4eba705cbffd5b /src | |
| parent | 5913c5aab4d22fcb2984e9a34b439190639b346d (diff) | |
fix: fall back to REQ+EOSE when negentropy retry fails
When negentropy retry makes no progress (relay returns zero events),
this indicates the relay's negentropy implementation is broken. Instead
of marking the batch as failed, we now:
1. Mark the relay as not supporting NIP-77 so future batches skip
negentropy and use REQ+EOSE directly
2. Fall back to REQ+EOSE using semantic filters (kind/author/tags)
for the current batch, which may succeed where ID-based queries fail
This addresses the issue where some relays (e.g., azzamo.net, snort.social)
return event IDs during negentropy diff but fail to serve those events
when requested by ID.
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 124 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 12 |
2 files changed, 123 insertions, 13 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3213dfb..bc8c428 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -904,10 +904,9 @@ impl SyncManager { | |||
| 904 | // (REQ by ID) show they DO have these events. This appears to be relay- | 904 | // (REQ by ID) show they DO have these events. This appears to be relay- |
| 905 | // specific behavior where the relay refuses to serve events via negentropy | 905 | // specific behavior where the relay refuses to serve events via negentropy |
| 906 | // retry for unknown reasons (rate limiting, negentropy implementation bugs, | 906 | // retry for unknown reasons (rate limiting, negentropy implementation bugs, |
| 907 | // or other internal logic). We abort here to prevent infinite loops, but | 907 | // or other internal logic). When retry returns zero, fall back to REQ+EOSE. |
| 908 | // future enhancement could fall back to REQ+EOSE when retry returns zero. | ||
| 909 | if retry_count > 0 && received_count == 0 { | 908 | if retry_count > 0 && received_count == 0 { |
| 910 | tracing::error!( | 909 | tracing::info!( |
| 911 | relay = %relay_url, | 910 | relay = %relay_url, |
| 912 | batch_id = batch.batch_id, | 911 | batch_id = batch.batch_id, |
| 913 | retry_count = retry_count, | 912 | retry_count = retry_count, |
| @@ -915,20 +914,119 @@ impl SyncManager { | |||
| 915 | missing_count = missing.len(), | 914 | missing_count = missing.len(), |
| 916 | missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | 915 | missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), |
| 917 | "Negentropy retry made no progress - relay returned zero requested events. \ | 916 | "Negentropy retry made no progress - relay returned zero requested events. \ |
| 918 | Aborting retry to prevent infinite loop. Completing batch with partial results." | 917 | Marking relay as not supporting negentropy and falling back to REQ+EOSE." |
| 919 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) | ||
| 920 | ); | 918 | ); |
| 921 | 919 | ||
| 922 | // Extract and complete batch with partial results, marking as failed | 920 | // Mark relay as not supporting negentropy so future batches skip it |
| 923 | let batch_idx_for_completion = batch_idx; | 921 | if let Some(conn) = self.connections.get(relay_url) { |
| 924 | let mut completed_batch = batches.remove(batch_idx_for_completion); | 922 | conn.mark_negentropy_unsupported(); |
| 925 | completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition | ||
| 926 | if batches.is_empty() { | ||
| 927 | pending.remove(relay_url); | ||
| 928 | } | 923 | } |
| 924 | |||
| 925 | // Prepare for REQ+EOSE fallback using semantic filters | ||
| 926 | // (not ID-based queries which already failed) | ||
| 927 | let relay_url_for_fallback = relay_url.to_string(); | ||
| 928 | let batch_id = batch.batch_id; | ||
| 929 | let batch_repos = batch.items.repos.clone(); | ||
| 930 | let batch_root_events = batch.items.root_events.clone(); | ||
| 931 | let missing_count = missing.len(); | ||
| 932 | |||
| 933 | // Drop the lock before async operations | ||
| 929 | drop(pending); | 934 | drop(pending); |
| 930 | self.confirm_batch(relay_url, completed_batch).await; | 935 | |
| 931 | return; | 936 | // Create REQ+EOSE subscriptions using original semantic filters |
| 937 | // This queries by kind/author/tags instead of by ID, which may | ||
| 938 | // succeed even when ID-based queries fail | ||
| 939 | let fallback_filters = filters::build_layer2_and_layer3_filters( | ||
| 940 | &batch_repos, | ||
| 941 | &batch_root_events, | ||
| 942 | None, | ||
| 943 | ); | ||
| 944 | |||
| 945 | if fallback_filters.is_empty() { | ||
| 946 | tracing::warn!( | ||
| 947 | relay = %relay_url_for_fallback, | ||
| 948 | batch_id = batch_id, | ||
| 949 | repos = batch_repos.len(), | ||
| 950 | root_events = batch_root_events.len(), | ||
| 951 | "Cannot create semantic fallback filters - no repos or root_events in batch" | ||
| 952 | ); | ||
| 953 | // No semantic filters means no subscriptions are created below, so the batch is completed as failed | ||
| 954 | } | ||
| 955 | |||
| 956 | let mut new_sub_ids = HashSet::new(); | ||
| 957 | if let Some(conn) = self.connections.get(&relay_url_for_fallback) { | ||
| 958 | for filter in fallback_filters { | ||
| 959 | match conn.subscribe_filter(filter, true).await { | ||
| 960 | Ok(sub_id) => { | ||
| 961 | new_sub_ids.insert(sub_id); | ||
| 962 | } | ||
| 963 | Err(e) => { | ||
| 964 | tracing::error!( | ||
| 965 | relay = %relay_url_for_fallback, | ||
| 966 | batch_id = batch_id, | ||
| 967 | error = %e, | ||
| 968 | "Failed to create REQ+EOSE fallback subscription" | ||
| 969 | ); | ||
| 970 | } | ||
| 971 | } | ||
| 972 | } | ||
| 973 | } | ||
| 974 | |||
| 975 | if !new_sub_ids.is_empty() { | ||
| 976 | // Re-acquire lock and update batch to use REQ+EOSE | ||
| 977 | let mut pending = self.pending_sync_index.write().await; | ||
| 978 | if let Some(batches) = pending.get_mut(&relay_url_for_fallback) { | ||
| 979 | if let Some(batch) = | ||
| 980 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 981 | { | ||
| 982 | // Switch to REQ+EOSE sync method | ||
| 983 | batch.sync_method = SyncMethod::ReqEose; | ||
| 984 | // Clear negentropy-specific tracking | ||
| 985 | batch.requested_event_ids = None; | ||
| 986 | batch.received_event_ids = None; | ||
| 987 | // Reset retry count for REQ+EOSE flow | ||
| 988 | batch.retry_count = 0; | ||
| 989 | // Add new subscriptions to outstanding_subs | ||
| 990 | batch.outstanding_subs.extend(new_sub_ids.clone()); | ||
| 991 | |||
| 992 | tracing::info!( | ||
| 993 | relay = %relay_url_for_fallback, | ||
| 994 | batch_id = batch_id, | ||
| 995 | fallback_subs = new_sub_ids.len(), | ||
| 996 | missing_events = missing_count, | ||
| 997 | "Switched batch to REQ+EOSE fallback, waiting for EOSE" | ||
| 998 | ); | ||
| 999 | } | ||
| 1000 | } | ||
| 1001 | // Early return - batch not complete yet; waiting for EOSE on the fallback subscriptions | ||
| 1002 | return; | ||
| 1003 | } else { | ||
| 1004 | // Failed to create any fallback subscriptions, mark as failed | ||
| 1005 | tracing::error!( | ||
| 1006 | relay = %relay_url_for_fallback, | ||
| 1007 | batch_id = batch_id, | ||
| 1008 | missing_count = missing_count, | ||
| 1009 | "Failed to create REQ+EOSE fallback subscriptions - completing batch with partial results" | ||
| 1010 | ); | ||
| 1011 | |||
| 1012 | // Re-acquire lock to extract the batch | ||
| 1013 | let mut pending = self.pending_sync_index.write().await; | ||
| 1014 | if let Some(batches) = pending.get_mut(&relay_url_for_fallback) { | ||
| 1015 | if let Some(idx) = | ||
| 1016 | batches.iter().position(|b| b.batch_id == batch_id) | ||
| 1017 | { | ||
| 1018 | let mut completed_batch = batches.remove(idx); | ||
| 1019 | completed_batch.failed = true; // Mark as failed | ||
| 1020 | if batches.is_empty() { | ||
| 1021 | pending.remove(&relay_url_for_fallback); | ||
| 1022 | } | ||
| 1023 | drop(pending); | ||
| 1024 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | ||
| 1025 | .await; | ||
| 1026 | } | ||
| 1027 | } | ||
| 1028 | return; | ||
| 1029 | } | ||
| 932 | } | 1030 | } |
| 933 | 1031 | ||
| 934 | tracing::warn!( | 1032 | tracing::warn!( |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 99fc4ea..82e85ee 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -478,6 +478,18 @@ impl RelayConnection { | |||
| 478 | true | 478 | true |
| 479 | } | 479 | } |
| 480 | 480 | ||
| 481 | /// Mark this relay as not supporting NIP-77 negentropy (for external callers) | ||
| 482 | /// | ||
| 483 | /// This is called by SyncManager when negentropy retry returns zero events, | ||
| 484 | /// indicating the relay's negentropy implementation is broken. Future batches | ||
| 485 | /// will skip negentropy and use REQ+EOSE directly. | ||
| 486 | /// | ||
| 487 | /// Note: Internal code in this struct uses direct field access instead. | ||
| 488 | pub fn mark_negentropy_unsupported(&self) { | ||
| 489 | self.nip77_supported | ||
| 490 | .store(2, std::sync::atomic::Ordering::Relaxed); | ||
| 491 | } | ||
| 492 | |||
| 481 | /// Perform a negentropy sync diff (dry run) to identify missing events | 493 | /// Perform a negentropy sync diff (dry run) to identify missing events |
| 482 | /// | 494 | /// |
| 483 | /// This method performs NIP-77 negentropy reconciliation without downloading events. | 495 | /// This method performs NIP-77 negentropy reconciliation without downloading events. |