diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/metrics.rs | 37 | ||||
| -rw-r--r-- | src/sync/mod.rs | 185 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 57 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 30 |
4 files changed, 156 insertions, 153 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 7d6d42d..2ed983e 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -156,19 +156,25 @@ impl SyncMetrics { | |||
| 156 | "ngit_sync_rejected_announcements_hot_cache_hits_total", | 156 | "ngit_sync_rejected_announcements_hot_cache_hits_total", |
| 157 | "Total hot cache hits (events re-processed from cache)", | 157 | "Total hot cache hits (events re-processed from cache)", |
| 158 | ))?; | 158 | ))?; |
| 159 | registry.register(Box::new(rejected_announcements_hot_cache_hits_total.clone()))?; | 159 | registry.register(Box::new( |
| 160 | rejected_announcements_hot_cache_hits_total.clone(), | ||
| 161 | ))?; | ||
| 160 | 162 | ||
| 161 | let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( | 163 | let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( |
| 162 | "ngit_sync_rejected_announcements_hot_cache_misses_total", | 164 | "ngit_sync_rejected_announcements_hot_cache_misses_total", |
| 163 | "Total hot cache misses (events not in cache when invalidated)", | 165 | "Total hot cache misses (events not in cache when invalidated)", |
| 164 | ))?; | 166 | ))?; |
| 165 | registry.register(Box::new(rejected_announcements_hot_cache_misses_total.clone()))?; | 167 | registry.register(Box::new( |
| 168 | rejected_announcements_hot_cache_misses_total.clone(), | ||
| 169 | ))?; | ||
| 166 | 170 | ||
| 167 | let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( | 171 | let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( |
| 168 | "ngit_sync_rejected_announcements_hot_cache_expired_total", | 172 | "ngit_sync_rejected_announcements_hot_cache_expired_total", |
| 169 | "Total expired entries removed from hot cache", | 173 | "Total expired entries removed from hot cache", |
| 170 | ))?; | 174 | ))?; |
| 171 | registry.register(Box::new(rejected_announcements_hot_cache_expired_total.clone()))?; | 175 | registry.register(Box::new( |
| 176 | rejected_announcements_hot_cache_expired_total.clone(), | ||
| 177 | ))?; | ||
| 172 | 178 | ||
| 173 | let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( | 179 | let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( |
| 174 | "ngit_sync_rejected_announcements_cold_index_current", | 180 | "ngit_sync_rejected_announcements_cold_index_current", |
| @@ -180,7 +186,9 @@ impl SyncMetrics { | |||
| 180 | "ngit_sync_rejected_announcements_cold_index_expired_total", | 186 | "ngit_sync_rejected_announcements_cold_index_expired_total", |
| 181 | "Total expired entries removed from cold index", | 187 | "Total expired entries removed from cold index", |
| 182 | ))?; | 188 | ))?; |
| 183 | registry.register(Box::new(rejected_announcements_cold_index_expired_total.clone()))?; | 189 | registry.register(Box::new( |
| 190 | rejected_announcements_cold_index_expired_total.clone(), | ||
| 191 | ))?; | ||
| 184 | 192 | ||
| 185 | let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( | 193 | let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( |
| 186 | "ngit_sync_rejected_announcements_invalidated_total", | 194 | "ngit_sync_rejected_announcements_invalidated_total", |
| @@ -430,7 +438,8 @@ impl SyncMetrics { | |||
| 430 | 438 | ||
| 431 | /// Update hot cache current size gauge. | 439 | /// Update hot cache current size gauge. |
| 432 | pub fn update_hot_cache_size(&self, size: usize) { | 440 | pub fn update_hot_cache_size(&self, size: usize) { |
| 433 | self.rejected_announcements_hot_cache_current.set(size as i64); | 441 | self.rejected_announcements_hot_cache_current |
| 442 | .set(size as i64); | ||
| 434 | } | 443 | } |
| 435 | 444 | ||
| 436 | /// Record hot cache hit (event re-processed from cache). | 445 | /// Record hot cache hit (event re-processed from cache). |
| @@ -445,22 +454,26 @@ impl SyncMetrics { | |||
| 445 | 454 | ||
| 446 | /// Record hot cache expired entries. | 455 | /// Record hot cache expired entries. |
| 447 | pub fn record_hot_cache_expired(&self, count: usize) { | 456 | pub fn record_hot_cache_expired(&self, count: usize) { |
| 448 | self.rejected_announcements_hot_cache_expired_total.inc_by(count as u64); | 457 | self.rejected_announcements_hot_cache_expired_total |
| 458 | .inc_by(count as u64); | ||
| 449 | } | 459 | } |
| 450 | 460 | ||
| 451 | /// Update cold index current size gauge. | 461 | /// Update cold index current size gauge. |
| 452 | pub fn update_cold_index_size(&self, size: usize) { | 462 | pub fn update_cold_index_size(&self, size: usize) { |
| 453 | self.rejected_announcements_cold_index_current.set(size as i64); | 463 | self.rejected_announcements_cold_index_current |
| 464 | .set(size as i64); | ||
| 454 | } | 465 | } |
| 455 | 466 | ||
| 456 | /// Record cold index expired entries. | 467 | /// Record cold index expired entries. |
| 457 | pub fn record_cold_index_expired(&self, count: usize) { | 468 | pub fn record_cold_index_expired(&self, count: usize) { |
| 458 | self.rejected_announcements_cold_index_expired_total.inc_by(count as u64); | 469 | self.rejected_announcements_cold_index_expired_total |
| 470 | .inc_by(count as u64); | ||
| 459 | } | 471 | } |
| 460 | 472 | ||
| 461 | /// Record invalidation (maintainer announcement invalidated). | 473 | /// Record invalidation (maintainer announcement invalidated). |
| 462 | pub fn record_invalidation(&self, count: usize) { | 474 | pub fn record_invalidation(&self, count: usize) { |
| 463 | self.rejected_announcements_invalidated_total.inc_by(count as u64); | 475 | self.rejected_announcements_invalidated_total |
| 476 | .inc_by(count as u64); | ||
| 464 | } | 477 | } |
| 465 | 478 | ||
| 466 | // === Rejected States Recording Methods === | 479 | // === Rejected States Recording Methods === |
| @@ -482,7 +495,8 @@ impl SyncMetrics { | |||
| 482 | 495 | ||
| 483 | /// Record state event hot cache expired entries. | 496 | /// Record state event hot cache expired entries. |
| 484 | pub fn record_states_hot_cache_expired(&self, count: usize) { | 497 | pub fn record_states_hot_cache_expired(&self, count: usize) { |
| 485 | self.rejected_states_hot_cache_expired_total.inc_by(count as u64); | 498 | self.rejected_states_hot_cache_expired_total |
| 499 | .inc_by(count as u64); | ||
| 486 | } | 500 | } |
| 487 | 501 | ||
| 488 | /// Update state events cold index current size gauge. | 502 | /// Update state events cold index current size gauge. |
| @@ -492,7 +506,8 @@ impl SyncMetrics { | |||
| 492 | 506 | ||
| 493 | /// Record state event cold index expired entries. | 507 | /// Record state event cold index expired entries. |
| 494 | pub fn record_states_cold_index_expired(&self, count: usize) { | 508 | pub fn record_states_cold_index_expired(&self, count: usize) { |
| 495 | self.rejected_states_cold_index_expired_total.inc_by(count as u64); | 509 | self.rejected_states_cold_index_expired_total |
| 510 | .inc_by(count as u64); | ||
| 496 | } | 511 | } |
| 497 | 512 | ||
| 498 | /// Record state event invalidation. | 513 | /// Record state event invalidation. |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed3b78c..07527c7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; | |||
| 27 | pub use metrics::SyncMetrics; | 27 | pub use metrics::SyncMetrics; |
| 28 | 28 | ||
| 29 | // Re-export rejected index types | 29 | // Re-export rejected index types |
| 30 | pub use rejected_index::{RejectionReason}; | 30 | pub use rejected_index::RejectionReason; |
| 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used | 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used |
| 32 | // Current code still uses the simple HashSet type alias below | 32 | // Current code still uses the simple HashSet type alias below |
| 33 | 33 | ||
| @@ -73,7 +73,7 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | |||
| 73 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. | 73 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. |
| 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing | 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing |
| 75 | /// to avoid repeatedly fetching and rejecting the same events. | 75 | /// to avoid repeatedly fetching and rejecting the same events. |
| 76 | /// | 76 | /// |
| 77 | /// Uses the two-tier RejectedEventsIndex from rejected_index.rs: | 77 | /// Uses the two-tier RejectedEventsIndex from rejected_index.rs: |
| 78 | /// - Hot cache: Full events for 2 minutes (enables immediate re-processing) | 78 | /// - Hot cache: Full events for 2 minutes (enables immediate re-processing) |
| 79 | /// - Cold index: Metadata for 7 days (prevents repeated downloads) | 79 | /// - Cold index: Metadata for 7 days (prevents repeated downloads) |
| @@ -113,7 +113,9 @@ impl ConnectionStatus { | |||
| 113 | pub fn is_live_sync_active(&self) -> bool { | 113 | pub fn is_live_sync_active(&self) -> bool { |
| 114 | matches!( | 114 | matches!( |
| 115 | self, | 115 | self, |
| 116 | ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedHistoricSyncFailures | 116 | ConnectionStatus::Syncing |
| 117 | | ConnectionStatus::Connected | ||
| 118 | | ConnectionStatus::ConnectedHistoricSyncFailures | ||
| 117 | ) | 119 | ) |
| 118 | } | 120 | } |
| 119 | } | 121 | } |
| @@ -384,9 +386,7 @@ async fn run_rejected_index_cleanup( | |||
| 384 | let hot_cache_interval = Duration::from_secs(60); | 386 | let hot_cache_interval = Duration::from_secs(60); |
| 385 | let cold_index_interval = Duration::from_secs(86400); // 24 hours | 387 | let cold_index_interval = Duration::from_secs(86400); // 24 hours |
| 386 | 388 | ||
| 387 | tracing::info!( | 389 | tracing::info!("Rejected index cleanup started (hot cache: 60s, cold index: daily)"); |
| 388 | "Rejected index cleanup started (hot cache: 60s, cold index: daily)" | ||
| 389 | ); | ||
| 390 | 390 | ||
| 391 | let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); | 391 | let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); |
| 392 | let mut cold_index_timer = tokio::time::interval(cold_index_interval); | 392 | let mut cold_index_timer = tokio::time::interval(cold_index_interval); |
| @@ -399,7 +399,7 @@ async fn run_rejected_index_cleanup( | |||
| 399 | tokio::select! { | 399 | tokio::select! { |
| 400 | _ = hot_cache_timer.tick() => { | 400 | _ = hot_cache_timer.tick() => { |
| 401 | let manager = sync_manager.lock().await; | 401 | let manager = sync_manager.lock().await; |
| 402 | 402 | ||
| 403 | // Clean up announcements index | 403 | // Clean up announcements index |
| 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); | 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); |
| 405 | if hot_expired > 0 { | 405 | if hot_expired > 0 { |
| @@ -408,7 +408,7 @@ async fn run_rejected_index_cleanup( | |||
| 408 | hot_expired | 408 | hot_expired |
| 409 | ); | 409 | ); |
| 410 | } | 410 | } |
| 411 | 411 | ||
| 412 | // Clean up states index | 412 | // Clean up states index |
| 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); | 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); |
| 414 | if states_hot_expired > 0 { | 414 | if states_hot_expired > 0 { |
| @@ -420,7 +420,7 @@ async fn run_rejected_index_cleanup( | |||
| 420 | } | 420 | } |
| 421 | _ = cold_index_timer.tick() => { | 421 | _ = cold_index_timer.tick() => { |
| 422 | let manager = sync_manager.lock().await; | 422 | let manager = sync_manager.lock().await; |
| 423 | 423 | ||
| 424 | // Clean up announcements index | 424 | // Clean up announcements index |
| 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); | 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); |
| 426 | if cold_expired > 0 { | 426 | if cold_expired > 0 { |
| @@ -429,7 +429,7 @@ async fn run_rejected_index_cleanup( | |||
| 429 | cold_expired | 429 | cold_expired |
| 430 | ); | 430 | ); |
| 431 | } | 431 | } |
| 432 | 432 | ||
| 433 | // Clean up states index | 433 | // Clean up states index |
| 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); | 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); |
| 435 | if states_cold_expired > 0 { | 435 | if states_cold_expired > 0 { |
| @@ -799,8 +799,7 @@ impl SyncManager { | |||
| 799 | if let (Some(requested), Some(received)) = | 799 | if let (Some(requested), Some(received)) = |
| 800 | (&batch.requested_event_ids, &batch.received_event_ids) | 800 | (&batch.requested_event_ids, &batch.received_event_ids) |
| 801 | { | 801 | { |
| 802 | let missing: Vec<EventId> = | 802 | let missing: Vec<EventId> = requested.difference(received).cloned().collect(); |
| 803 | requested.difference(received).cloned().collect(); | ||
| 804 | 803 | ||
| 805 | if !missing.is_empty() { | 804 | if !missing.is_empty() { |
| 806 | let requested_count = requested.len(); | 805 | let requested_count = requested.len(); |
| @@ -884,13 +883,11 @@ impl SyncManager { | |||
| 884 | // Re-acquire lock and update batch with new subscriptions | 883 | // Re-acquire lock and update batch with new subscriptions |
| 885 | let mut pending = self.pending_sync_index.write().await; | 884 | let mut pending = self.pending_sync_index.write().await; |
| 886 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 885 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 887 | if let Some(batch) = | 886 | if let Some(batch) = batches.iter_mut().find(|b| b.batch_id == batch_id) |
| 888 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 889 | { | 887 | { |
| 890 | batch.outstanding_subs.extend(new_sub_ids.clone()); | 888 | batch.outstanding_subs.extend(new_sub_ids.clone()); |
| 891 | // Update requested_event_ids to only include missing ones | 889 | // Update requested_event_ids to only include missing ones |
| 892 | batch.requested_event_ids = | 890 | batch.requested_event_ids = Some(missing.iter().cloned().collect()); |
| 893 | Some(missing.iter().cloned().collect()); | ||
| 894 | // Clear received_event_ids for fresh tracking | 891 | // Clear received_event_ids for fresh tracking |
| 895 | batch.received_event_ids = Some(HashSet::new()); | 892 | batch.received_event_ids = Some(HashSet::new()); |
| 896 | // Increment retry counter | 893 | // Increment retry counter |
| @@ -921,14 +918,14 @@ impl SyncManager { | |||
| 921 | // Re-acquire lock to extract the batch | 918 | // Re-acquire lock to extract the batch |
| 922 | let mut pending = self.pending_sync_index.write().await; | 919 | let mut pending = self.pending_sync_index.write().await; |
| 923 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 920 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 924 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) | 921 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { |
| 925 | { | ||
| 926 | let completed_batch = batches.remove(idx); | 922 | let completed_batch = batches.remove(idx); |
| 927 | if batches.is_empty() { | 923 | if batches.is_empty() { |
| 928 | pending.remove(&relay_url_for_retry); | 924 | pending.remove(&relay_url_for_retry); |
| 929 | } | 925 | } |
| 930 | drop(pending); | 926 | drop(pending); |
| 931 | self.confirm_batch(&relay_url_for_retry, completed_batch).await; | 927 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 928 | .await; | ||
| 932 | } | 929 | } |
| 933 | } | 930 | } |
| 934 | return; | 931 | return; |
| @@ -1023,17 +1020,17 @@ impl SyncManager { | |||
| 1023 | "Batch completed but no RelayState found for relay" | 1020 | "Batch completed but no RelayState found for relay" |
| 1024 | ); | 1021 | ); |
| 1025 | } | 1022 | } |
| 1026 | 1023 | ||
| 1027 | // Release lock before checking if historic sync is complete | 1024 | // Release lock before checking if historic sync is complete |
| 1028 | drop(relay_index); | 1025 | drop(relay_index); |
| 1029 | 1026 | ||
| 1030 | // Spawn background task to check if historic sync is complete | 1027 | // Spawn background task to check if historic sync is complete |
| 1031 | // This avoids blocking the confirm_batch flow for 6 seconds | 1028 | // This avoids blocking the confirm_batch flow for 6 seconds |
| 1032 | let relay_url = relay_url.to_string(); | 1029 | let relay_url = relay_url.to_string(); |
| 1033 | let pending_index = self.pending_sync_index.clone(); | 1030 | let pending_index = self.pending_sync_index.clone(); |
| 1034 | let relay_index = self.relay_sync_index.clone(); | 1031 | let relay_index = self.relay_sync_index.clone(); |
| 1035 | let metrics = self.metrics.clone(); | 1032 | let metrics = self.metrics.clone(); |
| 1036 | 1033 | ||
| 1037 | tokio::spawn(async move { | 1034 | tokio::spawn(async move { |
| 1038 | Self::check_and_complete_historic_sync_impl( | 1035 | Self::check_and_complete_historic_sync_impl( |
| 1039 | &relay_url, | 1036 | &relay_url, |
| @@ -1073,29 +1070,33 @@ impl SyncManager { | |||
| 1073 | // First check: Are there any pending batches? | 1070 | // First check: Are there any pending batches? |
| 1074 | let has_pending = { | 1071 | let has_pending = { |
| 1075 | let pending = pending_index.read().await; | 1072 | let pending = pending_index.read().await; |
| 1076 | pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) | 1073 | pending |
| 1074 | .get(relay_url) | ||
| 1075 | .is_some_and(|batches| !batches.is_empty()) | ||
| 1077 | }; | 1076 | }; |
| 1078 | 1077 | ||
| 1079 | if has_pending { | 1078 | if has_pending { |
| 1080 | // Still syncing, don't transition yet | 1079 | // Still syncing, don't transition yet |
| 1081 | return; | 1080 | return; |
| 1082 | } | 1081 | } |
| 1083 | 1082 | ||
| 1084 | // Wait for self-subscriber batch window + buffer to catch any in-flight events | 1083 | // Wait for self-subscriber batch window + buffer to catch any in-flight events |
| 1085 | // that might create new Layer 2/3 filters | 1084 | // that might create new Layer 2/3 filters |
| 1086 | tokio::time::sleep(Duration::from_millis(6000)).await; | 1085 | tokio::time::sleep(Duration::from_millis(6000)).await; |
| 1087 | 1086 | ||
| 1088 | // Second check: Are there still no pending batches? | 1087 | // Second check: Are there still no pending batches? |
| 1089 | let has_pending = { | 1088 | let has_pending = { |
| 1090 | let pending = pending_index.read().await; | 1089 | let pending = pending_index.read().await; |
| 1091 | pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) | 1090 | pending |
| 1091 | .get(relay_url) | ||
| 1092 | .is_some_and(|batches| !batches.is_empty()) | ||
| 1092 | }; | 1093 | }; |
| 1093 | 1094 | ||
| 1094 | if has_pending { | 1095 | if has_pending { |
| 1095 | // New batches appeared during the wait - still syncing | 1096 | // New batches appeared during the wait - still syncing |
| 1096 | return; | 1097 | return; |
| 1097 | } | 1098 | } |
| 1098 | 1099 | ||
| 1099 | // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded | 1100 | // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded |
| 1100 | let mut relay_index_guard = relay_index.write().await; | 1101 | let mut relay_index_guard = relay_index.write().await; |
| 1101 | if let Some(state) = relay_index_guard.get_mut(relay_url) { | 1102 | if let Some(state) = relay_index_guard.get_mut(relay_url) { |
| @@ -1106,11 +1107,11 @@ impl SyncManager { | |||
| 1106 | } else { | 1107 | } else { |
| 1107 | ConnectionStatus::Connected | 1108 | ConnectionStatus::Connected |
| 1108 | }; | 1109 | }; |
| 1109 | 1110 | ||
| 1110 | state.connection_status = new_status; | 1111 | state.connection_status = new_status; |
| 1111 | state.historic_sync_completed = true; | 1112 | state.historic_sync_completed = true; |
| 1112 | state.historic_sync_completed_at = Some(Timestamp::now()); | 1113 | state.historic_sync_completed_at = Some(Timestamp::now()); |
| 1113 | 1114 | ||
| 1114 | tracing::info!( | 1115 | tracing::info!( |
| 1115 | relay = %relay_url, | 1116 | relay = %relay_url, |
| 1116 | repos_synced = state.repos.len(), | 1117 | repos_synced = state.repos.len(), |
| @@ -1120,7 +1121,7 @@ impl SyncManager { | |||
| 1120 | "Historic sync complete - transitioned to {} status", | 1121 | "Historic sync complete - transitioned to {} status", |
| 1121 | if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } | 1122 | if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } |
| 1122 | ); | 1123 | ); |
| 1123 | 1124 | ||
| 1124 | // Update metrics | 1125 | // Update metrics |
| 1125 | if let Some(ref metrics) = metrics { | 1126 | if let Some(ref metrics) = metrics { |
| 1126 | metrics.record_connection_status(relay_url, new_status); | 1127 | metrics.record_connection_status(relay_url, new_status); |
| @@ -1362,8 +1363,8 @@ impl SyncManager { | |||
| 1362 | ); | 1363 | ); |
| 1363 | return; | 1364 | return; |
| 1364 | } | 1365 | } |
| 1365 | Some(ConnectionStatus::Syncing) | 1366 | Some(ConnectionStatus::Syncing) |
| 1366 | | Some(ConnectionStatus::Connected) | 1367 | | Some(ConnectionStatus::Connected) |
| 1367 | | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { | 1368 | | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { |
| 1368 | // Continue to subscribe - live sync is active, can accept new filters | 1369 | // Continue to subscribe - live sync is active, can accept new filters |
| 1369 | } | 1370 | } |
| @@ -1468,7 +1469,8 @@ impl SyncManager { | |||
| 1468 | match relay_event { | 1469 | match relay_event { |
| 1469 | RelayEvent::Event(event, subscription_id) => { | 1470 | RelayEvent::Event(event, subscription_id) => { |
| 1470 | // Skip events we've already rejected (announcements only) | 1471 | // Skip events we've already rejected (announcements only) |
| 1471 | if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) | 1472 | if (event.kind == Kind::GitRepoAnnouncement |
| 1473 | || event.kind == Kind::RepoState) | ||
| 1472 | && rejected_events_index.contains(&event.id) | 1474 | && rejected_events_index.contains(&event.id) |
| 1473 | { | 1475 | { |
| 1474 | tracing::trace!( | 1476 | tracing::trace!( |
| @@ -1479,7 +1481,7 @@ impl SyncManager { | |||
| 1479 | ); | 1481 | ); |
| 1480 | continue; | 1482 | continue; |
| 1481 | } | 1483 | } |
| 1482 | 1484 | ||
| 1483 | let result = Self::process_event_static( | 1485 | let result = Self::process_event_static( |
| 1484 | &event, | 1486 | &event, |
| 1485 | &relay_url_clone, | 1487 | &relay_url_clone, |
| @@ -1863,11 +1865,16 @@ impl SyncManager { | |||
| 1863 | // Create RelayConnection if not exists | 1865 | // Create RelayConnection if not exists |
| 1864 | if !self.connections.contains_key(&relay_url) { | 1866 | if !self.connections.contains_key(&relay_url) { |
| 1865 | // Get relay owner keys for NIP-42 authentication | 1867 | // Get relay owner keys for NIP-42 authentication |
| 1866 | let keys = self.config.relay_owner_keys() | 1868 | let keys = self |
| 1869 | .config | ||
| 1870 | .relay_owner_keys() | ||
| 1867 | .expect("relay_owner_keys should be available"); | 1871 | .expect("relay_owner_keys should be available"); |
| 1868 | 1872 | ||
| 1869 | let connection = | 1873 | let connection = RelayConnection::new_with_database( |
| 1870 | RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database), keys); | 1874 | relay_url.clone(), |
| 1875 | Arc::clone(&self.database), | ||
| 1876 | keys, | ||
| 1877 | ); | ||
| 1871 | self.connections.insert(relay_url.clone(), connection); | 1878 | self.connections.insert(relay_url.clone(), connection); |
| 1872 | tracing::debug!(relay = %relay_url, "Registered new relay connection"); | 1879 | tracing::debug!(relay = %relay_url, "Registered new relay connection"); |
| 1873 | } | 1880 | } |
| @@ -1919,7 +1926,7 @@ impl SyncManager { | |||
| 1919 | state.connection_status = ConnectionStatus::Connecting; | 1926 | state.connection_status = ConnectionStatus::Connecting; |
| 1920 | } | 1927 | } |
| 1921 | } | 1928 | } |
| 1922 | 1929 | ||
| 1923 | // Update metrics to show connecting status | 1930 | // Update metrics to show connecting status |
| 1924 | if let Some(ref metrics) = self.metrics { | 1931 | if let Some(ref metrics) = self.metrics { |
| 1925 | metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); | 1932 | metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); |
| @@ -1974,7 +1981,8 @@ impl SyncManager { | |||
| 1974 | if let Some(ref metrics) = self.metrics { | 1981 | if let Some(ref metrics) = self.metrics { |
| 1975 | metrics.record_connection_attempt(relay_url, false); | 1982 | metrics.record_connection_attempt(relay_url, false); |
| 1976 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); | 1983 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 1977 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1984 | metrics |
| 1985 | .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1978 | } | 1986 | } |
| 1979 | } | 1987 | } |
| 1980 | } | 1988 | } |
| @@ -2172,7 +2180,7 @@ impl SyncManager { | |||
| 2172 | // immediately and should now pass validation. | 2180 | // immediately and should now pass validation. |
| 2173 | if event.kind == Kind::GitRepoAnnouncement { | 2181 | if event.kind == Kind::GitRepoAnnouncement { |
| 2174 | use crate::nostr::events::RepositoryAnnouncement; | 2182 | use crate::nostr::events::RepositoryAnnouncement; |
| 2175 | 2183 | ||
| 2176 | match RepositoryAnnouncement::from_event(event.clone()) { | 2184 | match RepositoryAnnouncement::from_event(event.clone()) { |
| 2177 | Ok(announcement) => { | 2185 | Ok(announcement) => { |
| 2178 | if !announcement.maintainers.is_empty() { | 2186 | if !announcement.maintainers.is_empty() { |
| @@ -2219,15 +2227,16 @@ impl SyncManager { | |||
| 2219 | // 2. Second attempt uses maintainer exception (different code path) | 2227 | // 2. Second attempt uses maintainer exception (different code path) |
| 2220 | // 3. If second attempt fails, stays in cold index only (no third attempt) | 2228 | // 3. If second attempt fails, stays in cold index only (no third attempt) |
| 2221 | // Use Box::pin to avoid infinitely sized future | 2229 | // Use Box::pin to avoid infinitely sized future |
| 2222 | let reprocess_result = Box::pin(Self::process_event_static( | 2230 | let reprocess_result = |
| 2223 | &maintainer_event, | 2231 | Box::pin(Self::process_event_static( |
| 2224 | relay_url, | 2232 | &maintainer_event, |
| 2225 | database, | 2233 | relay_url, |
| 2226 | write_policy, | 2234 | database, |
| 2227 | local_relay, | 2235 | write_policy, |
| 2228 | rejected_events_index, | 2236 | local_relay, |
| 2229 | )) | 2237 | rejected_events_index, |
| 2230 | .await; | 2238 | )) |
| 2239 | .await; | ||
| 2231 | 2240 | ||
| 2232 | match reprocess_result { | 2241 | match reprocess_result { |
| 2233 | ProcessResult::Saved => { | 2242 | ProcessResult::Saved => { |
| @@ -2275,7 +2284,7 @@ impl SyncManager { | |||
| 2275 | ); | 2284 | ); |
| 2276 | } | 2285 | } |
| 2277 | } | 2286 | } |
| 2278 | 2287 | ||
| 2279 | // When a repository announcement is accepted, re-process any state events | 2288 | // When a repository announcement is accepted, re-process any state events |
| 2280 | // that were previously rejected because no announcement existed. | 2289 | // that were previously rejected because no announcement existed. |
| 2281 | // This handles the race condition where state events arrive before their | 2290 | // This handles the race condition where state events arrive before their |
| @@ -2284,7 +2293,10 @@ impl SyncManager { | |||
| 2284 | Ok(announcement) => { | 2293 | Ok(announcement) => { |
| 2285 | // Get the announcement author's state events that were rejected | 2294 | // Get the announcement author's state events that were rejected |
| 2286 | let (removed, hot_events) = rejected_events_index | 2295 | let (removed, hot_events) = rejected_events_index |
| 2287 | .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); | 2296 | .invalidate_and_get_state_events( |
| 2297 | &event.pubkey, | ||
| 2298 | &announcement.identifier, | ||
| 2299 | ); | ||
| 2288 | 2300 | ||
| 2289 | if removed > 0 { | 2301 | if removed > 0 { |
| 2290 | tracing::info!( | 2302 | tracing::info!( |
| @@ -2357,7 +2369,7 @@ impl SyncManager { | |||
| 2357 | } | 2369 | } |
| 2358 | } | 2370 | } |
| 2359 | } | 2371 | } |
| 2360 | 2372 | ||
| 2361 | // When a state event is accepted (git data arrived), re-process any other | 2373 | // When a state event is accepted (git data arrived), re-process any other |
| 2362 | // rejected state events for the same repository. This handles the case where | 2374 | // rejected state events for the same repository. This handles the case where |
| 2363 | // multiple state events arrive but only one has git data initially. | 2375 | // multiple state events arrive but only one has git data initially. |
| @@ -2454,7 +2466,7 @@ impl SyncManager { | |||
| 2454 | reason = %message, | 2466 | reason = %message, |
| 2455 | "Event rejected by write policy" | 2467 | "Event rejected by write policy" |
| 2456 | ); | 2468 | ); |
| 2457 | 2469 | ||
| 2458 | // Track rejected announcement and state events to avoid re-fetching them | 2470 | // Track rejected announcement and state events to avoid re-fetching them |
| 2459 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | 2471 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { |
| 2460 | // Extract identifier from 'd' tag | 2472 | // Extract identifier from 'd' tag |
| @@ -2465,12 +2477,14 @@ impl SyncManager { | |||
| 2465 | .and_then(|t| t.content()) | 2477 | .and_then(|t| t.content()) |
| 2466 | { | 2478 | { |
| 2467 | // Determine rejection reason based on message | 2479 | // Determine rejection reason based on message |
| 2468 | let reason = if message.contains("doesn't list this service") | 2480 | let reason = if message.contains("doesn't list this service") |
| 2469 | || message.contains("Announcement must list service") { | 2481 | || message.contains("Announcement must list service") |
| 2482 | { | ||
| 2470 | rejected_index::RejectionReason::DoesNotListService | 2483 | rejected_index::RejectionReason::DoesNotListService |
| 2471 | } else if message.contains("maintainer") | 2484 | } else if message.contains("maintainer") |
| 2472 | || message.contains("no announcement exists") | 2485 | || message.contains("no announcement exists") |
| 2473 | || message.contains("not authorized") { | 2486 | || message.contains("not authorized") |
| 2487 | { | ||
| 2474 | rejected_index::RejectionReason::MaintainerNotYetValid | 2488 | rejected_index::RejectionReason::MaintainerNotYetValid |
| 2475 | } else { | 2489 | } else { |
| 2476 | rejected_index::RejectionReason::Other | 2490 | rejected_index::RejectionReason::Other |
| @@ -2512,7 +2526,7 @@ impl SyncManager { | |||
| 2512 | ); | 2526 | ); |
| 2513 | } | 2527 | } |
| 2514 | } | 2528 | } |
| 2515 | 2529 | ||
| 2516 | ProcessResult::Rejected | 2530 | ProcessResult::Rejected |
| 2517 | } | 2531 | } |
| 2518 | } | 2532 | } |
| @@ -3042,7 +3056,8 @@ impl SyncManager { | |||
| 3042 | // Get event IDs to exclude: purgatory + rejected announcements | 3056 | // Get event IDs to exclude: purgatory + rejected announcements |
| 3043 | let purgatory_ids = self.purgatory.event_ids(); | 3057 | let purgatory_ids = self.purgatory.event_ids(); |
| 3044 | let rejected_ids = self.rejected_events_index.get_all_event_ids(); | 3058 | let rejected_ids = self.rejected_events_index.get_all_event_ids(); |
| 3045 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); | 3059 | let excluded_ids: HashSet<EventId> = |
| 3060 | purgatory_ids.union(&rejected_ids).cloned().collect(); | ||
| 3046 | 3061 | ||
| 3047 | for (idx, result) in diff_results { | 3062 | for (idx, result) in diff_results { |
| 3048 | match result { | 3063 | match result { |
| @@ -3166,8 +3181,7 @@ impl SyncManager { | |||
| 3166 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { | 3181 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { |
| 3167 | batch.outstanding_subs.extend(subscription_ids.clone()); | 3182 | batch.outstanding_subs.extend(subscription_ids.clone()); |
| 3168 | // Store requested event IDs for validation after EOSE | 3183 | // Store requested event IDs for validation after EOSE |
| 3169 | batch.requested_event_ids = | 3184 | batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect()); |
| 3170 | Some(all_remote_ids.iter().cloned().collect()); | ||
| 3171 | batch.received_event_ids = Some(HashSet::new()); | 3185 | batch.received_event_ids = Some(HashSet::new()); |
| 3172 | } | 3186 | } |
| 3173 | } | 3187 | } |
| @@ -3359,20 +3373,16 @@ mod tests { | |||
| 3359 | async fn test_rejected_events_excluded_from_negentropy() { | 3373 | async fn test_rejected_events_excluded_from_negentropy() { |
| 3360 | // Create indices | 3374 | // Create indices |
| 3361 | let purgatory_ids: HashSet<EventId> = HashSet::new(); | 3375 | let purgatory_ids: HashSet<EventId> = HashSet::new(); |
| 3362 | let rejected_index = RejectedEventsIndex::new( | 3376 | let rejected_index = |
| 3363 | Duration::from_secs(120), | 3377 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 3364 | Duration::from_secs(604800), | ||
| 3365 | ); | ||
| 3366 | 3378 | ||
| 3367 | // Create test event IDs | 3379 | // Create test event IDs |
| 3368 | let _rejected_id = EventId::from_hex( | 3380 | let _rejected_id = |
| 3369 | "0000000000000000000000000000000000000000000000000000000000000001", | 3381 | EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001") |
| 3370 | ) | 3382 | .unwrap(); |
| 3371 | .unwrap(); | 3383 | let valid_id = |
| 3372 | let valid_id = EventId::from_hex( | 3384 | EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000002") |
| 3373 | "0000000000000000000000000000000000000000000000000000000000000002", | 3385 | .unwrap(); |
| 3374 | ) | ||
| 3375 | .unwrap(); | ||
| 3376 | 3386 | ||
| 3377 | // Add rejected event to index | 3387 | // Add rejected event to index |
| 3378 | let keys = Keys::generate(); | 3388 | let keys = Keys::generate(); |
| @@ -3383,7 +3393,7 @@ mod tests { | |||
| 3383 | )) | 3393 | )) |
| 3384 | .sign_with_keys(&keys) | 3394 | .sign_with_keys(&keys) |
| 3385 | .unwrap(); | 3395 | .unwrap(); |
| 3386 | 3396 | ||
| 3387 | // Override the event ID for testing (we need a specific ID) | 3397 | // Override the event ID for testing (we need a specific ID) |
| 3388 | // Since we can't override the ID, let's use the actual event ID | 3398 | // Since we can't override the ID, let's use the actual event ID |
| 3389 | let rejected_id = rejected_event.id; | 3399 | let rejected_id = rejected_event.id; |
| @@ -3403,8 +3413,7 @@ mod tests { | |||
| 3403 | remote_ids.insert(valid_id); | 3413 | remote_ids.insert(valid_id); |
| 3404 | 3414 | ||
| 3405 | // Exclude rejected and purgatory events | 3415 | // Exclude rejected and purgatory events |
| 3406 | let excluded_ids: HashSet<EventId> = | 3416 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); |
| 3407 | purgatory_ids.union(&rejected_ids).cloned().collect(); | ||
| 3408 | let filtered_ids: HashSet<EventId> = | 3417 | let filtered_ids: HashSet<EventId> = |
| 3409 | remote_ids.difference(&excluded_ids).cloned().collect(); | 3418 | remote_ids.difference(&excluded_ids).cloned().collect(); |
| 3410 | 3419 | ||
| @@ -3422,22 +3431,14 @@ mod tests { | |||
| 3422 | // Requested 5 events from negentropy diff | 3431 | // Requested 5 events from negentropy diff |
| 3423 | let mut requested: HashSet<EventId> = HashSet::new(); | 3432 | let mut requested: HashSet<EventId> = HashSet::new(); |
| 3424 | for i in 1u8..=5 { | 3433 | for i in 1u8..=5 { |
| 3425 | let id = EventId::from_hex(&format!( | 3434 | let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); |
| 3426 | "{:0>64}", | ||
| 3427 | format!("{:x}", i) | ||
| 3428 | )) | ||
| 3429 | .unwrap(); | ||
| 3430 | requested.insert(id); | 3435 | requested.insert(id); |
| 3431 | } | 3436 | } |
| 3432 | 3437 | ||
| 3433 | // Only received 3 events (simulating relay limit) | 3438 | // Only received 3 events (simulating relay limit) |
| 3434 | let mut received: HashSet<EventId> = HashSet::new(); | 3439 | let mut received: HashSet<EventId> = HashSet::new(); |
| 3435 | for i in 1u8..=3 { | 3440 | for i in 1u8..=3 { |
| 3436 | let id = EventId::from_hex(&format!( | 3441 | let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); |
| 3437 | "{:0>64}", | ||
| 3438 | format!("{:x}", i) | ||
| 3439 | )) | ||
| 3440 | .unwrap(); | ||
| 3441 | received.insert(id); | 3442 | received.insert(id); |
| 3442 | } | 3443 | } |
| 3443 | 3444 | ||
| @@ -3461,11 +3462,7 @@ mod tests { | |||
| 3461 | // Simulate scenario where all requested events are received | 3462 | // Simulate scenario where all requested events are received |
| 3462 | let mut requested: HashSet<EventId> = HashSet::new(); | 3463 | let mut requested: HashSet<EventId> = HashSet::new(); |
| 3463 | for i in 1u8..=3 { | 3464 | for i in 1u8..=3 { |
| 3464 | let id = EventId::from_hex(&format!( | 3465 | let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); |
| 3465 | "{:0>64}", | ||
| 3466 | format!("{:x}", i) | ||
| 3467 | )) | ||
| 3468 | .unwrap(); | ||
| 3469 | requested.insert(id); | 3466 | requested.insert(id); |
| 3470 | } | 3467 | } |
| 3471 | 3468 | ||
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index a9d7a4d..403792a 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -190,9 +190,7 @@ impl HotCache { | |||
| 190 | let now = Instant::now(); | 190 | let now = Instant::now(); |
| 191 | let initial_count = entries.len(); | 191 | let initial_count = entries.len(); |
| 192 | 192 | ||
| 193 | entries.retain(|_, entry| { | 193 | entries.retain(|_, entry| now.duration_since(entry.cached_at) < self.expiry_duration); |
| 194 | now.duration_since(entry.cached_at) < self.expiry_duration | ||
| 195 | }); | ||
| 196 | 194 | ||
| 197 | initial_count - entries.len() | 195 | initial_count - entries.len() |
| 198 | } | 196 | } |
| @@ -284,9 +282,7 @@ impl ColdIndex { | |||
| 284 | let now = Instant::now(); | 282 | let now = Instant::now(); |
| 285 | let initial_count = entries.len(); | 283 | let initial_count = entries.len(); |
| 286 | 284 | ||
| 287 | entries.retain(|_, entry| { | 285 | entries.retain(|_, entry| now.duration_since(entry.rejected_at) < self.expiry_duration); |
| 288 | now.duration_since(entry.rejected_at) < self.expiry_duration | ||
| 289 | }); | ||
| 290 | 286 | ||
| 291 | initial_count - entries.len() | 287 | initial_count - entries.len() |
| 292 | } | 288 | } |
| @@ -389,12 +385,8 @@ impl RejectedEventsIndex { | |||
| 389 | reason: RejectionReason, | 385 | reason: RejectionReason, |
| 390 | ) { | 386 | ) { |
| 391 | // Add to hot cache (full event) | 387 | // Add to hot cache (full event) |
| 392 | self.hot_cache.add( | 388 | self.hot_cache |
| 393 | event.clone(), | 389 | .add(event.clone(), pubkey, identifier.clone(), reason); |
| 394 | pubkey, | ||
| 395 | identifier.clone(), | ||
| 396 | reason, | ||
| 397 | ); | ||
| 398 | 390 | ||
| 399 | // Add to cold index (metadata only) | 391 | // Add to cold index (metadata only) |
| 400 | self.cold_index.add(event.id, pubkey, identifier, reason); | 392 | self.cold_index.add(event.id, pubkey, identifier, reason); |
| @@ -419,12 +411,8 @@ impl RejectedEventsIndex { | |||
| 419 | reason: RejectionReason, | 411 | reason: RejectionReason, |
| 420 | ) { | 412 | ) { |
| 421 | // Add to hot cache (full event) | 413 | // Add to hot cache (full event) |
| 422 | self.hot_cache.add( | 414 | self.hot_cache |
| 423 | event.clone(), | 415 | .add(event.clone(), pubkey, identifier.clone(), reason); |
| 424 | pubkey, | ||
| 425 | identifier.clone(), | ||
| 426 | reason, | ||
| 427 | ); | ||
| 428 | 416 | ||
| 429 | // Add to cold index (metadata only) | 417 | // Add to cold index (metadata only) |
| 430 | self.cold_index.add(event.id, pubkey, identifier, reason); | 418 | self.cold_index.add(event.id, pubkey, identifier, reason); |
| @@ -608,8 +596,7 @@ mod tests { | |||
| 608 | 596 | ||
| 609 | async fn create_test_event() -> Event { | 597 | async fn create_test_event() -> Event { |
| 610 | let keys = Keys::generate(); | 598 | let keys = Keys::generate(); |
| 611 | let unsigned = nostr_sdk::EventBuilder::text_note("test") | 599 | let unsigned = nostr_sdk::EventBuilder::text_note("test").build(keys.public_key()); |
| 612 | .build(keys.public_key()); | ||
| 613 | keys.sign_event(unsigned).await.unwrap() | 600 | keys.sign_event(unsigned).await.unwrap() |
| 614 | } | 601 | } |
| 615 | 602 | ||
| @@ -695,10 +682,7 @@ mod tests { | |||
| 695 | 682 | ||
| 696 | #[tokio::test] | 683 | #[tokio::test] |
| 697 | async fn test_two_tier_index_add_and_contains() { | 684 | async fn test_two_tier_index_add_and_contains() { |
| 698 | let index = RejectedEventsIndex::new( | 685 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 699 | Duration::from_secs(120), | ||
| 700 | Duration::from_secs(604800), | ||
| 701 | ); | ||
| 702 | let event = create_test_event().await; | 686 | let event = create_test_event().await; |
| 703 | 687 | ||
| 704 | index.add_announcement( | 688 | index.add_announcement( |
| @@ -715,10 +699,7 @@ mod tests { | |||
| 715 | 699 | ||
| 716 | #[tokio::test] | 700 | #[tokio::test] |
| 717 | async fn test_invalidate_and_get_events() { | 701 | async fn test_invalidate_and_get_events() { |
| 718 | let index = RejectedEventsIndex::new( | 702 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 719 | Duration::from_secs(120), | ||
| 720 | Duration::from_secs(604800), | ||
| 721 | ); | ||
| 722 | let event = create_test_event().await; | 703 | let event = create_test_event().await; |
| 723 | let pubkey = event.pubkey; | 704 | let pubkey = event.pubkey; |
| 724 | let identifier = "test-repo".to_string(); | 705 | let identifier = "test-repo".to_string(); |
| @@ -773,10 +754,8 @@ mod tests { | |||
| 773 | 754 | ||
| 774 | #[tokio::test] | 755 | #[tokio::test] |
| 775 | async fn test_hot_cache_miss_after_expiry() { | 756 | async fn test_hot_cache_miss_after_expiry() { |
| 776 | let index = RejectedEventsIndex::new( | 757 | let index = |
| 777 | Duration::from_millis(50), | 758 | RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800)); |
| 778 | Duration::from_secs(604800), | ||
| 779 | ); | ||
| 780 | let event = create_test_event().await; | 759 | let event = create_test_event().await; |
| 781 | let pubkey = event.pubkey; | 760 | let pubkey = event.pubkey; |
| 782 | let identifier = "test-repo".to_string(); | 761 | let identifier = "test-repo".to_string(); |
| @@ -801,20 +780,15 @@ mod tests { | |||
| 801 | 780 | ||
| 802 | #[tokio::test] | 781 | #[tokio::test] |
| 803 | async fn test_multiple_maintainer_repos() { | 782 | async fn test_multiple_maintainer_repos() { |
| 804 | let index = RejectedEventsIndex::new( | 783 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 805 | Duration::from_secs(120), | ||
| 806 | Duration::from_secs(604800), | ||
| 807 | ); | ||
| 808 | 784 | ||
| 809 | let keys1 = Keys::generate(); | 785 | let keys1 = Keys::generate(); |
| 810 | let keys2 = Keys::generate(); | 786 | let keys2 = Keys::generate(); |
| 811 | 787 | ||
| 812 | let unsigned1 = nostr_sdk::EventBuilder::text_note("test1") | 788 | let unsigned1 = nostr_sdk::EventBuilder::text_note("test1").build(keys1.public_key()); |
| 813 | .build(keys1.public_key()); | ||
| 814 | let event1 = keys1.sign_event(unsigned1).await.unwrap(); | 789 | let event1 = keys1.sign_event(unsigned1).await.unwrap(); |
| 815 | 790 | ||
| 816 | let unsigned2 = nostr_sdk::EventBuilder::text_note("test2") | 791 | let unsigned2 = nostr_sdk::EventBuilder::text_note("test2").build(keys2.public_key()); |
| 817 | .build(keys2.public_key()); | ||
| 818 | let event2 = keys2.sign_event(unsigned2).await.unwrap(); | 792 | let event2 = keys2.sign_event(unsigned2).await.unwrap(); |
| 819 | 793 | ||
| 820 | // Add two different maintainer repos | 794 | // Add two different maintainer repos |
| @@ -836,8 +810,7 @@ mod tests { | |||
| 836 | assert_eq!(index.cold_index_len(), 2); | 810 | assert_eq!(index.cold_index_len(), 2); |
| 837 | 811 | ||
| 838 | // Invalidate only first maintainer | 812 | // Invalidate only first maintainer |
| 839 | let (removed, hot_events) = | 813 | let (removed, hot_events) = index.invalidate_and_get_events(&event1.pubkey, "repo1"); |
| 840 | index.invalidate_and_get_events(&event1.pubkey, "repo1"); | ||
| 841 | 814 | ||
| 842 | assert_eq!(removed, 1); | 815 | assert_eq!(removed, 1); |
| 843 | assert_eq!(hot_events.len(), 1); | 816 | assert_eq!(hot_events.len(), 1); |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index d0090c8..b86d298 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -526,31 +526,46 @@ mod tests { | |||
| 526 | #[test] | 526 | #[test] |
| 527 | fn test_normalize_url_with_wss_scheme() { | 527 | fn test_normalize_url_with_wss_scheme() { |
| 528 | let url = "wss://relay.example.com"; | 528 | let url = "wss://relay.example.com"; |
| 529 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); | 529 | assert_eq!( |
| 530 | RelayConnection::normalize_url(url), | ||
| 531 | "wss://relay.example.com" | ||
| 532 | ); | ||
| 530 | } | 533 | } |
| 531 | 534 | ||
| 532 | #[test] | 535 | #[test] |
| 533 | fn test_normalize_url_with_ws_scheme() { | 536 | fn test_normalize_url_with_ws_scheme() { |
| 534 | let url = "ws://relay.example.com"; | 537 | let url = "ws://relay.example.com"; |
| 535 | assert_eq!(RelayConnection::normalize_url(url), "ws://relay.example.com"); | 538 | assert_eq!( |
| 539 | RelayConnection::normalize_url(url), | ||
| 540 | "ws://relay.example.com" | ||
| 541 | ); | ||
| 536 | } | 542 | } |
| 537 | 543 | ||
| 538 | #[test] | 544 | #[test] |
| 539 | fn test_normalize_url_without_scheme() { | 545 | fn test_normalize_url_without_scheme() { |
| 540 | let url = "relay.example.com"; | 546 | let url = "relay.example.com"; |
| 541 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); | 547 | assert_eq!( |
| 548 | RelayConnection::normalize_url(url), | ||
| 549 | "wss://relay.example.com" | ||
| 550 | ); | ||
| 542 | } | 551 | } |
| 543 | 552 | ||
| 544 | #[test] | 553 | #[test] |
| 545 | fn test_normalize_url_without_scheme_with_port() { | 554 | fn test_normalize_url_without_scheme_with_port() { |
| 546 | let url = "relay.example.com:8080"; | 555 | let url = "relay.example.com:8080"; |
| 547 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com:8080"); | 556 | assert_eq!( |
| 557 | RelayConnection::normalize_url(url), | ||
| 558 | "wss://relay.example.com:8080" | ||
| 559 | ); | ||
| 548 | } | 560 | } |
| 549 | 561 | ||
| 550 | #[test] | 562 | #[test] |
| 551 | fn test_normalize_url_with_path() { | 563 | fn test_normalize_url_with_path() { |
| 552 | let url = "relay.example.com/nostr"; | 564 | let url = "relay.example.com/nostr"; |
| 553 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com/nostr"); | 565 | assert_eq!( |
| 566 | RelayConnection::normalize_url(url), | ||
| 567 | "wss://relay.example.com/nostr" | ||
| 568 | ); | ||
| 554 | } | 569 | } |
| 555 | 570 | ||
| 556 | #[test] | 571 | #[test] |
| @@ -587,6 +602,9 @@ mod tests { | |||
| 587 | fn test_normalize_url_real_world_example() { | 602 | fn test_normalize_url_real_world_example() { |
| 588 | // Test the exact case from the bug report | 603 | // Test the exact case from the bug report |
| 589 | let url = "git.shakespeare.diy"; | 604 | let url = "git.shakespeare.diy"; |
| 590 | assert_eq!(RelayConnection::normalize_url(url), "wss://git.shakespeare.diy"); | 605 | assert_eq!( |
| 606 | RelayConnection::normalize_url(url), | ||
| 607 | "wss://git.shakespeare.diy" | ||
| 608 | ); | ||
| 591 | } | 609 | } |
| 592 | } | 610 | } |