diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-21 13:46:55 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-21 13:46:55 +0000 |
| commit | 6c3c93752e9ee8da7f16fbeda70f9eb7a0ca8eb0 (patch) | |
| tree | 2fc223a12691c62954316542fc29ee4df466d1fe /src/sync | |
| parent | 5913c5aab4d22fcb2984e9a34b439190639b346d (diff) | |
| parent | 214fa5cbb7dacedfb3227e623e0542351c7d7956 (diff) | |
Fix negentropy fallback to REQ+EOSE when retry fails
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 124 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 21 |
2 files changed, 126 insertions, 19 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3213dfb..bc8c428 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -904,10 +904,9 @@ impl SyncManager { | |||
| 904 | // (REQ by ID) show they DO have these events. This appears to be relay- | 904 | // (REQ by ID) show they DO have these events. This appears to be relay- |
| 905 | // specific behavior where the relay refuses to serve events via negentropy | 905 | // specific behavior where the relay refuses to serve events via negentropy |
| 906 | // retry for unknown reasons (rate limiting, negentropy implementation bugs, | 906 | // retry for unknown reasons (rate limiting, negentropy implementation bugs, |
| 907 | // or other internal logic). We abort here to prevent infinite loops, but | 907 | // or other internal logic). When retry returns zero, fall back to REQ+EOSE. |
| 908 | // future enhancement could fall back to REQ+EOSE when retry returns zero. | ||
| 909 | if retry_count > 0 && received_count == 0 { | 908 | if retry_count > 0 && received_count == 0 { |
| 910 | tracing::error!( | 909 | tracing::info!( |
| 911 | relay = %relay_url, | 910 | relay = %relay_url, |
| 912 | batch_id = batch.batch_id, | 911 | batch_id = batch.batch_id, |
| 913 | retry_count = retry_count, | 912 | retry_count = retry_count, |
| @@ -915,20 +914,119 @@ impl SyncManager { | |||
| 915 | missing_count = missing.len(), | 914 | missing_count = missing.len(), |
| 916 | missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | 915 | missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), |
| 917 | "Negentropy retry made no progress - relay returned zero requested events. \ | 916 | "Negentropy retry made no progress - relay returned zero requested events. \ |
| 918 | Aborting retry to prevent infinite loop. Completing batch with partial results." | 917 | Marking relay as not supporting negentropy and falling back to REQ+EOSE." |
| 919 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) | ||
| 920 | ); | 918 | ); |
| 921 | 919 | ||
| 922 | // Extract and complete batch with partial results, marking as failed | 920 | // Mark relay as not supporting negentropy so future batches skip it |
| 923 | let batch_idx_for_completion = batch_idx; | 921 | if let Some(conn) = self.connections.get(relay_url) { |
| 924 | let mut completed_batch = batches.remove(batch_idx_for_completion); | 922 | conn.mark_negentropy_unsupported(); |
| 925 | completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition | ||
| 926 | if batches.is_empty() { | ||
| 927 | pending.remove(relay_url); | ||
| 928 | } | 923 | } |
| 924 | |||
| 925 | // Prepare for REQ+EOSE fallback using semantic filters | ||
| 926 | // (not ID-based queries which already failed) | ||
| 927 | let relay_url_for_fallback = relay_url.to_string(); | ||
| 928 | let batch_id = batch.batch_id; | ||
| 929 | let batch_repos = batch.items.repos.clone(); | ||
| 930 | let batch_root_events = batch.items.root_events.clone(); | ||
| 931 | let missing_count = missing.len(); | ||
| 932 | |||
| 933 | // Drop the lock before async operations | ||
| 929 | drop(pending); | 934 | drop(pending); |
| 930 | self.confirm_batch(relay_url, completed_batch).await; | 935 | |
| 931 | return; | 936 | // Create REQ+EOSE subscriptions using original semantic filters |
| 937 | // This queries by kind/author/tags instead of by ID, which may | ||
| 938 | // succeed even when ID-based queries fail | ||
| 939 | let fallback_filters = filters::build_layer2_and_layer3_filters( | ||
| 940 | &batch_repos, | ||
| 941 | &batch_root_events, | ||
| 942 | None, | ||
| 943 | ); | ||
| 944 | |||
| 945 | if fallback_filters.is_empty() { | ||
| 946 | tracing::warn!( | ||
| 947 | relay = %relay_url_for_fallback, | ||
| 948 | batch_id = batch_id, | ||
| 949 | repos = batch_repos.len(), | ||
| 950 | root_events = batch_root_events.len(), | ||
| 951 | "Cannot create semantic fallback filters - no repos or root_events in batch" | ||
| 952 | ); | ||
| 953 | // Fall through to ID-based fallback as last resort | ||
| 954 | } | ||
| 955 | |||
| 956 | let mut new_sub_ids = HashSet::new(); | ||
| 957 | if let Some(conn) = self.connections.get(&relay_url_for_fallback) { | ||
| 958 | for filter in fallback_filters { | ||
| 959 | match conn.subscribe_filter(filter, true).await { | ||
| 960 | Ok(sub_id) => { | ||
| 961 | new_sub_ids.insert(sub_id); | ||
| 962 | } | ||
| 963 | Err(e) => { | ||
| 964 | tracing::error!( | ||
| 965 | relay = %relay_url_for_fallback, | ||
| 966 | batch_id = batch_id, | ||
| 967 | error = %e, | ||
| 968 | "Failed to create REQ+EOSE fallback subscription" | ||
| 969 | ); | ||
| 970 | } | ||
| 971 | } | ||
| 972 | } | ||
| 973 | } | ||
| 974 | |||
| 975 | if !new_sub_ids.is_empty() { | ||
| 976 | // Re-acquire lock and update batch to use REQ+EOSE | ||
| 977 | let mut pending = self.pending_sync_index.write().await; | ||
| 978 | if let Some(batches) = pending.get_mut(&relay_url_for_fallback) { | ||
| 979 | if let Some(batch) = | ||
| 980 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 981 | { | ||
| 982 | // Switch to REQ+EOSE sync method | ||
| 983 | batch.sync_method = SyncMethod::ReqEose; | ||
| 984 | // Clear negentropy-specific tracking | ||
| 985 | batch.requested_event_ids = None; | ||
| 986 | batch.received_event_ids = None; | ||
| 987 | // Reset retry count for REQ+EOSE flow | ||
| 988 | batch.retry_count = 0; | ||
| 989 | // Add new subscriptions to outstanding_subs | ||
| 990 | batch.outstanding_subs.extend(new_sub_ids.clone()); | ||
| 991 | |||
| 992 | tracing::info!( | ||
| 993 | relay = %relay_url_for_fallback, | ||
| 994 | batch_id = batch_id, | ||
| 995 | fallback_subs = new_sub_ids.len(), | ||
| 996 | missing_events = missing_count, | ||
| 997 | "Switched batch to REQ+EOSE fallback, waiting for EOSE" | ||
| 998 | ); | ||
| 999 | } | ||
| 1000 | } | ||
| 1001 | // Early return - batch not complete yet, waiting for REQ+EOSE EOSE | ||
| 1002 | return; | ||
| 1003 | } else { | ||
| 1004 | // Failed to create any fallback subscriptions, mark as failed | ||
| 1005 | tracing::error!( | ||
| 1006 | relay = %relay_url_for_fallback, | ||
| 1007 | batch_id = batch_id, | ||
| 1008 | missing_count = missing_count, | ||
| 1009 | "Failed to create REQ+EOSE fallback subscriptions - completing batch with partial results" | ||
| 1010 | ); | ||
| 1011 | |||
| 1012 | // Re-acquire lock to extract the batch | ||
| 1013 | let mut pending = self.pending_sync_index.write().await; | ||
| 1014 | if let Some(batches) = pending.get_mut(&relay_url_for_fallback) { | ||
| 1015 | if let Some(idx) = | ||
| 1016 | batches.iter().position(|b| b.batch_id == batch_id) | ||
| 1017 | { | ||
| 1018 | let mut completed_batch = batches.remove(idx); | ||
| 1019 | completed_batch.failed = true; // Mark as failed | ||
| 1020 | if batches.is_empty() { | ||
| 1021 | pending.remove(&relay_url_for_fallback); | ||
| 1022 | } | ||
| 1023 | drop(pending); | ||
| 1024 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | ||
| 1025 | .await; | ||
| 1026 | } | ||
| 1027 | } | ||
| 1028 | return; | ||
| 1029 | } | ||
| 932 | } | 1030 | } |
| 933 | 1031 | ||
| 934 | tracing::warn!( | 1032 | tracing::warn!( |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 99fc4ea..5bc0fa3 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -282,9 +282,7 @@ impl RelayConnection { | |||
| 282 | || msg.contains("negentropy"); | 282 | || msg.contains("negentropy"); |
| 283 | 283 | ||
| 284 | if is_negentropy_notice { | 284 | if is_negentropy_notice { |
| 285 | // Mark relay as not supporting NIP-77 | 285 | self.mark_negentropy_unsupported(); |
| 286 | self.nip77_supported | ||
| 287 | .store(2, std::sync::atomic::Ordering::Relaxed); | ||
| 288 | 286 | ||
| 289 | tracing::info!( | 287 | tracing::info!( |
| 290 | relay = %url, | 288 | relay = %url, |
| @@ -478,6 +476,19 @@ impl RelayConnection { | |||
| 478 | true | 476 | true |
| 479 | } | 477 | } |
| 480 | 478 | ||
| 479 | /// Mark this relay as not supporting NIP-77 negentropy | ||
| 480 | /// | ||
| 481 | /// Called when we detect negentropy isn't working for this relay: | ||
| 482 | /// - NOTICE message contains negentropy-related error | ||
| 483 | /// - negentropy_sync_diff() fails | ||
| 484 | /// - Negentropy retry returns zero events | ||
| 485 | /// | ||
| 486 | /// Future batches will skip negentropy and use REQ+EOSE directly. | ||
| 487 | pub fn mark_negentropy_unsupported(&self) { | ||
| 488 | self.nip77_supported | ||
| 489 | .store(2, std::sync::atomic::Ordering::Relaxed); | ||
| 490 | } | ||
| 491 | |||
| 481 | /// Perform a negentropy sync diff (dry run) to identify missing events | 492 | /// Perform a negentropy sync diff (dry run) to identify missing events |
| 482 | /// | 493 | /// |
| 483 | /// This method performs NIP-77 negentropy reconciliation without downloading events. | 494 | /// This method performs NIP-77 negentropy reconciliation without downloading events. |
| @@ -564,9 +575,7 @@ impl RelayConnection { | |||
| 564 | Ok(reconciliation) | 575 | Ok(reconciliation) |
| 565 | } | 576 | } |
| 566 | Err(e) => { | 577 | Err(e) => { |
| 567 | // Mark relay as not supporting NIP-77 | 578 | self.mark_negentropy_unsupported(); |
| 568 | self.nip77_supported | ||
| 569 | .store(2, std::sync::atomic::Ordering::Relaxed); | ||
| 570 | 579 | ||
| 571 | // Log warning only once per relay to avoid spam | 580 | // Log warning only once per relay to avoid spam |
| 572 | if !self | 581 | if !self |