upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/mod.rs 124
-rw-r--r-- src/sync/relay_connection.rs 12
2 files changed, 123 insertions, 13 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 3213dfb..bc8c428 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -904,10 +904,9 @@ impl SyncManager {
904 // (REQ by ID) show they DO have these events. This appears to be relay- 904 // (REQ by ID) show they DO have these events. This appears to be relay-
905 // specific behavior where the relay refuses to serve events via negentropy 905 // specific behavior where the relay refuses to serve events via negentropy
906 // retry for unknown reasons (rate limiting, negentropy implementation bugs, 906 // retry for unknown reasons (rate limiting, negentropy implementation bugs,
907 // or other internal logic). We abort here to prevent infinite loops, but 907 // or other internal logic). When retry returns zero, fall back to REQ+EOSE.
908 // future enhancement could fall back to REQ+EOSE when retry returns zero.
909 if retry_count > 0 && received_count == 0 { 908 if retry_count > 0 && received_count == 0 {
910 tracing::error!( 909 tracing::info!(
911 relay = %relay_url, 910 relay = %relay_url,
912 batch_id = batch.batch_id, 911 batch_id = batch.batch_id,
913 retry_count = retry_count, 912 retry_count = retry_count,
@@ -915,20 +914,119 @@ impl SyncManager {
915 missing_count = missing.len(), 914 missing_count = missing.len(),
916 missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), 915 missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
917 "Negentropy retry made no progress - relay returned zero requested events. \ 916 "Negentropy retry made no progress - relay returned zero requested events. \
918 Aborting retry to prevent infinite loop. Completing batch with partial results." 917 Marking relay as not supporting negentropy and falling back to REQ+EOSE."
919 // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total)
920 ); 918 );
921 919
922 // Extract and complete batch with partial results, marking as failed 920 // Mark relay as not supporting negentropy so future batches skip it
923 let batch_idx_for_completion = batch_idx; 921 if let Some(conn) = self.connections.get(relay_url) {
924 let mut completed_batch = batches.remove(batch_idx_for_completion); 922 conn.mark_negentropy_unsupported();
925 completed_batch.failed = true; // Mark as failed for ConnectedDegraded transition
926 if batches.is_empty() {
927 pending.remove(relay_url);
928 } 923 }
924
925 // Prepare for REQ+EOSE fallback using semantic filters
926 // (not ID-based queries which already failed)
927 let relay_url_for_fallback = relay_url.to_string();
928 let batch_id = batch.batch_id;
929 let batch_repos = batch.items.repos.clone();
930 let batch_root_events = batch.items.root_events.clone();
931 let missing_count = missing.len();
932
933 // Drop the lock before async operations
929 drop(pending); 934 drop(pending);
930 self.confirm_batch(relay_url, completed_batch).await; 935
931 return; 936 // Create REQ+EOSE subscriptions using original semantic filters
937 // This queries by kind/author/tags instead of by ID, which may
938 // succeed even when ID-based queries fail
939 let fallback_filters = filters::build_layer2_and_layer3_filters(
940 &batch_repos,
941 &batch_root_events,
942 None,
943 );
944
945 if fallback_filters.is_empty() {
946 tracing::warn!(
947 relay = %relay_url_for_fallback,
948 batch_id = batch_id,
949 repos = batch_repos.len(),
950 root_events = batch_root_events.len(),
951 "Cannot create semantic fallback filters - no repos or root_events in batch"
952 );
953 // Fall through to ID-based fallback as last resort
954 }
955
956 let mut new_sub_ids = HashSet::new();
957 if let Some(conn) = self.connections.get(&relay_url_for_fallback) {
958 for filter in fallback_filters {
959 match conn.subscribe_filter(filter, true).await {
960 Ok(sub_id) => {
961 new_sub_ids.insert(sub_id);
962 }
963 Err(e) => {
964 tracing::error!(
965 relay = %relay_url_for_fallback,
966 batch_id = batch_id,
967 error = %e,
968 "Failed to create REQ+EOSE fallback subscription"
969 );
970 }
971 }
972 }
973 }
974
975 if !new_sub_ids.is_empty() {
976 // Re-acquire lock and update batch to use REQ+EOSE
977 let mut pending = self.pending_sync_index.write().await;
978 if let Some(batches) = pending.get_mut(&relay_url_for_fallback) {
979 if let Some(batch) =
980 batches.iter_mut().find(|b| b.batch_id == batch_id)
981 {
982 // Switch to REQ+EOSE sync method
983 batch.sync_method = SyncMethod::ReqEose;
984 // Clear negentropy-specific tracking
985 batch.requested_event_ids = None;
986 batch.received_event_ids = None;
987 // Reset retry count for REQ+EOSE flow
988 batch.retry_count = 0;
989 // Add new subscriptions to outstanding_subs
990 batch.outstanding_subs.extend(new_sub_ids.clone());
991
992 tracing::info!(
993 relay = %relay_url_for_fallback,
994 batch_id = batch_id,
995 fallback_subs = new_sub_ids.len(),
996 missing_events = missing_count,
997 "Switched batch to REQ+EOSE fallback, waiting for EOSE"
998 );
999 }
1000 }
1001 // Early return - batch not complete yet, waiting for REQ+EOSE EOSE
1002 return;
1003 } else {
1004 // Failed to create any fallback subscriptions, mark as failed
1005 tracing::error!(
1006 relay = %relay_url_for_fallback,
1007 batch_id = batch_id,
1008 missing_count = missing_count,
1009 "Failed to create REQ+EOSE fallback subscriptions - completing batch with partial results"
1010 );
1011
1012 // Re-acquire lock to extract the batch
1013 let mut pending = self.pending_sync_index.write().await;
1014 if let Some(batches) = pending.get_mut(&relay_url_for_fallback) {
1015 if let Some(idx) =
1016 batches.iter().position(|b| b.batch_id == batch_id)
1017 {
1018 let mut completed_batch = batches.remove(idx);
1019 completed_batch.failed = true; // Mark as failed
1020 if batches.is_empty() {
1021 pending.remove(&relay_url_for_fallback);
1022 }
1023 drop(pending);
1024 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1025 .await;
1026 }
1027 }
1028 return;
1029 }
932 } 1030 }
933 1031
934 tracing::warn!( 1032 tracing::warn!(
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 99fc4ea..82e85ee 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -478,6 +478,18 @@ impl RelayConnection {
478 true 478 true
479 } 479 }
480 480
481 /// Mark this relay as not supporting NIP-77 negentropy (for external callers)
482 ///
483 /// This is called by SyncManager when negentropy retry returns zero events,
484 /// indicating the relay's negentropy implementation is broken. Future batches
485 /// will skip negentropy and use REQ+EOSE directly.
486 ///
487 /// Note: Internal code in this struct uses direct field access instead.
488 pub fn mark_negentropy_unsupported(&self) {
489 self.nip77_supported
490 .store(2, std::sync::atomic::Ordering::Relaxed);
491 }
492
481 /// Perform a negentropy sync diff (dry run) to identify missing events 493 /// Perform a negentropy sync diff (dry run) to identify missing events
482 /// 494 ///
483 /// This method performs NIP-77 negentropy reconciliation without downloading events. 495 /// This method performs NIP-77 negentropy reconciliation without downloading events.