diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 09:24:17 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 09:24:17 +0000 |
| commit | eb10e85f199266affd3bca0a3d4cd934f74f3e7f (patch) | |
| tree | 8a09f93658be462bfb8196670195f1b8748c2183 /src/sync/mod.rs | |
| parent | e1806540b5d905646b786e21a6060e4498e9aff1 (diff) | |
feat(sync): prevent infinite retry loop in negentropy validation
Add retry protection to negentropy event validation:
- Track retry_count in PendingBatch (incremented on each retry attempt)
- Detect when retry makes zero progress (relay returns no requested events)
- Abort retry and complete batch with partial results when stuck
- Log error with full details when retry protection triggers
This prevents infinite loops when:
- Relay has bugs and returns wrong events for ID queries
- Relay is malicious and returns unrelated events
- Relay has eventual consistency issues
- Network corruption causes incorrect responses
The protection triggers when received_count == 0 on a retry (relay
returned nothing we asked for), indicating the relay will never
provide the missing events.
Future work: Track failed batches in Prometheus metrics
(sync_failed_batches_total) for monitoring and alerting.
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d33364f..00668ac 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -202,6 +202,9 @@ pub struct PendingBatch { | |||
| 202 | /// Event IDs actually received for this batch (None for REQ+EOSE) | 202 | /// Event IDs actually received for this batch (None for REQ+EOSE) |
| 203 | /// Compared against requested_event_ids to detect missing events | 203 | /// Compared against requested_event_ids to detect missing events |
| 204 | pub received_event_ids: Option<HashSet<EventId>>, | 204 | pub received_event_ids: Option<HashSet<EventId>>, |
| 205 | /// Number of retry attempts for missing events (Negentropy only) | ||
| 206 | /// Used to prevent infinite retry loops when relay consistently fails | ||
| 207 | pub retry_count: usize, | ||
| 205 | } | 208 | } |
| 206 | 209 | ||
| 207 | /// Items included in a pending batch | 210 | /// Items included in a pending batch |
| @@ -651,10 +654,38 @@ impl SyncManager { | |||
| 651 | if !missing.is_empty() { | 654 | if !missing.is_empty() { |
| 652 | let requested_count = requested.len(); | 655 | let requested_count = requested.len(); |
| 653 | let received_count = received.len(); | 656 | let received_count = received.len(); |
| 657 | let retry_count = batch.retry_count; | ||
| 658 | |||
| 659 | // Check if we made any progress (received ANY events we requested) | ||
| 660 | // If received_count is 0, relay returned nothing useful - abort retry | ||
| 661 | if retry_count > 0 && received_count == 0 { | ||
| 662 | tracing::error!( | ||
| 663 | relay = %relay_url, | ||
| 664 | batch_id = batch.batch_id, | ||
| 665 | retry_count = retry_count, | ||
| 666 | requested_count = requested_count, | ||
| 667 | missing_count = missing.len(), | ||
| 668 | missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 669 | "Negentropy retry made no progress - relay returned zero requested events. \ | ||
| 670 | Aborting retry to prevent infinite loop. Completing batch with partial results." | ||
| 671 | // TODO: Track this failure in Prometheus metrics (sync_failed_batches_total) | ||
| 672 | ); | ||
| 673 | |||
| 674 | // Extract and complete batch with partial results | ||
| 675 | let batch_idx_for_completion = batch_idx; | ||
| 676 | let completed_batch = batches.remove(batch_idx_for_completion); | ||
| 677 | if batches.is_empty() { | ||
| 678 | pending.remove(relay_url); | ||
| 679 | } | ||
| 680 | drop(pending); | ||
| 681 | self.confirm_batch(relay_url, completed_batch).await; | ||
| 682 | return; | ||
| 683 | } | ||
| 654 | 684 | ||
| 655 | tracing::warn!( | 685 | tracing::warn!( |
| 656 | relay = %relay_url, | 686 | relay = %relay_url, |
| 657 | batch_id = batch.batch_id, | 687 | batch_id = batch.batch_id, |
| 688 | retry_count = retry_count, | ||
| 658 | requested_count = requested_count, | 689 | requested_count = requested_count, |
| 659 | received_count = received_count, | 690 | received_count = received_count, |
| 660 | missing_count = missing.len(), | 691 | missing_count = missing.len(), |
| @@ -710,12 +741,15 @@ impl SyncManager { | |||
| 710 | Some(missing.iter().cloned().collect()); | 741 | Some(missing.iter().cloned().collect()); |
| 711 | // Clear received_event_ids for fresh tracking | 742 | // Clear received_event_ids for fresh tracking |
| 712 | batch.received_event_ids = Some(HashSet::new()); | 743 | batch.received_event_ids = Some(HashSet::new()); |
| 744 | // Increment retry counter | ||
| 745 | batch.retry_count += 1; | ||
| 713 | 746 | ||
| 714 | tracing::info!( | 747 | tracing::info!( |
| 715 | relay = %relay_url_for_retry, | 748 | relay = %relay_url_for_retry, |
| 716 | batch_id = batch_id, | 749 | batch_id = batch_id, |
| 717 | retry_subs = new_sub_ids.len(), | 750 | retry_subs = new_sub_ids.len(), |
| 718 | missing_events = missing.len(), | 751 | missing_events = missing.len(), |
| 752 | retry_attempt = batch.retry_count, | ||
| 719 | "Created retry subscriptions for missing negentropy events" | 753 | "Created retry subscriptions for missing negentropy events" |
| 720 | ); | 754 | ); |
| 721 | } | 755 | } |
| @@ -2373,6 +2407,7 @@ impl SyncManager { | |||
| 2373 | pagination_state: HashMap::new(), // Negentropy doesn't use pagination | 2407 | pagination_state: HashMap::new(), // Negentropy doesn't use pagination |
| 2374 | requested_event_ids: None, // Will be set after negentropy diff | 2408 | requested_event_ids: None, // Will be set after negentropy diff |
| 2375 | received_event_ids: None, // Will be set after negentropy diff | 2409 | received_event_ids: None, // Will be set after negentropy diff |
| 2410 | retry_count: 0, | ||
| 2376 | }; | 2411 | }; |
| 2377 | 2412 | ||
| 2378 | // Add to pending_sync_index | 2413 | // Add to pending_sync_index |
| @@ -2610,6 +2645,7 @@ impl SyncManager { | |||
| 2610 | pagination_state, | 2645 | pagination_state, |
| 2611 | requested_event_ids: None, // Not used for REQ+EOSE | 2646 | requested_event_ids: None, // Not used for REQ+EOSE |
| 2612 | received_event_ids: None, // Not used for REQ+EOSE | 2647 | received_event_ids: None, // Not used for REQ+EOSE |
| 2648 | retry_count: 0, // Not used for REQ+EOSE | ||
| 2613 | }; | 2649 | }; |
| 2614 | 2650 | ||
| 2615 | // Add to pending_sync_index | 2651 | // Add to pending_sync_index |
| @@ -2822,11 +2858,13 @@ mod tests { | |||
| 2822 | pagination_state: HashMap::new(), | 2858 | pagination_state: HashMap::new(), |
| 2823 | requested_event_ids: Some(HashSet::new()), | 2859 | requested_event_ids: Some(HashSet::new()), |
| 2824 | received_event_ids: Some(HashSet::new()), | 2860 | received_event_ids: Some(HashSet::new()), |
| 2861 | retry_count: 0, | ||
| 2825 | }; | 2862 | }; |
| 2826 | 2863 | ||
| 2827 | assert!(batch.requested_event_ids.is_some()); | 2864 | assert!(batch.requested_event_ids.is_some()); |
| 2828 | assert!(batch.received_event_ids.is_some()); | 2865 | assert!(batch.received_event_ids.is_some()); |
| 2829 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); | 2866 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); |
| 2867 | assert_eq!(batch.retry_count, 0); | ||
| 2830 | } | 2868 | } |
| 2831 | 2869 | ||
| 2832 | #[test] | 2870 | #[test] |
| @@ -2840,6 +2878,7 @@ mod tests { | |||
| 2840 | pagination_state: HashMap::new(), | 2878 | pagination_state: HashMap::new(), |
| 2841 | requested_event_ids: None, | 2879 | requested_event_ids: None, |
| 2842 | received_event_ids: None, | 2880 | received_event_ids: None, |
| 2881 | retry_count: 0, | ||
| 2843 | }; | 2882 | }; |
| 2844 | 2883 | ||
| 2845 | assert!(batch.requested_event_ids.is_none()); | 2884 | assert!(batch.requested_event_ids.is_none()); |