upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 09:21:55 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 09:21:55 +0000
commite1806540b5d905646b786e21a6060e4498e9aff1 (patch)
treef9a9745484772351e5a21ee16600ab064a15bcd5 /src/sync/mod.rs
parent5eb736e1184e313efa65237bf1973dee21afb43f (diff)
feat(sync): validate negentropy event receipt and retry missing events
Add validation that all events requested by ID during negentropy sync are actually received from the relay. When events are missing: - Log detailed information (requested/received/missing counts and IDs) - Create retry subscriptions for missing events (chunked by 300) - Update batch to track only missing events in next round - Only complete batch after all events received or retry fails This handles relays that have limits on ID-based queries (e.g., max 150 events per query) by automatically retrying in smaller chunks. Also excludes purgatory and rejected announcement events from negentropy requests to avoid re-requesting events we know we can't/won't store. Note: Current implementation lacks retry limit - infinite loop protection needed (tracked as future work).
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs249
1 files changed, 244 insertions, 5 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index ecfd020..d33364f 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -196,6 +196,12 @@ pub struct PendingBatch {
196 /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) 196 /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy)
197 /// Maps subscription ID to its pagination state 197 /// Maps subscription ID to its pagination state
198 pub pagination_state: HashMap<SubscriptionId, PaginationState>, 198 pub pagination_state: HashMap<SubscriptionId, PaginationState>,
199 /// Event IDs requested via negentropy ID-based fetch (None for REQ+EOSE)
200 /// Used to validate that all requested events were received
201 pub requested_event_ids: Option<HashSet<EventId>>,
202 /// Event IDs actually received for this batch (None for REQ+EOSE)
203 /// Compared against requested_event_ids to detect missing events
204 pub received_event_ids: Option<HashSet<EventId>>,
199} 205}
200 206
201/// Items included in a pending batch 207/// Items included in a pending batch
@@ -633,7 +639,119 @@ impl SyncManager {
633 return; 639 return;
634 } 640 }
635 641
636 // 2. Batch complete - extract and remove 642 // 2. Batch complete - validate negentropy ID fetches before confirming
643 // For negentropy batches, check if all requested events were received
644 if batch.sync_method == SyncMethod::Negentropy {
645 if let (Some(requested), Some(received)) =
646 (&batch.requested_event_ids, &batch.received_event_ids)
647 {
648 let missing: Vec<EventId> =
649 requested.difference(received).cloned().collect();
650
651 if !missing.is_empty() {
652 let requested_count = requested.len();
653 let received_count = received.len();
654
655 tracing::warn!(
656 relay = %relay_url,
657 batch_id = batch.batch_id,
658 requested_count = requested_count,
659 received_count = received_count,
660 missing_count = missing.len(),
661 missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
662 "Negentropy sync incomplete - relay returned fewer events than requested. \
663 This may indicate a relay limit on ID-based queries. \
664 Retrying missing events."
665 );
666
667 // Create retry subscription for missing events
668 // Chunk by 300 to avoid overly large filters
669 let relay_url_for_retry = relay_url.to_string();
670 let batch_id = batch.batch_id;
671
672 // Drop the lock before async operations
673 drop(pending);
674
675 // Create new subscriptions for missing events
676 let retry_filters: Vec<_> = missing
677 .chunks(300)
678 .map(|c| Filter::new().ids(c.iter().copied()))
679 .collect();
680
681 let mut new_sub_ids = HashSet::new();
682 if let Some(conn) = self.connections.get(&relay_url_for_retry) {
683 for filter in retry_filters {
684 match conn.subscribe_filter(filter, true).await {
685 Ok(sub_id) => {
686 new_sub_ids.insert(sub_id);
687 }
688 Err(e) => {
689 tracing::error!(
690 relay = %relay_url_for_retry,
691 batch_id = batch_id,
692 error = %e,
693 "Failed to create retry subscription for missing events"
694 );
695 }
696 }
697 }
698 }
699
700 if !new_sub_ids.is_empty() {
701 // Re-acquire lock and update batch with new subscriptions
702 let mut pending = self.pending_sync_index.write().await;
703 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
704 if let Some(batch) =
705 batches.iter_mut().find(|b| b.batch_id == batch_id)
706 {
707 batch.outstanding_subs.extend(new_sub_ids.clone());
708 // Update requested_event_ids to only include missing ones
709 batch.requested_event_ids =
710 Some(missing.iter().cloned().collect());
711 // Clear received_event_ids for fresh tracking
712 batch.received_event_ids = Some(HashSet::new());
713
714 tracing::info!(
715 relay = %relay_url_for_retry,
716 batch_id = batch_id,
717 retry_subs = new_sub_ids.len(),
718 missing_events = missing.len(),
719 "Created retry subscriptions for missing negentropy events"
720 );
721 }
722 }
723 // Early return - batch not complete yet, waiting for retry EOSE
724 return;
725 } else {
726 // Failed to create retry subscriptions, log and continue to confirm
727 // with partial results
728 tracing::error!(
729 relay = %relay_url_for_retry,
730 batch_id = batch_id,
731 missing_count = missing.len(),
732 "Failed to retry missing events - confirming batch with partial results"
733 );
734
735 // Re-acquire lock to extract the batch
736 let mut pending = self.pending_sync_index.write().await;
737 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
738 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id)
739 {
740 let completed_batch = batches.remove(idx);
741 if batches.is_empty() {
742 pending.remove(&relay_url_for_retry);
743 }
744 drop(pending);
745 self.confirm_batch(&relay_url_for_retry, completed_batch).await;
746 }
747 }
748 return;
749 }
750 }
751 }
752 }
753
754 // 3. Batch complete - extract and remove
637 let completed_batch = batches.remove(batch_idx); 755 let completed_batch = batches.remove(batch_idx);
638 756
639 // Clean up empty relay entry 757 // Clean up empty relay entry
@@ -644,7 +762,7 @@ impl SyncManager {
644 // Drop the pending lock before confirm_batch 762 // Drop the pending lock before confirm_batch
645 drop(pending); 763 drop(pending);
646 764
647 // 3. Confirm the batch (moves items to RelayState) 765 // 4. Confirm the batch (moves items to RelayState)
648 self.confirm_batch(relay_url, completed_batch).await; 766 self.confirm_batch(relay_url, completed_batch).await;
649 } 767 }
650 768
@@ -1104,11 +1222,13 @@ impl SyncManager {
1104 } 1222 }
1105 } 1223 }
1106 1224
1107 // Track pagination state for this subscription 1225 // Track pagination state for this subscription (REQ+EOSE)
1226 // and received event IDs for negentropy batches
1108 if result == ProcessResult::Saved || result == ProcessResult::Duplicate { 1227 if result == ProcessResult::Saved || result == ProcessResult::Duplicate {
1109 let mut pending = pending_sync_index.write().await; 1228 let mut pending = pending_sync_index.write().await;
1110 if let Some(batches) = pending.get_mut(&relay_url_clone) { 1229 if let Some(batches) = pending.get_mut(&relay_url_clone) {
1111 for batch in batches.iter_mut() { 1230 for batch in batches.iter_mut() {
1231 // Track pagination state (REQ+EOSE path)
1112 if let Some(state) = 1232 if let Some(state) =
1113 batch.pagination_state.get_mut(&subscription_id) 1233 batch.pagination_state.get_mut(&subscription_id)
1114 { 1234 {
@@ -1122,6 +1242,17 @@ impl SyncManager {
1122 _ => {} 1242 _ => {}
1123 } 1243 }
1124 } 1244 }
1245
1246 // Track received event IDs (negentropy path)
1247 // Only track if this batch has requested_event_ids set
1248 // and the subscription is one we're waiting on
1249 if batch.requested_event_ids.is_some()
1250 && batch.outstanding_subs.contains(&subscription_id)
1251 {
1252 if let Some(ref mut received) = batch.received_event_ids {
1253 received.insert(event.id);
1254 }
1255 }
1125 } 1256 }
1126 } 1257 }
1127 } 1258 }
@@ -2240,6 +2371,8 @@ impl SyncManager {
2240 outstanding_subs: HashSet::new(), 2371 outstanding_subs: HashSet::new(),
2241 sync_method: SyncMethod::Negentropy, 2372 sync_method: SyncMethod::Negentropy,
2242 pagination_state: HashMap::new(), // Negentropy doesn't use pagination 2373 pagination_state: HashMap::new(), // Negentropy doesn't use pagination
2374 requested_event_ids: None, // Will be set after negentropy diff
2375 received_event_ids: None, // Will be set after negentropy diff
2243 }; 2376 };
2244 2377
2245 // Add to pending_sync_index 2378 // Add to pending_sync_index
@@ -2395,6 +2528,10 @@ impl SyncManager {
2395 if let Some(relay_batches) = pending.get_mut(relay_url) { 2528 if let Some(relay_batches) = pending.get_mut(relay_url) {
2396 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { 2529 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) {
2397 batch.outstanding_subs.extend(subscription_ids.clone()); 2530 batch.outstanding_subs.extend(subscription_ids.clone());
2531 // Store requested event IDs for validation after EOSE
2532 batch.requested_event_ids =
2533 Some(all_remote_ids.iter().cloned().collect());
2534 batch.received_event_ids = Some(HashSet::new());
2398 } 2535 }
2399 } 2536 }
2400 } 2537 }
@@ -2403,7 +2540,7 @@ impl SyncManager {
2403 batch_id = batch_id, 2540 batch_id = batch_id,
2404 subscription_ids = subscription_ids.len(), 2541 subscription_ids = subscription_ids.len(),
2405 events = all_remote_ids.len(), 2542 events = all_remote_ids.len(),
2406 "historic_sync (Negentropy) created subscritions to fetch missing events by id, awaiting EOSE" 2543 "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE"
2407 ); 2544 );
2408 } else { 2545 } else {
2409 // Traditional REQ+EOSE path 2546 // Traditional REQ+EOSE path
@@ -2471,6 +2608,8 @@ impl SyncManager {
2471 outstanding_subs: subscription_ids, 2608 outstanding_subs: subscription_ids,
2472 sync_method: SyncMethod::ReqEose, 2609 sync_method: SyncMethod::ReqEose,
2473 pagination_state, 2610 pagination_state,
2611 requested_event_ids: None, // Not used for REQ+EOSE
2612 received_event_ids: None, // Not used for REQ+EOSE
2474 }; 2613 };
2475 2614
2476 // Add to pending_sync_index 2615 // Add to pending_sync_index
@@ -2540,7 +2679,6 @@ impl SyncManager {
2540#[cfg(test)] 2679#[cfg(test)]
2541mod tests { 2680mod tests {
2542 use super::*; 2681 use super::*;
2543 use nostr_sdk::prelude::*;
2544 2682
2545 #[tokio::test] 2683 #[tokio::test]
2546 async fn test_rejected_events_index_tracks_announcements() { 2684 async fn test_rejected_events_index_tracks_announcements() {
@@ -2607,4 +2745,105 @@ mod tests {
2607 assert!(filtered_ids.contains(&valid_id)); 2745 assert!(filtered_ids.contains(&valid_id));
2608 assert_eq!(filtered_ids.len(), 1); 2746 assert_eq!(filtered_ids.len(), 1);
2609 } 2747 }
2748
2749 #[test]
2750 fn test_negentropy_missing_event_detection() {
2751 // Simulate scenario where relay returns fewer events than requested
2752 // This tests the core logic for detecting missing events
2753
2754 // Requested 5 events from negentropy diff
2755 let mut requested: HashSet<EventId> = HashSet::new();
2756 for i in 1u8..=5 {
2757 let id = EventId::from_hex(&format!(
2758 "{:0>64}",
2759 format!("{:x}", i)
2760 ))
2761 .unwrap();
2762 requested.insert(id);
2763 }
2764
2765 // Only received 3 events (simulating relay limit)
2766 let mut received: HashSet<EventId> = HashSet::new();
2767 for i in 1u8..=3 {
2768 let id = EventId::from_hex(&format!(
2769 "{:0>64}",
2770 format!("{:x}", i)
2771 ))
2772 .unwrap();
2773 received.insert(id);
2774 }
2775
2776 // Calculate missing events
2777 let missing: Vec<EventId> = requested.difference(&received).cloned().collect();
2778
2779 // Should have 2 missing events (IDs 4 and 5)
2780 assert_eq!(missing.len(), 2);
2781 assert_eq!(requested.len(), 5);
2782 assert_eq!(received.len(), 3);
2783
2784 // Verify the specific missing IDs
2785 let id_4 = EventId::from_hex(&format!("{:0>64}", format!("{:x}", 4u8))).unwrap();
2786 let id_5 = EventId::from_hex(&format!("{:0>64}", format!("{:x}", 5u8))).unwrap();
2787 assert!(missing.contains(&id_4));
2788 assert!(missing.contains(&id_5));
2789 }
2790
2791 #[test]
2792 fn test_negentropy_all_events_received() {
2793 // Simulate scenario where all requested events are received
2794 let mut requested: HashSet<EventId> = HashSet::new();
2795 for i in 1u8..=3 {
2796 let id = EventId::from_hex(&format!(
2797 "{:0>64}",
2798 format!("{:x}", i)
2799 ))
2800 .unwrap();
2801 requested.insert(id);
2802 }
2803
2804 // Received all 3 events
2805 let received = requested.clone();
2806
2807 // Calculate missing events
2808 let missing: Vec<EventId> = requested.difference(&received).cloned().collect();
2809
2810 // Should have no missing events
2811 assert!(missing.is_empty());
2812 }
2813
2814 #[test]
2815 fn test_pending_batch_negentropy_fields() {
2816 // Test that PendingBatch properly tracks negentropy-specific fields
2817 let batch = PendingBatch {
2818 batch_id: 1,
2819 items: PendingItems::default(),
2820 outstanding_subs: HashSet::new(),
2821 sync_method: SyncMethod::Negentropy,
2822 pagination_state: HashMap::new(),
2823 requested_event_ids: Some(HashSet::new()),
2824 received_event_ids: Some(HashSet::new()),
2825 };
2826
2827 assert!(batch.requested_event_ids.is_some());
2828 assert!(batch.received_event_ids.is_some());
2829 assert_eq!(batch.sync_method, SyncMethod::Negentropy);
2830 }
2831
2832 #[test]
2833 fn test_pending_batch_req_eose_fields() {
2834 // Test that REQ+EOSE batches don't use negentropy fields
2835 let batch = PendingBatch {
2836 batch_id: 1,
2837 items: PendingItems::default(),
2838 outstanding_subs: HashSet::new(),
2839 sync_method: SyncMethod::ReqEose,
2840 pagination_state: HashMap::new(),
2841 requested_event_ids: None,
2842 received_event_ids: None,
2843 };
2844
2845 assert!(batch.requested_event_ids.is_none());
2846 assert!(batch.received_event_ids.is_none());
2847 assert_eq!(batch.sync_method, SyncMethod::ReqEose);
2848 }
2610} 2849}