upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs185
1 files changed, 91 insertions, 94 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index ed3b78c..07527c7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds};
27pub use metrics::SyncMetrics; 27pub use metrics::SyncMetrics;
28 28
29// Re-export rejected index types 29// Re-export rejected index types
30pub use rejected_index::{RejectionReason}; 30pub use rejected_index::RejectionReason;
31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used 31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used
32// Current code still uses the simple HashSet type alias below 32// Current code still uses the simple HashSet type alias below
33 33
@@ -73,7 +73,7 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
73/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. 73/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync.
74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing 74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing
75/// to avoid repeatedly fetching and rejecting the same events. 75/// to avoid repeatedly fetching and rejecting the same events.
76/// 76///
77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs: 77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs:
78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing) 78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing)
79/// - Cold index: Metadata for 7 days (prevents repeated downloads) 79/// - Cold index: Metadata for 7 days (prevents repeated downloads)
@@ -113,7 +113,9 @@ impl ConnectionStatus {
113 pub fn is_live_sync_active(&self) -> bool { 113 pub fn is_live_sync_active(&self) -> bool {
114 matches!( 114 matches!(
115 self, 115 self,
116 ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedHistoricSyncFailures 116 ConnectionStatus::Syncing
117 | ConnectionStatus::Connected
118 | ConnectionStatus::ConnectedHistoricSyncFailures
117 ) 119 )
118 } 120 }
119} 121}
@@ -384,9 +386,7 @@ async fn run_rejected_index_cleanup(
384 let hot_cache_interval = Duration::from_secs(60); 386 let hot_cache_interval = Duration::from_secs(60);
385 let cold_index_interval = Duration::from_secs(86400); // 24 hours 387 let cold_index_interval = Duration::from_secs(86400); // 24 hours
386 388
387 tracing::info!( 389 tracing::info!("Rejected index cleanup started (hot cache: 60s, cold index: daily)");
388 "Rejected index cleanup started (hot cache: 60s, cold index: daily)"
389 );
390 390
391 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); 391 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval);
392 let mut cold_index_timer = tokio::time::interval(cold_index_interval); 392 let mut cold_index_timer = tokio::time::interval(cold_index_interval);
@@ -399,7 +399,7 @@ async fn run_rejected_index_cleanup(
399 tokio::select! { 399 tokio::select! {
400 _ = hot_cache_timer.tick() => { 400 _ = hot_cache_timer.tick() => {
401 let manager = sync_manager.lock().await; 401 let manager = sync_manager.lock().await;
402 402
403 // Clean up announcements index 403 // Clean up announcements index
404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); 404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
405 if hot_expired > 0 { 405 if hot_expired > 0 {
@@ -408,7 +408,7 @@ async fn run_rejected_index_cleanup(
408 hot_expired 408 hot_expired
409 ); 409 );
410 } 410 }
411 411
412 // Clean up states index 412 // Clean up states index
413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); 413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired();
414 if states_hot_expired > 0 { 414 if states_hot_expired > 0 {
@@ -420,7 +420,7 @@ async fn run_rejected_index_cleanup(
420 } 420 }
421 _ = cold_index_timer.tick() => { 421 _ = cold_index_timer.tick() => {
422 let manager = sync_manager.lock().await; 422 let manager = sync_manager.lock().await;
423 423
424 // Clean up announcements index 424 // Clean up announcements index
425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); 425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
426 if cold_expired > 0 { 426 if cold_expired > 0 {
@@ -429,7 +429,7 @@ async fn run_rejected_index_cleanup(
429 cold_expired 429 cold_expired
430 ); 430 );
431 } 431 }
432 432
433 // Clean up states index 433 // Clean up states index
434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); 434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired();
435 if states_cold_expired > 0 { 435 if states_cold_expired > 0 {
@@ -799,8 +799,7 @@ impl SyncManager {
799 if let (Some(requested), Some(received)) = 799 if let (Some(requested), Some(received)) =
800 (&batch.requested_event_ids, &batch.received_event_ids) 800 (&batch.requested_event_ids, &batch.received_event_ids)
801 { 801 {
802 let missing: Vec<EventId> = 802 let missing: Vec<EventId> = requested.difference(received).cloned().collect();
803 requested.difference(received).cloned().collect();
804 803
805 if !missing.is_empty() { 804 if !missing.is_empty() {
806 let requested_count = requested.len(); 805 let requested_count = requested.len();
@@ -884,13 +883,11 @@ impl SyncManager {
884 // Re-acquire lock and update batch with new subscriptions 883 // Re-acquire lock and update batch with new subscriptions
885 let mut pending = self.pending_sync_index.write().await; 884 let mut pending = self.pending_sync_index.write().await;
886 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 885 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
887 if let Some(batch) = 886 if let Some(batch) = batches.iter_mut().find(|b| b.batch_id == batch_id)
888 batches.iter_mut().find(|b| b.batch_id == batch_id)
889 { 887 {
890 batch.outstanding_subs.extend(new_sub_ids.clone()); 888 batch.outstanding_subs.extend(new_sub_ids.clone());
891 // Update requested_event_ids to only include missing ones 889 // Update requested_event_ids to only include missing ones
892 batch.requested_event_ids = 890 batch.requested_event_ids = Some(missing.iter().cloned().collect());
893 Some(missing.iter().cloned().collect());
894 // Clear received_event_ids for fresh tracking 891 // Clear received_event_ids for fresh tracking
895 batch.received_event_ids = Some(HashSet::new()); 892 batch.received_event_ids = Some(HashSet::new());
896 // Increment retry counter 893 // Increment retry counter
@@ -921,14 +918,14 @@ impl SyncManager {
921 // Re-acquire lock to extract the batch 918 // Re-acquire lock to extract the batch
922 let mut pending = self.pending_sync_index.write().await; 919 let mut pending = self.pending_sync_index.write().await;
923 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 920 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
924 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) 921 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
925 {
926 let completed_batch = batches.remove(idx); 922 let completed_batch = batches.remove(idx);
927 if batches.is_empty() { 923 if batches.is_empty() {
928 pending.remove(&relay_url_for_retry); 924 pending.remove(&relay_url_for_retry);
929 } 925 }
930 drop(pending); 926 drop(pending);
931 self.confirm_batch(&relay_url_for_retry, completed_batch).await; 927 self.confirm_batch(&relay_url_for_retry, completed_batch)
928 .await;
932 } 929 }
933 } 930 }
934 return; 931 return;
@@ -1023,17 +1020,17 @@ impl SyncManager {
1023 "Batch completed but no RelayState found for relay" 1020 "Batch completed but no RelayState found for relay"
1024 ); 1021 );
1025 } 1022 }
1026 1023
1027 // Release lock before checking if historic sync is complete 1024 // Release lock before checking if historic sync is complete
1028 drop(relay_index); 1025 drop(relay_index);
1029 1026
1030 // Spawn background task to check if historic sync is complete 1027 // Spawn background task to check if historic sync is complete
1031 // This avoids blocking the confirm_batch flow for 6 seconds 1028 // This avoids blocking the confirm_batch flow for 6 seconds
1032 let relay_url = relay_url.to_string(); 1029 let relay_url = relay_url.to_string();
1033 let pending_index = self.pending_sync_index.clone(); 1030 let pending_index = self.pending_sync_index.clone();
1034 let relay_index = self.relay_sync_index.clone(); 1031 let relay_index = self.relay_sync_index.clone();
1035 let metrics = self.metrics.clone(); 1032 let metrics = self.metrics.clone();
1036 1033
1037 tokio::spawn(async move { 1034 tokio::spawn(async move {
1038 Self::check_and_complete_historic_sync_impl( 1035 Self::check_and_complete_historic_sync_impl(
1039 &relay_url, 1036 &relay_url,
@@ -1073,29 +1070,33 @@ impl SyncManager {
1073 // First check: Are there any pending batches? 1070 // First check: Are there any pending batches?
1074 let has_pending = { 1071 let has_pending = {
1075 let pending = pending_index.read().await; 1072 let pending = pending_index.read().await;
1076 pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) 1073 pending
1074 .get(relay_url)
1075 .is_some_and(|batches| !batches.is_empty())
1077 }; 1076 };
1078 1077
1079 if has_pending { 1078 if has_pending {
1080 // Still syncing, don't transition yet 1079 // Still syncing, don't transition yet
1081 return; 1080 return;
1082 } 1081 }
1083 1082
1084 // Wait for self-subscriber batch window + buffer to catch any in-flight events 1083 // Wait for self-subscriber batch window + buffer to catch any in-flight events
1085 // that might create new Layer 2/3 filters 1084 // that might create new Layer 2/3 filters
1086 tokio::time::sleep(Duration::from_millis(6000)).await; 1085 tokio::time::sleep(Duration::from_millis(6000)).await;
1087 1086
1088 // Second check: Are there still no pending batches? 1087 // Second check: Are there still no pending batches?
1089 let has_pending = { 1088 let has_pending = {
1090 let pending = pending_index.read().await; 1089 let pending = pending_index.read().await;
1091 pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) 1090 pending
1091 .get(relay_url)
1092 .is_some_and(|batches| !batches.is_empty())
1092 }; 1093 };
1093 1094
1094 if has_pending { 1095 if has_pending {
1095 // New batches appeared during the wait - still syncing 1096 // New batches appeared during the wait - still syncing
1096 return; 1097 return;
1097 } 1098 }
1098 1099
1099 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded 1100 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded
1100 let mut relay_index_guard = relay_index.write().await; 1101 let mut relay_index_guard = relay_index.write().await;
1101 if let Some(state) = relay_index_guard.get_mut(relay_url) { 1102 if let Some(state) = relay_index_guard.get_mut(relay_url) {
@@ -1106,11 +1107,11 @@ impl SyncManager {
1106 } else { 1107 } else {
1107 ConnectionStatus::Connected 1108 ConnectionStatus::Connected
1108 }; 1109 };
1109 1110
1110 state.connection_status = new_status; 1111 state.connection_status = new_status;
1111 state.historic_sync_completed = true; 1112 state.historic_sync_completed = true;
1112 state.historic_sync_completed_at = Some(Timestamp::now()); 1113 state.historic_sync_completed_at = Some(Timestamp::now());
1113 1114
1114 tracing::info!( 1115 tracing::info!(
1115 relay = %relay_url, 1116 relay = %relay_url,
1116 repos_synced = state.repos.len(), 1117 repos_synced = state.repos.len(),
@@ -1120,7 +1121,7 @@ impl SyncManager {
1120 "Historic sync complete - transitioned to {} status", 1121 "Historic sync complete - transitioned to {} status",
1121 if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } 1122 if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" }
1122 ); 1123 );
1123 1124
1124 // Update metrics 1125 // Update metrics
1125 if let Some(ref metrics) = metrics { 1126 if let Some(ref metrics) = metrics {
1126 metrics.record_connection_status(relay_url, new_status); 1127 metrics.record_connection_status(relay_url, new_status);
@@ -1362,8 +1363,8 @@ impl SyncManager {
1362 ); 1363 );
1363 return; 1364 return;
1364 } 1365 }
1365 Some(ConnectionStatus::Syncing) 1366 Some(ConnectionStatus::Syncing)
1366 | Some(ConnectionStatus::Connected) 1367 | Some(ConnectionStatus::Connected)
1367 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { 1368 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => {
1368 // Continue to subscribe - live sync is active, can accept new filters 1369 // Continue to subscribe - live sync is active, can accept new filters
1369 } 1370 }
@@ -1468,7 +1469,8 @@ impl SyncManager {
1468 match relay_event { 1469 match relay_event {
1469 RelayEvent::Event(event, subscription_id) => { 1470 RelayEvent::Event(event, subscription_id) => {
1470 // Skip events we've already rejected (announcements only) 1471 // Skip events we've already rejected (announcements only)
1471 if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) 1472 if (event.kind == Kind::GitRepoAnnouncement
1473 || event.kind == Kind::RepoState)
1472 && rejected_events_index.contains(&event.id) 1474 && rejected_events_index.contains(&event.id)
1473 { 1475 {
1474 tracing::trace!( 1476 tracing::trace!(
@@ -1479,7 +1481,7 @@ impl SyncManager {
1479 ); 1481 );
1480 continue; 1482 continue;
1481 } 1483 }
1482 1484
1483 let result = Self::process_event_static( 1485 let result = Self::process_event_static(
1484 &event, 1486 &event,
1485 &relay_url_clone, 1487 &relay_url_clone,
@@ -1863,11 +1865,16 @@ impl SyncManager {
1863 // Create RelayConnection if not exists 1865 // Create RelayConnection if not exists
1864 if !self.connections.contains_key(&relay_url) { 1866 if !self.connections.contains_key(&relay_url) {
1865 // Get relay owner keys for NIP-42 authentication 1867 // Get relay owner keys for NIP-42 authentication
1866 let keys = self.config.relay_owner_keys() 1868 let keys = self
1869 .config
1870 .relay_owner_keys()
1867 .expect("relay_owner_keys should be available"); 1871 .expect("relay_owner_keys should be available");
1868 1872
1869 let connection = 1873 let connection = RelayConnection::new_with_database(
1870 RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database), keys); 1874 relay_url.clone(),
1875 Arc::clone(&self.database),
1876 keys,
1877 );
1871 self.connections.insert(relay_url.clone(), connection); 1878 self.connections.insert(relay_url.clone(), connection);
1872 tracing::debug!(relay = %relay_url, "Registered new relay connection"); 1879 tracing::debug!(relay = %relay_url, "Registered new relay connection");
1873 } 1880 }
@@ -1919,7 +1926,7 @@ impl SyncManager {
1919 state.connection_status = ConnectionStatus::Connecting; 1926 state.connection_status = ConnectionStatus::Connecting;
1920 } 1927 }
1921 } 1928 }
1922 1929
1923 // Update metrics to show connecting status 1930 // Update metrics to show connecting status
1924 if let Some(ref metrics) = self.metrics { 1931 if let Some(ref metrics) = self.metrics {
1925 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); 1932 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting);
@@ -1974,7 +1981,8 @@ impl SyncManager {
1974 if let Some(ref metrics) = self.metrics { 1981 if let Some(ref metrics) = self.metrics {
1975 metrics.record_connection_attempt(relay_url, false); 1982 metrics.record_connection_attempt(relay_url, false);
1976 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); 1983 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
1977 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1984 metrics
1985 .record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1978 } 1986 }
1979 } 1987 }
1980 } 1988 }
@@ -2172,7 +2180,7 @@ impl SyncManager {
2172 // immediately and should now pass validation. 2180 // immediately and should now pass validation.
2173 if event.kind == Kind::GitRepoAnnouncement { 2181 if event.kind == Kind::GitRepoAnnouncement {
2174 use crate::nostr::events::RepositoryAnnouncement; 2182 use crate::nostr::events::RepositoryAnnouncement;
2175 2183
2176 match RepositoryAnnouncement::from_event(event.clone()) { 2184 match RepositoryAnnouncement::from_event(event.clone()) {
2177 Ok(announcement) => { 2185 Ok(announcement) => {
2178 if !announcement.maintainers.is_empty() { 2186 if !announcement.maintainers.is_empty() {
@@ -2219,15 +2227,16 @@ impl SyncManager {
2219 // 2. Second attempt uses maintainer exception (different code path) 2227 // 2. Second attempt uses maintainer exception (different code path)
2220 // 3. If second attempt fails, stays in cold index only (no third attempt) 2228 // 3. If second attempt fails, stays in cold index only (no third attempt)
2221 // Use Box::pin to avoid infinitely sized future 2229 // Use Box::pin to avoid infinitely sized future
2222 let reprocess_result = Box::pin(Self::process_event_static( 2230 let reprocess_result =
2223 &maintainer_event, 2231 Box::pin(Self::process_event_static(
2224 relay_url, 2232 &maintainer_event,
2225 database, 2233 relay_url,
2226 write_policy, 2234 database,
2227 local_relay, 2235 write_policy,
2228 rejected_events_index, 2236 local_relay,
2229 )) 2237 rejected_events_index,
2230 .await; 2238 ))
2239 .await;
2231 2240
2232 match reprocess_result { 2241 match reprocess_result {
2233 ProcessResult::Saved => { 2242 ProcessResult::Saved => {
@@ -2275,7 +2284,7 @@ impl SyncManager {
2275 ); 2284 );
2276 } 2285 }
2277 } 2286 }
2278 2287
2279 // When a repository announcement is accepted, re-process any state events 2288 // When a repository announcement is accepted, re-process any state events
2280 // that were previously rejected because no announcement existed. 2289 // that were previously rejected because no announcement existed.
2281 // This handles the race condition where state events arrive before their 2290 // This handles the race condition where state events arrive before their
@@ -2284,7 +2293,10 @@ impl SyncManager {
2284 Ok(announcement) => { 2293 Ok(announcement) => {
2285 // Get the announcement author's state events that were rejected 2294 // Get the announcement author's state events that were rejected
2286 let (removed, hot_events) = rejected_events_index 2295 let (removed, hot_events) = rejected_events_index
2287 .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); 2296 .invalidate_and_get_state_events(
2297 &event.pubkey,
2298 &announcement.identifier,
2299 );
2288 2300
2289 if removed > 0 { 2301 if removed > 0 {
2290 tracing::info!( 2302 tracing::info!(
@@ -2357,7 +2369,7 @@ impl SyncManager {
2357 } 2369 }
2358 } 2370 }
2359 } 2371 }
2360 2372
2361 // When a state event is accepted (git data arrived), re-process any other 2373 // When a state event is accepted (git data arrived), re-process any other
2362 // rejected state events for the same repository. This handles the case where 2374 // rejected state events for the same repository. This handles the case where
2363 // multiple state events arrive but only one has git data initially. 2375 // multiple state events arrive but only one has git data initially.
@@ -2454,7 +2466,7 @@ impl SyncManager {
2454 reason = %message, 2466 reason = %message,
2455 "Event rejected by write policy" 2467 "Event rejected by write policy"
2456 ); 2468 );
2457 2469
2458 // Track rejected announcement and state events to avoid re-fetching them 2470 // Track rejected announcement and state events to avoid re-fetching them
2459 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 2471 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
2460 // Extract identifier from 'd' tag 2472 // Extract identifier from 'd' tag
@@ -2465,12 +2477,14 @@ impl SyncManager {
2465 .and_then(|t| t.content()) 2477 .and_then(|t| t.content())
2466 { 2478 {
2467 // Determine rejection reason based on message 2479 // Determine rejection reason based on message
2468 let reason = if message.contains("doesn't list this service") 2480 let reason = if message.contains("doesn't list this service")
2469 || message.contains("Announcement must list service") { 2481 || message.contains("Announcement must list service")
2482 {
2470 rejected_index::RejectionReason::DoesNotListService 2483 rejected_index::RejectionReason::DoesNotListService
2471 } else if message.contains("maintainer") 2484 } else if message.contains("maintainer")
2472 || message.contains("no announcement exists") 2485 || message.contains("no announcement exists")
2473 || message.contains("not authorized") { 2486 || message.contains("not authorized")
2487 {
2474 rejected_index::RejectionReason::MaintainerNotYetValid 2488 rejected_index::RejectionReason::MaintainerNotYetValid
2475 } else { 2489 } else {
2476 rejected_index::RejectionReason::Other 2490 rejected_index::RejectionReason::Other
@@ -2512,7 +2526,7 @@ impl SyncManager {
2512 ); 2526 );
2513 } 2527 }
2514 } 2528 }
2515 2529
2516 ProcessResult::Rejected 2530 ProcessResult::Rejected
2517 } 2531 }
2518 } 2532 }
@@ -3042,7 +3056,8 @@ impl SyncManager {
3042 // Get event IDs to exclude: purgatory + rejected announcements 3056 // Get event IDs to exclude: purgatory + rejected announcements
3043 let purgatory_ids = self.purgatory.event_ids(); 3057 let purgatory_ids = self.purgatory.event_ids();
3044 let rejected_ids = self.rejected_events_index.get_all_event_ids(); 3058 let rejected_ids = self.rejected_events_index.get_all_event_ids();
3045 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); 3059 let excluded_ids: HashSet<EventId> =
3060 purgatory_ids.union(&rejected_ids).cloned().collect();
3046 3061
3047 for (idx, result) in diff_results { 3062 for (idx, result) in diff_results {
3048 match result { 3063 match result {
@@ -3166,8 +3181,7 @@ impl SyncManager {
3166 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { 3181 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) {
3167 batch.outstanding_subs.extend(subscription_ids.clone()); 3182 batch.outstanding_subs.extend(subscription_ids.clone());
3168 // Store requested event IDs for validation after EOSE 3183 // Store requested event IDs for validation after EOSE
3169 batch.requested_event_ids = 3184 batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect());
3170 Some(all_remote_ids.iter().cloned().collect());
3171 batch.received_event_ids = Some(HashSet::new()); 3185 batch.received_event_ids = Some(HashSet::new());
3172 } 3186 }
3173 } 3187 }
@@ -3359,20 +3373,16 @@ mod tests {
3359 async fn test_rejected_events_excluded_from_negentropy() { 3373 async fn test_rejected_events_excluded_from_negentropy() {
3360 // Create indices 3374 // Create indices
3361 let purgatory_ids: HashSet<EventId> = HashSet::new(); 3375 let purgatory_ids: HashSet<EventId> = HashSet::new();
3362 let rejected_index = RejectedEventsIndex::new( 3376 let rejected_index =
3363 Duration::from_secs(120), 3377 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
3364 Duration::from_secs(604800),
3365 );
3366 3378
3367 // Create test event IDs 3379 // Create test event IDs
3368 let _rejected_id = EventId::from_hex( 3380 let _rejected_id =
3369 "0000000000000000000000000000000000000000000000000000000000000001", 3381 EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001")
3370 ) 3382 .unwrap();
3371 .unwrap(); 3383 let valid_id =
3372 let valid_id = EventId::from_hex( 3384 EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000002")
3373 "0000000000000000000000000000000000000000000000000000000000000002", 3385 .unwrap();
3374 )
3375 .unwrap();
3376 3386
3377 // Add rejected event to index 3387 // Add rejected event to index
3378 let keys = Keys::generate(); 3388 let keys = Keys::generate();
@@ -3383,7 +3393,7 @@ mod tests {
3383 )) 3393 ))
3384 .sign_with_keys(&keys) 3394 .sign_with_keys(&keys)
3385 .unwrap(); 3395 .unwrap();
3386 3396
3387 // Override the event ID for testing (we need a specific ID) 3397 // Override the event ID for testing (we need a specific ID)
3388 // Since we can't override the ID, let's use the actual event ID 3398 // Since we can't override the ID, let's use the actual event ID
3389 let rejected_id = rejected_event.id; 3399 let rejected_id = rejected_event.id;
@@ -3403,8 +3413,7 @@ mod tests {
3403 remote_ids.insert(valid_id); 3413 remote_ids.insert(valid_id);
3404 3414
3405 // Exclude rejected and purgatory events 3415 // Exclude rejected and purgatory events
3406 let excluded_ids: HashSet<EventId> = 3416 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect();
3407 purgatory_ids.union(&rejected_ids).cloned().collect();
3408 let filtered_ids: HashSet<EventId> = 3417 let filtered_ids: HashSet<EventId> =
3409 remote_ids.difference(&excluded_ids).cloned().collect(); 3418 remote_ids.difference(&excluded_ids).cloned().collect();
3410 3419
@@ -3422,22 +3431,14 @@ mod tests {
3422 // Requested 5 events from negentropy diff 3431 // Requested 5 events from negentropy diff
3423 let mut requested: HashSet<EventId> = HashSet::new(); 3432 let mut requested: HashSet<EventId> = HashSet::new();
3424 for i in 1u8..=5 { 3433 for i in 1u8..=5 {
3425 let id = EventId::from_hex(&format!( 3434 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3426 "{:0>64}",
3427 format!("{:x}", i)
3428 ))
3429 .unwrap();
3430 requested.insert(id); 3435 requested.insert(id);
3431 } 3436 }
3432 3437
3433 // Only received 3 events (simulating relay limit) 3438 // Only received 3 events (simulating relay limit)
3434 let mut received: HashSet<EventId> = HashSet::new(); 3439 let mut received: HashSet<EventId> = HashSet::new();
3435 for i in 1u8..=3 { 3440 for i in 1u8..=3 {
3436 let id = EventId::from_hex(&format!( 3441 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3437 "{:0>64}",
3438 format!("{:x}", i)
3439 ))
3440 .unwrap();
3441 received.insert(id); 3442 received.insert(id);
3442 } 3443 }
3443 3444
@@ -3461,11 +3462,7 @@ mod tests {
3461 // Simulate scenario where all requested events are received 3462 // Simulate scenario where all requested events are received
3462 let mut requested: HashSet<EventId> = HashSet::new(); 3463 let mut requested: HashSet<EventId> = HashSet::new();
3463 for i in 1u8..=3 { 3464 for i in 1u8..=3 {
3464 let id = EventId::from_hex(&format!( 3465 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3465 "{:0>64}",
3466 format!("{:x}", i)
3467 ))
3468 .unwrap();
3469 requested.insert(id); 3466 requested.insert(id);
3470 } 3467 }
3471 3468