From e1806540b5d905646b786e21a6060e4498e9aff1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 09:21:55 +0000 Subject: feat(sync): validate negentropy event receipt and retry missing events Add validation that all events requested by ID during negentropy sync are actually received from the relay. When events are missing: - Log detailed information (requested/received/missing counts and IDs) - Create retry subscriptions for missing events (chunked by 300) - Update batch to track only missing events in next round - Only complete batch after all events received or retry fails This handles relays that have limits on ID-based queries (e.g., max 150 events per query) by automatically retrying in smaller chunks. Also excludes purgatory and rejected announcement events from negentropy requests to avoid re-requesting events we know we can't/won't store. Note: Current implementation lacks retry limit - infinite loop protection needed (tracked as future work). --- src/sync/algorithms.rs | 4 + src/sync/mod.rs | 249 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 248 insertions(+), 5 deletions(-) (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index f4b1f5c..4679986 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -402,6 +402,8 @@ mod tests { outstanding_subs: HashSet::new(), sync_method: SyncMethod::ReqEose, pagination_state: HashMap::new(), + requested_event_ids: None, + received_event_ids: None, }], ); @@ -514,6 +516,8 @@ mod tests { outstanding_subs: HashSet::new(), sync_method: SyncMethod::ReqEose, pagination_state: HashMap::new(), + requested_event_ids: None, + received_event_ids: None, }], ); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ecfd020..d33364f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -196,6 +196,12 @@ pub struct PendingBatch { /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) /// Maps subscription ID to its pagination state pub pagination_state: HashMap, + /// Event IDs requested via negentropy ID-based fetch (None for REQ+EOSE) + /// Used to validate that all requested events were received + pub requested_event_ids: Option>, + /// Event IDs actually received for this batch (None for REQ+EOSE) + /// Compared against requested_event_ids to detect missing events + pub received_event_ids: Option>, } /// Items included in a pending batch @@ -633,7 +639,119 @@ impl SyncManager { return; } - // 2. Batch complete - extract and remove + // 2. Batch complete - validate negentropy ID fetches before confirming + // For negentropy batches, check if all requested events were received + if batch.sync_method == SyncMethod::Negentropy { + if let (Some(requested), Some(received)) = + (&batch.requested_event_ids, &batch.received_event_ids) + { + let missing: Vec = + requested.difference(received).cloned().collect(); + + if !missing.is_empty() { + let requested_count = requested.len(); + let received_count = received.len(); + + tracing::warn!( + relay = %relay_url, + batch_id = batch.batch_id, + requested_count = requested_count, + received_count = received_count, + missing_count = missing.len(), + missing_ids = ?missing.iter().map(|id| id.to_hex()).collect::>(), + "Negentropy sync incomplete - relay returned fewer events than requested. \ + This may indicate a relay limit on ID-based queries. \ + Retrying missing events." + ); + + // Create retry subscription for missing events + // Chunk by 300 to avoid overly large filters + let relay_url_for_retry = relay_url.to_string(); + let batch_id = batch.batch_id; + + // Drop the lock before async operations + drop(pending); + + // Create new subscriptions for missing events + let retry_filters: Vec<_> = missing + .chunks(300) + .map(|c| Filter::new().ids(c.iter().copied())) + .collect(); + + let mut new_sub_ids = HashSet::new(); + if let Some(conn) = self.connections.get(&relay_url_for_retry) { + for filter in retry_filters { + match conn.subscribe_filter(filter, true).await { + Ok(sub_id) => { + new_sub_ids.insert(sub_id); + } + Err(e) => { + tracing::error!( + relay = %relay_url_for_retry, + batch_id = batch_id, + error = %e, + "Failed to create retry subscription for missing events" + ); + } + } + } + } + + if !new_sub_ids.is_empty() { + // Re-acquire lock and update batch with new subscriptions + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(&relay_url_for_retry) { + if let Some(batch) = + batches.iter_mut().find(|b| b.batch_id == batch_id) + { + batch.outstanding_subs.extend(new_sub_ids.clone()); + // Update requested_event_ids to only include missing ones + batch.requested_event_ids = + Some(missing.iter().cloned().collect()); + // Clear received_event_ids for fresh tracking + batch.received_event_ids = Some(HashSet::new()); + + tracing::info!( + relay = %relay_url_for_retry, + batch_id = batch_id, + retry_subs = new_sub_ids.len(), + missing_events = missing.len(), + "Created retry subscriptions for missing negentropy events" + ); + } + } + // Early return - batch not complete yet, waiting for retry EOSE + return; + } else { + // Failed to create retry subscriptions, log and continue to confirm + // with partial results + tracing::error!( + relay = %relay_url_for_retry, + batch_id = batch_id, + missing_count = missing.len(), + "Failed to retry missing events - confirming batch with partial results" + ); + + // Re-acquire lock to extract the batch + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(&relay_url_for_retry) { + if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) + { + let completed_batch = batches.remove(idx); + if batches.is_empty() { + pending.remove(&relay_url_for_retry); + } + drop(pending); + self.confirm_batch(&relay_url_for_retry, completed_batch).await; + } + } + return; + } + } + } + } + + // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); // Clean up empty relay entry @@ -644,7 +762,7 @@ impl SyncManager { // Drop the pending lock before confirm_batch drop(pending); - // 3. Confirm the batch (moves items to RelayState) + // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; } @@ -1104,11 +1222,13 @@ impl SyncManager { } } - // Track pagination state for this subscription + // Track pagination state for this subscription (REQ+EOSE) + // and received event IDs for negentropy batches if result == ProcessResult::Saved || result == ProcessResult::Duplicate { let mut pending = pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_clone) { for batch in batches.iter_mut() { + // Track pagination state (REQ+EOSE path) if let Some(state) = batch.pagination_state.get_mut(&subscription_id) { @@ -1122,6 +1242,17 @@ impl SyncManager { _ => {} } } + + // Track received event IDs (negentropy path) + // Only track if this batch has requested_event_ids set + // and the subscription is one we're waiting on + if batch.requested_event_ids.is_some() + && batch.outstanding_subs.contains(&subscription_id) + { + if let Some(ref mut received) = batch.received_event_ids { + received.insert(event.id); + } + } } } } @@ -2240,6 +2371,8 @@ impl SyncManager { outstanding_subs: HashSet::new(), sync_method: SyncMethod::Negentropy, pagination_state: HashMap::new(), // Negentropy doesn't use pagination + requested_event_ids: None, // Will be set after negentropy diff + received_event_ids: None, // Will be set after negentropy diff }; // Add to pending_sync_index @@ -2395,6 +2528,10 @@ impl SyncManager { if let Some(relay_batches) = pending.get_mut(relay_url) { if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { batch.outstanding_subs.extend(subscription_ids.clone()); + // Store requested event IDs for validation after EOSE + batch.requested_event_ids = + Some(all_remote_ids.iter().cloned().collect()); + batch.received_event_ids = Some(HashSet::new()); } } } @@ -2403,7 +2540,7 @@ impl SyncManager { batch_id = batch_id, subscription_ids = subscription_ids.len(), events = all_remote_ids.len(), - "historic_sync (Negentropy) created subscritions to fetch missing events by id, awaiting EOSE" + "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE" ); } else { // Traditional REQ+EOSE path @@ -2471,6 +2608,8 @@ impl SyncManager { outstanding_subs: subscription_ids, sync_method: SyncMethod::ReqEose, pagination_state, + requested_event_ids: None, // Not used for REQ+EOSE + received_event_ids: None, // Not used for REQ+EOSE }; // Add to pending_sync_index @@ -2540,7 +2679,6 @@ impl SyncManager { #[cfg(test)] mod tests { use super::*; - use nostr_sdk::prelude::*; #[tokio::test] async fn test_rejected_events_index_tracks_announcements() { @@ -2607,4 +2745,105 @@ mod tests { assert!(filtered_ids.contains(&valid_id)); assert_eq!(filtered_ids.len(), 1); } + + #[test] + fn test_negentropy_missing_event_detection() { + // Simulate scenario where relay returns fewer events than requested + // This tests the core logic for detecting missing events + + // Requested 5 events from negentropy diff + let mut requested: HashSet = HashSet::new(); + for i in 1u8..=5 { + let id = EventId::from_hex(&format!( + "{:0>64}", + format!("{:x}", i) + )) + .unwrap(); + requested.insert(id); + } + + // Only received 3 events (simulating relay limit) + let mut received: HashSet = HashSet::new(); + for i in 1u8..=3 { + let id = EventId::from_hex(&format!( + "{:0>64}", + format!("{:x}", i) + )) + .unwrap(); + received.insert(id); + } + + // Calculate missing events + let missing: Vec = requested.difference(&received).cloned().collect(); + + // Should have 2 missing events (IDs 4 and 5) + assert_eq!(missing.len(), 2); + assert_eq!(requested.len(), 5); + assert_eq!(received.len(), 3); + + // Verify the specific missing IDs + let id_4 = EventId::from_hex(&format!("{:0>64}", format!("{:x}", 4u8))).unwrap(); + let id_5 = EventId::from_hex(&format!("{:0>64}", format!("{:x}", 5u8))).unwrap(); + assert!(missing.contains(&id_4)); + assert!(missing.contains(&id_5)); + } + + #[test] + fn test_negentropy_all_events_received() { + // Simulate scenario where all requested events are received + let mut requested: HashSet = HashSet::new(); + for i in 1u8..=3 { + let id = EventId::from_hex(&format!( + "{:0>64}", + format!("{:x}", i) + )) + .unwrap(); + requested.insert(id); + } + + // Received all 3 events + let received = requested.clone(); + + // Calculate missing events + let missing: Vec = requested.difference(&received).cloned().collect(); + + // Should have no missing events + assert!(missing.is_empty()); + } + + #[test] + fn test_pending_batch_negentropy_fields() { + // Test that PendingBatch properly tracks negentropy-specific fields + let batch = PendingBatch { + batch_id: 1, + items: PendingItems::default(), + outstanding_subs: HashSet::new(), + sync_method: SyncMethod::Negentropy, + pagination_state: HashMap::new(), + requested_event_ids: Some(HashSet::new()), + received_event_ids: Some(HashSet::new()), + }; + + assert!(batch.requested_event_ids.is_some()); + assert!(batch.received_event_ids.is_some()); + assert_eq!(batch.sync_method, SyncMethod::Negentropy); + } + + #[test] + fn test_pending_batch_req_eose_fields() { + // Test that REQ+EOSE batches don't use negentropy fields + let batch = PendingBatch { + batch_id: 1, + items: PendingItems::default(), + outstanding_subs: HashSet::new(), + sync_method: SyncMethod::ReqEose, + pagination_state: HashMap::new(), + requested_event_ids: None, + received_event_ids: None, + }; + + assert!(batch.requested_event_ids.is_none()); + assert!(batch.received_event_ids.is_none()); + assert_eq!(batch.sync_method, SyncMethod::ReqEose); + } } -- cgit v1.2.3