upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 09:24:17 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 09:24:17 +0000
commiteb10e85f199266affd3bca0a3d4cd934f74f3e7f (patch)
tree8a09f93658be462bfb8196670195f1b8748c2183 /src/sync/mod.rs
parente1806540b5d905646b786e21a6060e4498e9aff1 (diff)
feat(sync): prevent infinite retry loop in negentropy validation
Add retry protection to negentropy event validation: - Track retry_count in PendingBatch (incremented on each retry attempt) - Detect when retry makes zero progress (relay returns no requested events) - Abort retry and complete batch with partial results when stuck - Log error with full details when retry protection triggers This prevents infinite loops when: - Relay has bugs and returns wrong events for ID queries - Relay is malicious and returns unrelated events - Relay has eventual consistency issues - Network corruption causes incorrect responses The protection triggers when received_count == 0 on a retry (relay returned nothing we asked for), indicating the relay will never provide the missing events. Future work: Track failed batches in Prometheus metrics (sync_failed_batches_total) for monitoring and alerting.
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs39
1 files changed, 39 insertions, 0 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index d33364f..00668ac 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -202,6 +202,9 @@ pub struct PendingBatch {
202 /// Event IDs actually received for this batch (None for REQ+EOSE) 202 /// Event IDs actually received for this batch (None for REQ+EOSE)
203 /// Compared against requested_event_ids to detect missing events 203 /// Compared against requested_event_ids to detect missing events
204 pub received_event_ids: Option<HashSet<EventId>>, 204 pub received_event_ids: Option<HashSet<EventId>>,
205 /// Number of retry attempts for missing events (Negentropy only)
206 /// Used to prevent infinite retry loops when relay consistently fails
207 pub retry_count: usize,
205} 208}
206 209
207/// Items included in a pending batch 210/// Items included in a pending batch
@@ -651,10 +654,38 @@ impl SyncManager {
651 if !missing.is_empty() { 654 if !missing.is_empty() {
652 let requested_count = requested.len(); 655 let requested_count = requested.len();
653 let received_count = received.len(); 656 let received_count = received.len();
657 let retry_count = batch.retry_count;
658
659 // Check if we made any progress (received ANY events we requested)
660 // If received_count is 0, relay returned nothing useful - abort retry
661 if retry_count > 0 && received_count == 0 {
662 tracing::error!(
663 relay = %relay_url,
664 batch_id = batch.batch_id,
665 retry_count = retry_count,
666 requested_count = requested_count,
667 missing_count = missing.len(),
668 missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
669 "Negentropy retry made no progress - relay returned zero requested events. \
670 Aborting retry to prevent infinite loop. Completing batch with partial results."
671 // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total)
672 );
673
674 // Extract and complete batch with partial results
675 let batch_idx_for_completion = batch_idx;
676 let completed_batch = batches.remove(batch_idx_for_completion);
677 if batches.is_empty() {
678 pending.remove(relay_url);
679 }
680 drop(pending);
681 self.confirm_batch(relay_url, completed_batch).await;
682 return;
683 }
654 684
655 tracing::warn!( 685 tracing::warn!(
656 relay = %relay_url, 686 relay = %relay_url,
657 batch_id = batch.batch_id, 687 batch_id = batch.batch_id,
688 retry_count = retry_count,
658 requested_count = requested_count, 689 requested_count = requested_count,
659 received_count = received_count, 690 received_count = received_count,
660 missing_count = missing.len(), 691 missing_count = missing.len(),
@@ -710,12 +741,15 @@ impl SyncManager {
710 Some(missing.iter().cloned().collect()); 741 Some(missing.iter().cloned().collect());
711 // Clear received_event_ids for fresh tracking 742 // Clear received_event_ids for fresh tracking
712 batch.received_event_ids = Some(HashSet::new()); 743 batch.received_event_ids = Some(HashSet::new());
744 // Increment retry counter
745 batch.retry_count += 1;
713 746
714 tracing::info!( 747 tracing::info!(
715 relay = %relay_url_for_retry, 748 relay = %relay_url_for_retry,
716 batch_id = batch_id, 749 batch_id = batch_id,
717 retry_subs = new_sub_ids.len(), 750 retry_subs = new_sub_ids.len(),
718 missing_events = missing.len(), 751 missing_events = missing.len(),
752 retry_attempt = batch.retry_count,
719 "Created retry subscriptions for missing negentropy events" 753 "Created retry subscriptions for missing negentropy events"
720 ); 754 );
721 } 755 }
@@ -2373,6 +2407,7 @@ impl SyncManager {
2373 pagination_state: HashMap::new(), // Negentropy doesn't use pagination 2407 pagination_state: HashMap::new(), // Negentropy doesn't use pagination
2374 requested_event_ids: None, // Will be set after negentropy diff 2408 requested_event_ids: None, // Will be set after negentropy diff
2375 received_event_ids: None, // Will be set after negentropy diff 2409 received_event_ids: None, // Will be set after negentropy diff
2410 retry_count: 0,
2376 }; 2411 };
2377 2412
2378 // Add to pending_sync_index 2413 // Add to pending_sync_index
@@ -2610,6 +2645,7 @@ impl SyncManager {
2610 pagination_state, 2645 pagination_state,
2611 requested_event_ids: None, // Not used for REQ+EOSE 2646 requested_event_ids: None, // Not used for REQ+EOSE
2612 received_event_ids: None, // Not used for REQ+EOSE 2647 received_event_ids: None, // Not used for REQ+EOSE
2648 retry_count: 0, // Not used for REQ+EOSE
2613 }; 2649 };
2614 2650
2615 // Add to pending_sync_index 2651 // Add to pending_sync_index
@@ -2822,11 +2858,13 @@ mod tests {
2822 pagination_state: HashMap::new(), 2858 pagination_state: HashMap::new(),
2823 requested_event_ids: Some(HashSet::new()), 2859 requested_event_ids: Some(HashSet::new()),
2824 received_event_ids: Some(HashSet::new()), 2860 received_event_ids: Some(HashSet::new()),
2861 retry_count: 0,
2825 }; 2862 };
2826 2863
2827 assert!(batch.requested_event_ids.is_some()); 2864 assert!(batch.requested_event_ids.is_some());
2828 assert!(batch.received_event_ids.is_some()); 2865 assert!(batch.received_event_ids.is_some());
2829 assert_eq!(batch.sync_method, SyncMethod::Negentropy); 2866 assert_eq!(batch.sync_method, SyncMethod::Negentropy);
2867 assert_eq!(batch.retry_count, 0);
2830 } 2868 }
2831 2869
2832 #[test] 2870 #[test]
@@ -2840,6 +2878,7 @@ mod tests {
2840 pagination_state: HashMap::new(), 2878 pagination_state: HashMap::new(),
2841 requested_event_ids: None, 2879 requested_event_ids: None,
2842 received_event_ids: None, 2880 received_event_ids: None,
2881 retry_count: 0,
2843 }; 2882 };
2844 2883
2845 assert!(batch.requested_event_ids.is_none()); 2884 assert!(batch.requested_event_ids.is_none());