diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 09:21:55 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 09:21:55 +0000 |
| commit | e1806540b5d905646b786e21a6060e4498e9aff1 (patch) | |
| tree | f9a9745484772351e5a21ee16600ab064a15bcd5 /src | |
| parent | 5eb736e1184e313efa65237bf1973dee21afb43f (diff) | |
feat(sync): validate negentropy event receipt and retry missing events
Add validation that all events requested by ID during negentropy sync
are actually received from the relay. When events are missing:
- Log detailed information (requested/received/missing counts and IDs)
- Create retry subscriptions for missing events (chunked by 300)
- Update batch to track only missing events in next round
- Only complete batch after all events received or retry fails
This handles relays that have limits on ID-based queries (e.g., max 150
events per query) by automatically retrying in smaller chunks.
Also excludes purgatory and rejected announcement events from negentropy
requests to avoid re-requesting events we know we can't/won't store.
Note: Current implementation lacks retry limit - infinite loop protection
needed (tracked as future work).
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/algorithms.rs | 4 | ||||
| -rw-r--r-- | src/sync/mod.rs | 249 |
2 files changed, 248 insertions, 5 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index f4b1f5c..4679986 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -402,6 +402,8 @@ mod tests { | |||
| 402 | outstanding_subs: HashSet::new(), | 402 | outstanding_subs: HashSet::new(), |
| 403 | sync_method: SyncMethod::ReqEose, | 403 | sync_method: SyncMethod::ReqEose, |
| 404 | pagination_state: HashMap::new(), | 404 | pagination_state: HashMap::new(), |
| 405 | requested_event_ids: None, | ||
| 406 | received_event_ids: None, | ||
| 405 | }], | 407 | }], |
| 406 | ); | 408 | ); |
| 407 | 409 | ||
| @@ -514,6 +516,8 @@ mod tests { | |||
| 514 | outstanding_subs: HashSet::new(), | 516 | outstanding_subs: HashSet::new(), |
| 515 | sync_method: SyncMethod::ReqEose, | 517 | sync_method: SyncMethod::ReqEose, |
| 516 | pagination_state: HashMap::new(), | 518 | pagination_state: HashMap::new(), |
| 519 | requested_event_ids: None, | ||
| 520 | received_event_ids: None, | ||
| 517 | }], | 521 | }], |
| 518 | ); | 522 | ); |
| 519 | 523 | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ecfd020..d33364f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -196,6 +196,12 @@ pub struct PendingBatch { | |||
| 196 | /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) | 196 | /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) |
| 197 | /// Maps subscription ID to its pagination state | 197 | /// Maps subscription ID to its pagination state |
| 198 | pub pagination_state: HashMap<SubscriptionId, PaginationState>, | 198 | pub pagination_state: HashMap<SubscriptionId, PaginationState>, |
| 199 | /// Event IDs requested via negentropy ID-based fetch (None for REQ+EOSE) | ||
| 200 | /// Used to validate that all requested events were received | ||
| 201 | pub requested_event_ids: Option<HashSet<EventId>>, | ||
| 202 | /// Event IDs actually received for this batch (None for REQ+EOSE) | ||
| 203 | /// Compared against requested_event_ids to detect missing events | ||
| 204 | pub received_event_ids: Option<HashSet<EventId>>, | ||
| 199 | } | 205 | } |
| 200 | 206 | ||
| 201 | /// Items included in a pending batch | 207 | /// Items included in a pending batch |
| @@ -633,7 +639,119 @@ impl SyncManager { | |||
| 633 | return; | 639 | return; |
| 634 | } | 640 | } |
| 635 | 641 | ||
| 636 | // 2. Batch complete - extract and remove | 642 | // 2. Batch complete - validate negentropy ID fetches before confirming |
| 643 | // For negentropy batches, check if all requested events were received | ||
| 644 | if batch.sync_method == SyncMethod::Negentropy { | ||
| 645 | if let (Some(requested), Some(received)) = | ||
| 646 | (&batch.requested_event_ids, &batch.received_event_ids) | ||
| 647 | { | ||
| 648 | let missing: Vec<EventId> = | ||
| 649 | requested.difference(received).cloned().collect(); | ||
| 650 | |||
| 651 | if !missing.is_empty() { | ||
| 652 | let requested_count = requested.len(); | ||
| 653 | let received_count = received.len(); | ||
| 654 | |||
| 655 | tracing::warn!( | ||
| 656 | relay = %relay_url, | ||
| 657 | batch_id = batch.batch_id, | ||
| 658 | requested_count = requested_count, | ||
| 659 | received_count = received_count, | ||
| 660 | missing_count = missing.len(), | ||
| 661 | missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 662 | "Negentropy sync incomplete - relay returned fewer events than requested. \ | ||
| 663 | This may indicate a relay limit on ID-based queries. \ | ||
| 664 | Retrying missing events." | ||
| 665 | ); | ||
| 666 | |||
| 667 | // Create retry subscription for missing events | ||
| 668 | // Chunk by 300 to avoid overly large filters | ||
| 669 | let relay_url_for_retry = relay_url.to_string(); | ||
| 670 | let batch_id = batch.batch_id; | ||
| 671 | |||
| 672 | // Drop the lock before async operations | ||
| 673 | drop(pending); | ||
| 674 | |||
| 675 | // Create new subscriptions for missing events | ||
| 676 | let retry_filters: Vec<_> = missing | ||
| 677 | .chunks(300) | ||
| 678 | .map(|c| Filter::new().ids(c.iter().copied())) | ||
| 679 | .collect(); | ||
| 680 | |||
| 681 | let mut new_sub_ids = HashSet::new(); | ||
| 682 | if let Some(conn) = self.connections.get(&relay_url_for_retry) { | ||
| 683 | for filter in retry_filters { | ||
| 684 | match conn.subscribe_filter(filter, true).await { | ||
| 685 | Ok(sub_id) => { | ||
| 686 | new_sub_ids.insert(sub_id); | ||
| 687 | } | ||
| 688 | Err(e) => { | ||
| 689 | tracing::error!( | ||
| 690 | relay = %relay_url_for_retry, | ||
| 691 | batch_id = batch_id, | ||
| 692 | error = %e, | ||
| 693 | "Failed to create retry subscription for missing events" | ||
| 694 | ); | ||
| 695 | } | ||
| 696 | } | ||
| 697 | } | ||
| 698 | } | ||
| 699 | |||
| 700 | if !new_sub_ids.is_empty() { | ||
| 701 | // Re-acquire lock and update batch with new subscriptions | ||
| 702 | let mut pending = self.pending_sync_index.write().await; | ||
| 703 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | ||
| 704 | if let Some(batch) = | ||
| 705 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 706 | { | ||
| 707 | batch.outstanding_subs.extend(new_sub_ids.clone()); | ||
| 708 | // Update requested_event_ids to only include missing ones | ||
| 709 | batch.requested_event_ids = | ||
| 710 | Some(missing.iter().cloned().collect()); | ||
| 711 | // Clear received_event_ids for fresh tracking | ||
| 712 | batch.received_event_ids = Some(HashSet::new()); | ||
| 713 | |||
| 714 | tracing::info!( | ||
| 715 | relay = %relay_url_for_retry, | ||
| 716 | batch_id = batch_id, | ||
| 717 | retry_subs = new_sub_ids.len(), | ||
| 718 | missing_events = missing.len(), | ||
| 719 | "Created retry subscriptions for missing negentropy events" | ||
| 720 | ); | ||
| 721 | } | ||
| 722 | } | ||
| 723 | // Early return - batch not complete yet, waiting for retry EOSE | ||
| 724 | return; | ||
| 725 | } else { | ||
| 726 | // Failed to create retry subscriptions, log and continue to confirm | ||
| 727 | // with partial results | ||
| 728 | tracing::error!( | ||
| 729 | relay = %relay_url_for_retry, | ||
| 730 | batch_id = batch_id, | ||
| 731 | missing_count = missing.len(), | ||
| 732 | "Failed to retry missing events - confirming batch with partial results" | ||
| 733 | ); | ||
| 734 | |||
| 735 | // Re-acquire lock to extract the batch | ||
| 736 | let mut pending = self.pending_sync_index.write().await; | ||
| 737 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | ||
| 738 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) | ||
| 739 | { | ||
| 740 | let completed_batch = batches.remove(idx); | ||
| 741 | if batches.is_empty() { | ||
| 742 | pending.remove(&relay_url_for_retry); | ||
| 743 | } | ||
| 744 | drop(pending); | ||
| 745 | self.confirm_batch(&relay_url_for_retry, completed_batch).await; | ||
| 746 | } | ||
| 747 | } | ||
| 748 | return; | ||
| 749 | } | ||
| 750 | } | ||
| 751 | } | ||
| 752 | } | ||
| 753 | |||
| 754 | // 3. Batch complete - extract and remove | ||
| 637 | let completed_batch = batches.remove(batch_idx); | 755 | let completed_batch = batches.remove(batch_idx); |
| 638 | 756 | ||
| 639 | // Clean up empty relay entry | 757 | // Clean up empty relay entry |
| @@ -644,7 +762,7 @@ impl SyncManager { | |||
| 644 | // Drop the pending lock before confirm_batch | 762 | // Drop the pending lock before confirm_batch |
| 645 | drop(pending); | 763 | drop(pending); |
| 646 | 764 | ||
| 647 | // 3. Confirm the batch (moves items to RelayState) | 765 | // 4. Confirm the batch (moves items to RelayState) |
| 648 | self.confirm_batch(relay_url, completed_batch).await; | 766 | self.confirm_batch(relay_url, completed_batch).await; |
| 649 | } | 767 | } |
| 650 | 768 | ||
| @@ -1104,11 +1222,13 @@ impl SyncManager { | |||
| 1104 | } | 1222 | } |
| 1105 | } | 1223 | } |
| 1106 | 1224 | ||
| 1107 | // Track pagination state for this subscription | 1225 | // Track pagination state for this subscription (REQ+EOSE) |
| 1226 | // and received event IDs for negentropy batches | ||
| 1108 | if result == ProcessResult::Saved || result == ProcessResult::Duplicate { | 1227 | if result == ProcessResult::Saved || result == ProcessResult::Duplicate { |
| 1109 | let mut pending = pending_sync_index.write().await; | 1228 | let mut pending = pending_sync_index.write().await; |
| 1110 | if let Some(batches) = pending.get_mut(&relay_url_clone) { | 1229 | if let Some(batches) = pending.get_mut(&relay_url_clone) { |
| 1111 | for batch in batches.iter_mut() { | 1230 | for batch in batches.iter_mut() { |
| 1231 | // Track pagination state (REQ+EOSE path) | ||
| 1112 | if let Some(state) = | 1232 | if let Some(state) = |
| 1113 | batch.pagination_state.get_mut(&subscription_id) | 1233 | batch.pagination_state.get_mut(&subscription_id) |
| 1114 | { | 1234 | { |
| @@ -1122,6 +1242,17 @@ impl SyncManager { | |||
| 1122 | _ => {} | 1242 | _ => {} |
| 1123 | } | 1243 | } |
| 1124 | } | 1244 | } |
| 1245 | |||
| 1246 | // Track received event IDs (negentropy path) | ||
| 1247 | // Only track if this batch has requested_event_ids set | ||
| 1248 | // and the subscription is one we're waiting on | ||
| 1249 | if batch.requested_event_ids.is_some() | ||
| 1250 | && batch.outstanding_subs.contains(&subscription_id) | ||
| 1251 | { | ||
| 1252 | if let Some(ref mut received) = batch.received_event_ids { | ||
| 1253 | received.insert(event.id); | ||
| 1254 | } | ||
| 1255 | } | ||
| 1125 | } | 1256 | } |
| 1126 | } | 1257 | } |
| 1127 | } | 1258 | } |
| @@ -2240,6 +2371,8 @@ impl SyncManager { | |||
| 2240 | outstanding_subs: HashSet::new(), | 2371 | outstanding_subs: HashSet::new(), |
| 2241 | sync_method: SyncMethod::Negentropy, | 2372 | sync_method: SyncMethod::Negentropy, |
| 2242 | pagination_state: HashMap::new(), // Negentropy doesn't use pagination | 2373 | pagination_state: HashMap::new(), // Negentropy doesn't use pagination |
| 2374 | requested_event_ids: None, // Will be set after negentropy diff | ||
| 2375 | received_event_ids: None, // Will be set after negentropy diff | ||
| 2243 | }; | 2376 | }; |
| 2244 | 2377 | ||
| 2245 | // Add to pending_sync_index | 2378 | // Add to pending_sync_index |
| @@ -2395,6 +2528,10 @@ impl SyncManager { | |||
| 2395 | if let Some(relay_batches) = pending.get_mut(relay_url) { | 2528 | if let Some(relay_batches) = pending.get_mut(relay_url) { |
| 2396 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { | 2529 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { |
| 2397 | batch.outstanding_subs.extend(subscription_ids.clone()); | 2530 | batch.outstanding_subs.extend(subscription_ids.clone()); |
| 2531 | // Store requested event IDs for validation after EOSE | ||
| 2532 | batch.requested_event_ids = | ||
| 2533 | Some(all_remote_ids.iter().cloned().collect()); | ||
| 2534 | batch.received_event_ids = Some(HashSet::new()); | ||
| 2398 | } | 2535 | } |
| 2399 | } | 2536 | } |
| 2400 | } | 2537 | } |
| @@ -2403,7 +2540,7 @@ impl SyncManager { | |||
| 2403 | batch_id = batch_id, | 2540 | batch_id = batch_id, |
| 2404 | subscription_ids = subscription_ids.len(), | 2541 | subscription_ids = subscription_ids.len(), |
| 2405 | events = all_remote_ids.len(), | 2542 | events = all_remote_ids.len(), |
| 2406 | "historic_sync (Negentropy) created subscritions to fetch missing events by id, awaiting EOSE" | 2543 | "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE" |
| 2407 | ); | 2544 | ); |
| 2408 | } else { | 2545 | } else { |
| 2409 | // Traditional REQ+EOSE path | 2546 | // Traditional REQ+EOSE path |
| @@ -2471,6 +2608,8 @@ impl SyncManager { | |||
| 2471 | outstanding_subs: subscription_ids, | 2608 | outstanding_subs: subscription_ids, |
| 2472 | sync_method: SyncMethod::ReqEose, | 2609 | sync_method: SyncMethod::ReqEose, |
| 2473 | pagination_state, | 2610 | pagination_state, |
| 2611 | requested_event_ids: None, // Not used for REQ+EOSE | ||
| 2612 | received_event_ids: None, // Not used for REQ+EOSE | ||
| 2474 | }; | 2613 | }; |
| 2475 | 2614 | ||
| 2476 | // Add to pending_sync_index | 2615 | // Add to pending_sync_index |
| @@ -2540,7 +2679,6 @@ impl SyncManager { | |||
| 2540 | #[cfg(test)] | 2679 | #[cfg(test)] |
| 2541 | mod tests { | 2680 | mod tests { |
| 2542 | use super::*; | 2681 | use super::*; |
| 2543 | use nostr_sdk::prelude::*; | ||
| 2544 | 2682 | ||
| 2545 | #[tokio::test] | 2683 | #[tokio::test] |
| 2546 | async fn test_rejected_events_index_tracks_announcements() { | 2684 | async fn test_rejected_events_index_tracks_announcements() { |
| @@ -2607,4 +2745,105 @@ mod tests { | |||
| 2607 | assert!(filtered_ids.contains(&valid_id)); | 2745 | assert!(filtered_ids.contains(&valid_id)); |
| 2608 | assert_eq!(filtered_ids.len(), 1); | 2746 | assert_eq!(filtered_ids.len(), 1); |
| 2609 | } | 2747 | } |
| 2748 | |||
| 2749 | #[test] | ||
| 2750 | fn test_negentropy_missing_event_detection() { | ||
| 2751 | // Simulate scenario where relay returns fewer events than requested | ||
| 2752 | // This tests the core logic for detecting missing events | ||
| 2753 | |||
| 2754 | // Requested 5 events from negentropy diff | ||
| 2755 | let mut requested: HashSet<EventId> = HashSet::new(); | ||
| 2756 | for i in 1u8..=5 { | ||
| 2757 | let id = EventId::from_hex(&format!( | ||
| 2758 | "{:0>64}", | ||
| 2759 | format!("{:x}", i) | ||
| 2760 | )) | ||
| 2761 | .unwrap(); | ||
| 2762 | requested.insert(id); | ||
| 2763 | } | ||
| 2764 | |||
| 2765 | // Only received 3 events (simulating relay limit) | ||
| 2766 | let mut received: HashSet<EventId> = HashSet::new(); | ||
| 2767 | for i in 1u8..=3 { | ||
| 2768 | let id = EventId::from_hex(&format!( | ||
| 2769 | "{:0>64}", | ||
| 2770 | format!("{:x}", i) | ||
| 2771 | )) | ||
| 2772 | .unwrap(); | ||
| 2773 | received.insert(id); | ||
| 2774 | } | ||
| 2775 | |||
| 2776 | // Calculate missing events | ||
| 2777 | let missing: Vec<EventId> = requested.difference(&received).cloned().collect(); | ||
| 2778 | |||
| 2779 | // Should have 2 missing events (IDs 4 and 5) | ||
| 2780 | assert_eq!(missing.len(), 2); | ||
| 2781 | assert_eq!(requested.len(), 5); | ||
| 2782 | assert_eq!(received.len(), 3); | ||
| 2783 | |||
| 2784 | // Verify the specific missing IDs | ||
| 2785 | let id_4 = EventId::from_hex(&format!("{:0>64}", format!("{:x}", 4u8))).unwrap(); | ||
| 2786 | let id_5 = EventId::from_hex(&format!("{:0>64}", format!("{:x}", 5u8))).unwrap(); | ||
| 2787 | assert!(missing.contains(&id_4)); | ||
| 2788 | assert!(missing.contains(&id_5)); | ||
| 2789 | } | ||
| 2790 | |||
| 2791 | #[test] | ||
| 2792 | fn test_negentropy_all_events_received() { | ||
| 2793 | // Simulate scenario where all requested events are received | ||
| 2794 | let mut requested: HashSet<EventId> = HashSet::new(); | ||
| 2795 | for i in 1u8..=3 { | ||
| 2796 | let id = EventId::from_hex(&format!( | ||
| 2797 | "{:0>64}", | ||
| 2798 | format!("{:x}", i) | ||
| 2799 | )) | ||
| 2800 | .unwrap(); | ||
| 2801 | requested.insert(id); | ||
| 2802 | } | ||
| 2803 | |||
| 2804 | // Received all 3 events | ||
| 2805 | let received = requested.clone(); | ||
| 2806 | |||
| 2807 | // Calculate missing events | ||
| 2808 | let missing: Vec<EventId> = requested.difference(&received).cloned().collect(); | ||
| 2809 | |||
| 2810 | // Should have no missing events | ||
| 2811 | assert!(missing.is_empty()); | ||
| 2812 | } | ||
| 2813 | |||
| 2814 | #[test] | ||
| 2815 | fn test_pending_batch_negentropy_fields() { | ||
| 2816 | // Test that PendingBatch properly tracks negentropy-specific fields | ||
| 2817 | let batch = PendingBatch { | ||
| 2818 | batch_id: 1, | ||
| 2819 | items: PendingItems::default(), | ||
| 2820 | outstanding_subs: HashSet::new(), | ||
| 2821 | sync_method: SyncMethod::Negentropy, | ||
| 2822 | pagination_state: HashMap::new(), | ||
| 2823 | requested_event_ids: Some(HashSet::new()), | ||
| 2824 | received_event_ids: Some(HashSet::new()), | ||
| 2825 | }; | ||
| 2826 | |||
| 2827 | assert!(batch.requested_event_ids.is_some()); | ||
| 2828 | assert!(batch.received_event_ids.is_some()); | ||
| 2829 | assert_eq!(batch.sync_method, SyncMethod::Negentropy); | ||
| 2830 | } | ||
| 2831 | |||
| 2832 | #[test] | ||
| 2833 | fn test_pending_batch_req_eose_fields() { | ||
| 2834 | // Test that REQ+EOSE batches don't use negentropy fields | ||
| 2835 | let batch = PendingBatch { | ||
| 2836 | batch_id: 1, | ||
| 2837 | items: PendingItems::default(), | ||
| 2838 | outstanding_subs: HashSet::new(), | ||
| 2839 | sync_method: SyncMethod::ReqEose, | ||
| 2840 | pagination_state: HashMap::new(), | ||
| 2841 | requested_event_ids: None, | ||
| 2842 | received_event_ids: None, | ||
| 2843 | }; | ||
| 2844 | |||
| 2845 | assert!(batch.requested_event_ids.is_none()); | ||
| 2846 | assert!(batch.received_event_ids.is_none()); | ||
| 2847 | assert_eq!(batch.sync_method, SyncMethod::ReqEose); | ||
| 2848 | } | ||
| 2610 | } | 2849 | } |