upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 19:58:41 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 19:58:41 +0000
commitb28a356cb41077ccee12a9c52f4ef2054e76cac6 (patch)
tree2a0867f1ab0216e86efa062aef90b2b8077e6fb9 /src/sync
parent6dd9fcd5392891b0ddb7894e2c5cb40450eae00e (diff)
chore: cargo fmt
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/metrics.rs37
-rw-r--r--src/sync/mod.rs185
-rw-r--r--src/sync/rejected_index.rs57
-rw-r--r--src/sync/relay_connection.rs30
4 files changed, 156 insertions, 153 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index 7d6d42d..2ed983e 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -156,19 +156,25 @@ impl SyncMetrics {
156 "ngit_sync_rejected_announcements_hot_cache_hits_total", 156 "ngit_sync_rejected_announcements_hot_cache_hits_total",
157 "Total hot cache hits (events re-processed from cache)", 157 "Total hot cache hits (events re-processed from cache)",
158 ))?; 158 ))?;
159 registry.register(Box::new(rejected_announcements_hot_cache_hits_total.clone()))?; 159 registry.register(Box::new(
160 rejected_announcements_hot_cache_hits_total.clone(),
161 ))?;
160 162
161 let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( 163 let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new(
162 "ngit_sync_rejected_announcements_hot_cache_misses_total", 164 "ngit_sync_rejected_announcements_hot_cache_misses_total",
163 "Total hot cache misses (events not in cache when invalidated)", 165 "Total hot cache misses (events not in cache when invalidated)",
164 ))?; 166 ))?;
165 registry.register(Box::new(rejected_announcements_hot_cache_misses_total.clone()))?; 167 registry.register(Box::new(
168 rejected_announcements_hot_cache_misses_total.clone(),
169 ))?;
166 170
167 let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( 171 let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new(
168 "ngit_sync_rejected_announcements_hot_cache_expired_total", 172 "ngit_sync_rejected_announcements_hot_cache_expired_total",
169 "Total expired entries removed from hot cache", 173 "Total expired entries removed from hot cache",
170 ))?; 174 ))?;
171 registry.register(Box::new(rejected_announcements_hot_cache_expired_total.clone()))?; 175 registry.register(Box::new(
176 rejected_announcements_hot_cache_expired_total.clone(),
177 ))?;
172 178
173 let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( 179 let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new(
174 "ngit_sync_rejected_announcements_cold_index_current", 180 "ngit_sync_rejected_announcements_cold_index_current",
@@ -180,7 +186,9 @@ impl SyncMetrics {
180 "ngit_sync_rejected_announcements_cold_index_expired_total", 186 "ngit_sync_rejected_announcements_cold_index_expired_total",
181 "Total expired entries removed from cold index", 187 "Total expired entries removed from cold index",
182 ))?; 188 ))?;
183 registry.register(Box::new(rejected_announcements_cold_index_expired_total.clone()))?; 189 registry.register(Box::new(
190 rejected_announcements_cold_index_expired_total.clone(),
191 ))?;
184 192
185 let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( 193 let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new(
186 "ngit_sync_rejected_announcements_invalidated_total", 194 "ngit_sync_rejected_announcements_invalidated_total",
@@ -430,7 +438,8 @@ impl SyncMetrics {
430 438
431 /// Update hot cache current size gauge. 439 /// Update hot cache current size gauge.
432 pub fn update_hot_cache_size(&self, size: usize) { 440 pub fn update_hot_cache_size(&self, size: usize) {
433 self.rejected_announcements_hot_cache_current.set(size as i64); 441 self.rejected_announcements_hot_cache_current
442 .set(size as i64);
434 } 443 }
435 444
436 /// Record hot cache hit (event re-processed from cache). 445 /// Record hot cache hit (event re-processed from cache).
@@ -445,22 +454,26 @@ impl SyncMetrics {
445 454
446 /// Record hot cache expired entries. 455 /// Record hot cache expired entries.
447 pub fn record_hot_cache_expired(&self, count: usize) { 456 pub fn record_hot_cache_expired(&self, count: usize) {
448 self.rejected_announcements_hot_cache_expired_total.inc_by(count as u64); 457 self.rejected_announcements_hot_cache_expired_total
458 .inc_by(count as u64);
449 } 459 }
450 460
451 /// Update cold index current size gauge. 461 /// Update cold index current size gauge.
452 pub fn update_cold_index_size(&self, size: usize) { 462 pub fn update_cold_index_size(&self, size: usize) {
453 self.rejected_announcements_cold_index_current.set(size as i64); 463 self.rejected_announcements_cold_index_current
464 .set(size as i64);
454 } 465 }
455 466
456 /// Record cold index expired entries. 467 /// Record cold index expired entries.
457 pub fn record_cold_index_expired(&self, count: usize) { 468 pub fn record_cold_index_expired(&self, count: usize) {
458 self.rejected_announcements_cold_index_expired_total.inc_by(count as u64); 469 self.rejected_announcements_cold_index_expired_total
470 .inc_by(count as u64);
459 } 471 }
460 472
461 /// Record invalidation (maintainer announcement invalidated). 473 /// Record invalidation (maintainer announcement invalidated).
462 pub fn record_invalidation(&self, count: usize) { 474 pub fn record_invalidation(&self, count: usize) {
463 self.rejected_announcements_invalidated_total.inc_by(count as u64); 475 self.rejected_announcements_invalidated_total
476 .inc_by(count as u64);
464 } 477 }
465 478
466 // === Rejected States Recording Methods === 479 // === Rejected States Recording Methods ===
@@ -482,7 +495,8 @@ impl SyncMetrics {
482 495
483 /// Record state event hot cache expired entries. 496 /// Record state event hot cache expired entries.
484 pub fn record_states_hot_cache_expired(&self, count: usize) { 497 pub fn record_states_hot_cache_expired(&self, count: usize) {
485 self.rejected_states_hot_cache_expired_total.inc_by(count as u64); 498 self.rejected_states_hot_cache_expired_total
499 .inc_by(count as u64);
486 } 500 }
487 501
488 /// Update state events cold index current size gauge. 502 /// Update state events cold index current size gauge.
@@ -492,7 +506,8 @@ impl SyncMetrics {
492 506
493 /// Record state event cold index expired entries. 507 /// Record state event cold index expired entries.
494 pub fn record_states_cold_index_expired(&self, count: usize) { 508 pub fn record_states_cold_index_expired(&self, count: usize) {
495 self.rejected_states_cold_index_expired_total.inc_by(count as u64); 509 self.rejected_states_cold_index_expired_total
510 .inc_by(count as u64);
496 } 511 }
497 512
498 /// Record state event invalidation. 513 /// Record state event invalidation.
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index ed3b78c..07527c7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds};
27pub use metrics::SyncMetrics; 27pub use metrics::SyncMetrics;
28 28
29// Re-export rejected index types 29// Re-export rejected index types
30pub use rejected_index::{RejectionReason}; 30pub use rejected_index::RejectionReason;
31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used 31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used
32// Current code still uses the simple HashSet type alias below 32// Current code still uses the simple HashSet type alias below
33 33
@@ -73,7 +73,7 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
73/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. 73/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync.
74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing 74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing
75/// to avoid repeatedly fetching and rejecting the same events. 75/// to avoid repeatedly fetching and rejecting the same events.
76/// 76///
77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs: 77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs:
78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing) 78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing)
79/// - Cold index: Metadata for 7 days (prevents repeated downloads) 79/// - Cold index: Metadata for 7 days (prevents repeated downloads)
@@ -113,7 +113,9 @@ impl ConnectionStatus {
113 pub fn is_live_sync_active(&self) -> bool { 113 pub fn is_live_sync_active(&self) -> bool {
114 matches!( 114 matches!(
115 self, 115 self,
116 ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedHistoricSyncFailures 116 ConnectionStatus::Syncing
117 | ConnectionStatus::Connected
118 | ConnectionStatus::ConnectedHistoricSyncFailures
117 ) 119 )
118 } 120 }
119} 121}
@@ -384,9 +386,7 @@ async fn run_rejected_index_cleanup(
384 let hot_cache_interval = Duration::from_secs(60); 386 let hot_cache_interval = Duration::from_secs(60);
385 let cold_index_interval = Duration::from_secs(86400); // 24 hours 387 let cold_index_interval = Duration::from_secs(86400); // 24 hours
386 388
387 tracing::info!( 389 tracing::info!("Rejected index cleanup started (hot cache: 60s, cold index: daily)");
388 "Rejected index cleanup started (hot cache: 60s, cold index: daily)"
389 );
390 390
391 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); 391 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval);
392 let mut cold_index_timer = tokio::time::interval(cold_index_interval); 392 let mut cold_index_timer = tokio::time::interval(cold_index_interval);
@@ -399,7 +399,7 @@ async fn run_rejected_index_cleanup(
399 tokio::select! { 399 tokio::select! {
400 _ = hot_cache_timer.tick() => { 400 _ = hot_cache_timer.tick() => {
401 let manager = sync_manager.lock().await; 401 let manager = sync_manager.lock().await;
402 402
403 // Clean up announcements index 403 // Clean up announcements index
404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); 404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
405 if hot_expired > 0 { 405 if hot_expired > 0 {
@@ -408,7 +408,7 @@ async fn run_rejected_index_cleanup(
408 hot_expired 408 hot_expired
409 ); 409 );
410 } 410 }
411 411
412 // Clean up states index 412 // Clean up states index
413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); 413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired();
414 if states_hot_expired > 0 { 414 if states_hot_expired > 0 {
@@ -420,7 +420,7 @@ async fn run_rejected_index_cleanup(
420 } 420 }
421 _ = cold_index_timer.tick() => { 421 _ = cold_index_timer.tick() => {
422 let manager = sync_manager.lock().await; 422 let manager = sync_manager.lock().await;
423 423
424 // Clean up announcements index 424 // Clean up announcements index
425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); 425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
426 if cold_expired > 0 { 426 if cold_expired > 0 {
@@ -429,7 +429,7 @@ async fn run_rejected_index_cleanup(
429 cold_expired 429 cold_expired
430 ); 430 );
431 } 431 }
432 432
433 // Clean up states index 433 // Clean up states index
434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); 434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired();
435 if states_cold_expired > 0 { 435 if states_cold_expired > 0 {
@@ -799,8 +799,7 @@ impl SyncManager {
799 if let (Some(requested), Some(received)) = 799 if let (Some(requested), Some(received)) =
800 (&batch.requested_event_ids, &batch.received_event_ids) 800 (&batch.requested_event_ids, &batch.received_event_ids)
801 { 801 {
802 let missing: Vec<EventId> = 802 let missing: Vec<EventId> = requested.difference(received).cloned().collect();
803 requested.difference(received).cloned().collect();
804 803
805 if !missing.is_empty() { 804 if !missing.is_empty() {
806 let requested_count = requested.len(); 805 let requested_count = requested.len();
@@ -884,13 +883,11 @@ impl SyncManager {
884 // Re-acquire lock and update batch with new subscriptions 883 // Re-acquire lock and update batch with new subscriptions
885 let mut pending = self.pending_sync_index.write().await; 884 let mut pending = self.pending_sync_index.write().await;
886 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 885 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
887 if let Some(batch) = 886 if let Some(batch) = batches.iter_mut().find(|b| b.batch_id == batch_id)
888 batches.iter_mut().find(|b| b.batch_id == batch_id)
889 { 887 {
890 batch.outstanding_subs.extend(new_sub_ids.clone()); 888 batch.outstanding_subs.extend(new_sub_ids.clone());
891 // Update requested_event_ids to only include missing ones 889 // Update requested_event_ids to only include missing ones
892 batch.requested_event_ids = 890 batch.requested_event_ids = Some(missing.iter().cloned().collect());
893 Some(missing.iter().cloned().collect());
894 // Clear received_event_ids for fresh tracking 891 // Clear received_event_ids for fresh tracking
895 batch.received_event_ids = Some(HashSet::new()); 892 batch.received_event_ids = Some(HashSet::new());
896 // Increment retry counter 893 // Increment retry counter
@@ -921,14 +918,14 @@ impl SyncManager {
921 // Re-acquire lock to extract the batch 918 // Re-acquire lock to extract the batch
922 let mut pending = self.pending_sync_index.write().await; 919 let mut pending = self.pending_sync_index.write().await;
923 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 920 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
924 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) 921 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
925 {
926 let completed_batch = batches.remove(idx); 922 let completed_batch = batches.remove(idx);
927 if batches.is_empty() { 923 if batches.is_empty() {
928 pending.remove(&relay_url_for_retry); 924 pending.remove(&relay_url_for_retry);
929 } 925 }
930 drop(pending); 926 drop(pending);
931 self.confirm_batch(&relay_url_for_retry, completed_batch).await; 927 self.confirm_batch(&relay_url_for_retry, completed_batch)
928 .await;
932 } 929 }
933 } 930 }
934 return; 931 return;
@@ -1023,17 +1020,17 @@ impl SyncManager {
1023 "Batch completed but no RelayState found for relay" 1020 "Batch completed but no RelayState found for relay"
1024 ); 1021 );
1025 } 1022 }
1026 1023
1027 // Release lock before checking if historic sync is complete 1024 // Release lock before checking if historic sync is complete
1028 drop(relay_index); 1025 drop(relay_index);
1029 1026
1030 // Spawn background task to check if historic sync is complete 1027 // Spawn background task to check if historic sync is complete
1031 // This avoids blocking the confirm_batch flow for 6 seconds 1028 // This avoids blocking the confirm_batch flow for 6 seconds
1032 let relay_url = relay_url.to_string(); 1029 let relay_url = relay_url.to_string();
1033 let pending_index = self.pending_sync_index.clone(); 1030 let pending_index = self.pending_sync_index.clone();
1034 let relay_index = self.relay_sync_index.clone(); 1031 let relay_index = self.relay_sync_index.clone();
1035 let metrics = self.metrics.clone(); 1032 let metrics = self.metrics.clone();
1036 1033
1037 tokio::spawn(async move { 1034 tokio::spawn(async move {
1038 Self::check_and_complete_historic_sync_impl( 1035 Self::check_and_complete_historic_sync_impl(
1039 &relay_url, 1036 &relay_url,
@@ -1073,29 +1070,33 @@ impl SyncManager {
1073 // First check: Are there any pending batches? 1070 // First check: Are there any pending batches?
1074 let has_pending = { 1071 let has_pending = {
1075 let pending = pending_index.read().await; 1072 let pending = pending_index.read().await;
1076 pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) 1073 pending
1074 .get(relay_url)
1075 .is_some_and(|batches| !batches.is_empty())
1077 }; 1076 };
1078 1077
1079 if has_pending { 1078 if has_pending {
1080 // Still syncing, don't transition yet 1079 // Still syncing, don't transition yet
1081 return; 1080 return;
1082 } 1081 }
1083 1082
1084 // Wait for self-subscriber batch window + buffer to catch any in-flight events 1083 // Wait for self-subscriber batch window + buffer to catch any in-flight events
1085 // that might create new Layer 2/3 filters 1084 // that might create new Layer 2/3 filters
1086 tokio::time::sleep(Duration::from_millis(6000)).await; 1085 tokio::time::sleep(Duration::from_millis(6000)).await;
1087 1086
1088 // Second check: Are there still no pending batches? 1087 // Second check: Are there still no pending batches?
1089 let has_pending = { 1088 let has_pending = {
1090 let pending = pending_index.read().await; 1089 let pending = pending_index.read().await;
1091 pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) 1090 pending
1091 .get(relay_url)
1092 .is_some_and(|batches| !batches.is_empty())
1092 }; 1093 };
1093 1094
1094 if has_pending { 1095 if has_pending {
1095 // New batches appeared during the wait - still syncing 1096 // New batches appeared during the wait - still syncing
1096 return; 1097 return;
1097 } 1098 }
1098 1099
1099 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded 1100 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded
1100 let mut relay_index_guard = relay_index.write().await; 1101 let mut relay_index_guard = relay_index.write().await;
1101 if let Some(state) = relay_index_guard.get_mut(relay_url) { 1102 if let Some(state) = relay_index_guard.get_mut(relay_url) {
@@ -1106,11 +1107,11 @@ impl SyncManager {
1106 } else { 1107 } else {
1107 ConnectionStatus::Connected 1108 ConnectionStatus::Connected
1108 }; 1109 };
1109 1110
1110 state.connection_status = new_status; 1111 state.connection_status = new_status;
1111 state.historic_sync_completed = true; 1112 state.historic_sync_completed = true;
1112 state.historic_sync_completed_at = Some(Timestamp::now()); 1113 state.historic_sync_completed_at = Some(Timestamp::now());
1113 1114
1114 tracing::info!( 1115 tracing::info!(
1115 relay = %relay_url, 1116 relay = %relay_url,
1116 repos_synced = state.repos.len(), 1117 repos_synced = state.repos.len(),
@@ -1120,7 +1121,7 @@ impl SyncManager {
1120 "Historic sync complete - transitioned to {} status", 1121 "Historic sync complete - transitioned to {} status",
1121 if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } 1122 if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" }
1122 ); 1123 );
1123 1124
1124 // Update metrics 1125 // Update metrics
1125 if let Some(ref metrics) = metrics { 1126 if let Some(ref metrics) = metrics {
1126 metrics.record_connection_status(relay_url, new_status); 1127 metrics.record_connection_status(relay_url, new_status);
@@ -1362,8 +1363,8 @@ impl SyncManager {
1362 ); 1363 );
1363 return; 1364 return;
1364 } 1365 }
1365 Some(ConnectionStatus::Syncing) 1366 Some(ConnectionStatus::Syncing)
1366 | Some(ConnectionStatus::Connected) 1367 | Some(ConnectionStatus::Connected)
1367 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { 1368 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => {
1368 // Continue to subscribe - live sync is active, can accept new filters 1369 // Continue to subscribe - live sync is active, can accept new filters
1369 } 1370 }
@@ -1468,7 +1469,8 @@ impl SyncManager {
1468 match relay_event { 1469 match relay_event {
1469 RelayEvent::Event(event, subscription_id) => { 1470 RelayEvent::Event(event, subscription_id) => {
1470 // Skip events we've already rejected (announcements only) 1471 // Skip events we've already rejected (announcements only)
1471 if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) 1472 if (event.kind == Kind::GitRepoAnnouncement
1473 || event.kind == Kind::RepoState)
1472 && rejected_events_index.contains(&event.id) 1474 && rejected_events_index.contains(&event.id)
1473 { 1475 {
1474 tracing::trace!( 1476 tracing::trace!(
@@ -1479,7 +1481,7 @@ impl SyncManager {
1479 ); 1481 );
1480 continue; 1482 continue;
1481 } 1483 }
1482 1484
1483 let result = Self::process_event_static( 1485 let result = Self::process_event_static(
1484 &event, 1486 &event,
1485 &relay_url_clone, 1487 &relay_url_clone,
@@ -1863,11 +1865,16 @@ impl SyncManager {
1863 // Create RelayConnection if not exists 1865 // Create RelayConnection if not exists
1864 if !self.connections.contains_key(&relay_url) { 1866 if !self.connections.contains_key(&relay_url) {
1865 // Get relay owner keys for NIP-42 authentication 1867 // Get relay owner keys for NIP-42 authentication
1866 let keys = self.config.relay_owner_keys() 1868 let keys = self
1869 .config
1870 .relay_owner_keys()
1867 .expect("relay_owner_keys should be available"); 1871 .expect("relay_owner_keys should be available");
1868 1872
1869 let connection = 1873 let connection = RelayConnection::new_with_database(
1870 RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database), keys); 1874 relay_url.clone(),
1875 Arc::clone(&self.database),
1876 keys,
1877 );
1871 self.connections.insert(relay_url.clone(), connection); 1878 self.connections.insert(relay_url.clone(), connection);
1872 tracing::debug!(relay = %relay_url, "Registered new relay connection"); 1879 tracing::debug!(relay = %relay_url, "Registered new relay connection");
1873 } 1880 }
@@ -1919,7 +1926,7 @@ impl SyncManager {
1919 state.connection_status = ConnectionStatus::Connecting; 1926 state.connection_status = ConnectionStatus::Connecting;
1920 } 1927 }
1921 } 1928 }
1922 1929
1923 // Update metrics to show connecting status 1930 // Update metrics to show connecting status
1924 if let Some(ref metrics) = self.metrics { 1931 if let Some(ref metrics) = self.metrics {
1925 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); 1932 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting);
@@ -1974,7 +1981,8 @@ impl SyncManager {
1974 if let Some(ref metrics) = self.metrics { 1981 if let Some(ref metrics) = self.metrics {
1975 metrics.record_connection_attempt(relay_url, false); 1982 metrics.record_connection_attempt(relay_url, false);
1976 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); 1983 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
1977 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1984 metrics
1985 .record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1978 } 1986 }
1979 } 1987 }
1980 } 1988 }
@@ -2172,7 +2180,7 @@ impl SyncManager {
2172 // immediately and should now pass validation. 2180 // immediately and should now pass validation.
2173 if event.kind == Kind::GitRepoAnnouncement { 2181 if event.kind == Kind::GitRepoAnnouncement {
2174 use crate::nostr::events::RepositoryAnnouncement; 2182 use crate::nostr::events::RepositoryAnnouncement;
2175 2183
2176 match RepositoryAnnouncement::from_event(event.clone()) { 2184 match RepositoryAnnouncement::from_event(event.clone()) {
2177 Ok(announcement) => { 2185 Ok(announcement) => {
2178 if !announcement.maintainers.is_empty() { 2186 if !announcement.maintainers.is_empty() {
@@ -2219,15 +2227,16 @@ impl SyncManager {
2219 // 2. Second attempt uses maintainer exception (different code path) 2227 // 2. Second attempt uses maintainer exception (different code path)
2220 // 3. If second attempt fails, stays in cold index only (no third attempt) 2228 // 3. If second attempt fails, stays in cold index only (no third attempt)
2221 // Use Box::pin to avoid infinitely sized future 2229 // Use Box::pin to avoid infinitely sized future
2222 let reprocess_result = Box::pin(Self::process_event_static( 2230 let reprocess_result =
2223 &maintainer_event, 2231 Box::pin(Self::process_event_static(
2224 relay_url, 2232 &maintainer_event,
2225 database, 2233 relay_url,
2226 write_policy, 2234 database,
2227 local_relay, 2235 write_policy,
2228 rejected_events_index, 2236 local_relay,
2229 )) 2237 rejected_events_index,
2230 .await; 2238 ))
2239 .await;
2231 2240
2232 match reprocess_result { 2241 match reprocess_result {
2233 ProcessResult::Saved => { 2242 ProcessResult::Saved => {
@@ -2275,7 +2284,7 @@ impl SyncManager {
2275 ); 2284 );
2276 } 2285 }
2277 } 2286 }
2278 2287
2279 // When a repository announcement is accepted, re-process any state events 2288 // When a repository announcement is accepted, re-process any state events
2280 // that were previously rejected because no announcement existed. 2289 // that were previously rejected because no announcement existed.
2281 // This handles the race condition where state events arrive before their 2290 // This handles the race condition where state events arrive before their
@@ -2284,7 +2293,10 @@ impl SyncManager {
2284 Ok(announcement) => { 2293 Ok(announcement) => {
2285 // Get the announcement author's state events that were rejected 2294 // Get the announcement author's state events that were rejected
2286 let (removed, hot_events) = rejected_events_index 2295 let (removed, hot_events) = rejected_events_index
2287 .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); 2296 .invalidate_and_get_state_events(
2297 &event.pubkey,
2298 &announcement.identifier,
2299 );
2288 2300
2289 if removed > 0 { 2301 if removed > 0 {
2290 tracing::info!( 2302 tracing::info!(
@@ -2357,7 +2369,7 @@ impl SyncManager {
2357 } 2369 }
2358 } 2370 }
2359 } 2371 }
2360 2372
2361 // When a state event is accepted (git data arrived), re-process any other 2373 // When a state event is accepted (git data arrived), re-process any other
2362 // rejected state events for the same repository. This handles the case where 2374 // rejected state events for the same repository. This handles the case where
2363 // multiple state events arrive but only one has git data initially. 2375 // multiple state events arrive but only one has git data initially.
@@ -2454,7 +2466,7 @@ impl SyncManager {
2454 reason = %message, 2466 reason = %message,
2455 "Event rejected by write policy" 2467 "Event rejected by write policy"
2456 ); 2468 );
2457 2469
2458 // Track rejected announcement and state events to avoid re-fetching them 2470 // Track rejected announcement and state events to avoid re-fetching them
2459 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 2471 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
2460 // Extract identifier from 'd' tag 2472 // Extract identifier from 'd' tag
@@ -2465,12 +2477,14 @@ impl SyncManager {
2465 .and_then(|t| t.content()) 2477 .and_then(|t| t.content())
2466 { 2478 {
2467 // Determine rejection reason based on message 2479 // Determine rejection reason based on message
2468 let reason = if message.contains("doesn't list this service") 2480 let reason = if message.contains("doesn't list this service")
2469 || message.contains("Announcement must list service") { 2481 || message.contains("Announcement must list service")
2482 {
2470 rejected_index::RejectionReason::DoesNotListService 2483 rejected_index::RejectionReason::DoesNotListService
2471 } else if message.contains("maintainer") 2484 } else if message.contains("maintainer")
2472 || message.contains("no announcement exists") 2485 || message.contains("no announcement exists")
2473 || message.contains("not authorized") { 2486 || message.contains("not authorized")
2487 {
2474 rejected_index::RejectionReason::MaintainerNotYetValid 2488 rejected_index::RejectionReason::MaintainerNotYetValid
2475 } else { 2489 } else {
2476 rejected_index::RejectionReason::Other 2490 rejected_index::RejectionReason::Other
@@ -2512,7 +2526,7 @@ impl SyncManager {
2512 ); 2526 );
2513 } 2527 }
2514 } 2528 }
2515 2529
2516 ProcessResult::Rejected 2530 ProcessResult::Rejected
2517 } 2531 }
2518 } 2532 }
@@ -3042,7 +3056,8 @@ impl SyncManager {
3042 // Get event IDs to exclude: purgatory + rejected announcements 3056 // Get event IDs to exclude: purgatory + rejected announcements
3043 let purgatory_ids = self.purgatory.event_ids(); 3057 let purgatory_ids = self.purgatory.event_ids();
3044 let rejected_ids = self.rejected_events_index.get_all_event_ids(); 3058 let rejected_ids = self.rejected_events_index.get_all_event_ids();
3045 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); 3059 let excluded_ids: HashSet<EventId> =
3060 purgatory_ids.union(&rejected_ids).cloned().collect();
3046 3061
3047 for (idx, result) in diff_results { 3062 for (idx, result) in diff_results {
3048 match result { 3063 match result {
@@ -3166,8 +3181,7 @@ impl SyncManager {
3166 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { 3181 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) {
3167 batch.outstanding_subs.extend(subscription_ids.clone()); 3182 batch.outstanding_subs.extend(subscription_ids.clone());
3168 // Store requested event IDs for validation after EOSE 3183 // Store requested event IDs for validation after EOSE
3169 batch.requested_event_ids = 3184 batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect());
3170 Some(all_remote_ids.iter().cloned().collect());
3171 batch.received_event_ids = Some(HashSet::new()); 3185 batch.received_event_ids = Some(HashSet::new());
3172 } 3186 }
3173 } 3187 }
@@ -3359,20 +3373,16 @@ mod tests {
3359 async fn test_rejected_events_excluded_from_negentropy() { 3373 async fn test_rejected_events_excluded_from_negentropy() {
3360 // Create indices 3374 // Create indices
3361 let purgatory_ids: HashSet<EventId> = HashSet::new(); 3375 let purgatory_ids: HashSet<EventId> = HashSet::new();
3362 let rejected_index = RejectedEventsIndex::new( 3376 let rejected_index =
3363 Duration::from_secs(120), 3377 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
3364 Duration::from_secs(604800),
3365 );
3366 3378
3367 // Create test event IDs 3379 // Create test event IDs
3368 let _rejected_id = EventId::from_hex( 3380 let _rejected_id =
3369 "0000000000000000000000000000000000000000000000000000000000000001", 3381 EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001")
3370 ) 3382 .unwrap();
3371 .unwrap(); 3383 let valid_id =
3372 let valid_id = EventId::from_hex( 3384 EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000002")
3373 "0000000000000000000000000000000000000000000000000000000000000002", 3385 .unwrap();
3374 )
3375 .unwrap();
3376 3386
3377 // Add rejected event to index 3387 // Add rejected event to index
3378 let keys = Keys::generate(); 3388 let keys = Keys::generate();
@@ -3383,7 +3393,7 @@ mod tests {
3383 )) 3393 ))
3384 .sign_with_keys(&keys) 3394 .sign_with_keys(&keys)
3385 .unwrap(); 3395 .unwrap();
3386 3396
3387 // Override the event ID for testing (we need a specific ID) 3397 // Override the event ID for testing (we need a specific ID)
3388 // Since we can't override the ID, let's use the actual event ID 3398 // Since we can't override the ID, let's use the actual event ID
3389 let rejected_id = rejected_event.id; 3399 let rejected_id = rejected_event.id;
@@ -3403,8 +3413,7 @@ mod tests {
3403 remote_ids.insert(valid_id); 3413 remote_ids.insert(valid_id);
3404 3414
3405 // Exclude rejected and purgatory events 3415 // Exclude rejected and purgatory events
3406 let excluded_ids: HashSet<EventId> = 3416 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect();
3407 purgatory_ids.union(&rejected_ids).cloned().collect();
3408 let filtered_ids: HashSet<EventId> = 3417 let filtered_ids: HashSet<EventId> =
3409 remote_ids.difference(&excluded_ids).cloned().collect(); 3418 remote_ids.difference(&excluded_ids).cloned().collect();
3410 3419
@@ -3422,22 +3431,14 @@ mod tests {
3422 // Requested 5 events from negentropy diff 3431 // Requested 5 events from negentropy diff
3423 let mut requested: HashSet<EventId> = HashSet::new(); 3432 let mut requested: HashSet<EventId> = HashSet::new();
3424 for i in 1u8..=5 { 3433 for i in 1u8..=5 {
3425 let id = EventId::from_hex(&format!( 3434 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3426 "{:0>64}",
3427 format!("{:x}", i)
3428 ))
3429 .unwrap();
3430 requested.insert(id); 3435 requested.insert(id);
3431 } 3436 }
3432 3437
3433 // Only received 3 events (simulating relay limit) 3438 // Only received 3 events (simulating relay limit)
3434 let mut received: HashSet<EventId> = HashSet::new(); 3439 let mut received: HashSet<EventId> = HashSet::new();
3435 for i in 1u8..=3 { 3440 for i in 1u8..=3 {
3436 let id = EventId::from_hex(&format!( 3441 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3437 "{:0>64}",
3438 format!("{:x}", i)
3439 ))
3440 .unwrap();
3441 received.insert(id); 3442 received.insert(id);
3442 } 3443 }
3443 3444
@@ -3461,11 +3462,7 @@ mod tests {
3461 // Simulate scenario where all requested events are received 3462 // Simulate scenario where all requested events are received
3462 let mut requested: HashSet<EventId> = HashSet::new(); 3463 let mut requested: HashSet<EventId> = HashSet::new();
3463 for i in 1u8..=3 { 3464 for i in 1u8..=3 {
3464 let id = EventId::from_hex(&format!( 3465 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3465 "{:0>64}",
3466 format!("{:x}", i)
3467 ))
3468 .unwrap();
3469 requested.insert(id); 3466 requested.insert(id);
3470 } 3467 }
3471 3468
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index a9d7a4d..403792a 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -190,9 +190,7 @@ impl HotCache {
190 let now = Instant::now(); 190 let now = Instant::now();
191 let initial_count = entries.len(); 191 let initial_count = entries.len();
192 192
193 entries.retain(|_, entry| { 193 entries.retain(|_, entry| now.duration_since(entry.cached_at) < self.expiry_duration);
194 now.duration_since(entry.cached_at) < self.expiry_duration
195 });
196 194
197 initial_count - entries.len() 195 initial_count - entries.len()
198 } 196 }
@@ -284,9 +282,7 @@ impl ColdIndex {
284 let now = Instant::now(); 282 let now = Instant::now();
285 let initial_count = entries.len(); 283 let initial_count = entries.len();
286 284
287 entries.retain(|_, entry| { 285 entries.retain(|_, entry| now.duration_since(entry.rejected_at) < self.expiry_duration);
288 now.duration_since(entry.rejected_at) < self.expiry_duration
289 });
290 286
291 initial_count - entries.len() 287 initial_count - entries.len()
292 } 288 }
@@ -389,12 +385,8 @@ impl RejectedEventsIndex {
389 reason: RejectionReason, 385 reason: RejectionReason,
390 ) { 386 ) {
391 // Add to hot cache (full event) 387 // Add to hot cache (full event)
392 self.hot_cache.add( 388 self.hot_cache
393 event.clone(), 389 .add(event.clone(), pubkey, identifier.clone(), reason);
394 pubkey,
395 identifier.clone(),
396 reason,
397 );
398 390
399 // Add to cold index (metadata only) 391 // Add to cold index (metadata only)
400 self.cold_index.add(event.id, pubkey, identifier, reason); 392 self.cold_index.add(event.id, pubkey, identifier, reason);
@@ -419,12 +411,8 @@ impl RejectedEventsIndex {
419 reason: RejectionReason, 411 reason: RejectionReason,
420 ) { 412 ) {
421 // Add to hot cache (full event) 413 // Add to hot cache (full event)
422 self.hot_cache.add( 414 self.hot_cache
423 event.clone(), 415 .add(event.clone(), pubkey, identifier.clone(), reason);
424 pubkey,
425 identifier.clone(),
426 reason,
427 );
428 416
429 // Add to cold index (metadata only) 417 // Add to cold index (metadata only)
430 self.cold_index.add(event.id, pubkey, identifier, reason); 418 self.cold_index.add(event.id, pubkey, identifier, reason);
@@ -608,8 +596,7 @@ mod tests {
608 596
609 async fn create_test_event() -> Event { 597 async fn create_test_event() -> Event {
610 let keys = Keys::generate(); 598 let keys = Keys::generate();
611 let unsigned = nostr_sdk::EventBuilder::text_note("test") 599 let unsigned = nostr_sdk::EventBuilder::text_note("test").build(keys.public_key());
612 .build(keys.public_key());
613 keys.sign_event(unsigned).await.unwrap() 600 keys.sign_event(unsigned).await.unwrap()
614 } 601 }
615 602
@@ -695,10 +682,7 @@ mod tests {
695 682
696 #[tokio::test] 683 #[tokio::test]
697 async fn test_two_tier_index_add_and_contains() { 684 async fn test_two_tier_index_add_and_contains() {
698 let index = RejectedEventsIndex::new( 685 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
699 Duration::from_secs(120),
700 Duration::from_secs(604800),
701 );
702 let event = create_test_event().await; 686 let event = create_test_event().await;
703 687
704 index.add_announcement( 688 index.add_announcement(
@@ -715,10 +699,7 @@ mod tests {
715 699
716 #[tokio::test] 700 #[tokio::test]
717 async fn test_invalidate_and_get_events() { 701 async fn test_invalidate_and_get_events() {
718 let index = RejectedEventsIndex::new( 702 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
719 Duration::from_secs(120),
720 Duration::from_secs(604800),
721 );
722 let event = create_test_event().await; 703 let event = create_test_event().await;
723 let pubkey = event.pubkey; 704 let pubkey = event.pubkey;
724 let identifier = "test-repo".to_string(); 705 let identifier = "test-repo".to_string();
@@ -773,10 +754,8 @@ mod tests {
773 754
774 #[tokio::test] 755 #[tokio::test]
775 async fn test_hot_cache_miss_after_expiry() { 756 async fn test_hot_cache_miss_after_expiry() {
776 let index = RejectedEventsIndex::new( 757 let index =
777 Duration::from_millis(50), 758 RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800));
778 Duration::from_secs(604800),
779 );
780 let event = create_test_event().await; 759 let event = create_test_event().await;
781 let pubkey = event.pubkey; 760 let pubkey = event.pubkey;
782 let identifier = "test-repo".to_string(); 761 let identifier = "test-repo".to_string();
@@ -801,20 +780,15 @@ mod tests {
801 780
802 #[tokio::test] 781 #[tokio::test]
803 async fn test_multiple_maintainer_repos() { 782 async fn test_multiple_maintainer_repos() {
804 let index = RejectedEventsIndex::new( 783 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
805 Duration::from_secs(120),
806 Duration::from_secs(604800),
807 );
808 784
809 let keys1 = Keys::generate(); 785 let keys1 = Keys::generate();
810 let keys2 = Keys::generate(); 786 let keys2 = Keys::generate();
811 787
812 let unsigned1 = nostr_sdk::EventBuilder::text_note("test1") 788 let unsigned1 = nostr_sdk::EventBuilder::text_note("test1").build(keys1.public_key());
813 .build(keys1.public_key());
814 let event1 = keys1.sign_event(unsigned1).await.unwrap(); 789 let event1 = keys1.sign_event(unsigned1).await.unwrap();
815 790
816 let unsigned2 = nostr_sdk::EventBuilder::text_note("test2") 791 let unsigned2 = nostr_sdk::EventBuilder::text_note("test2").build(keys2.public_key());
817 .build(keys2.public_key());
818 let event2 = keys2.sign_event(unsigned2).await.unwrap(); 792 let event2 = keys2.sign_event(unsigned2).await.unwrap();
819 793
820 // Add two different maintainer repos 794 // Add two different maintainer repos
@@ -836,8 +810,7 @@ mod tests {
836 assert_eq!(index.cold_index_len(), 2); 810 assert_eq!(index.cold_index_len(), 2);
837 811
838 // Invalidate only first maintainer 812 // Invalidate only first maintainer
839 let (removed, hot_events) = 813 let (removed, hot_events) = index.invalidate_and_get_events(&event1.pubkey, "repo1");
840 index.invalidate_and_get_events(&event1.pubkey, "repo1");
841 814
842 assert_eq!(removed, 1); 815 assert_eq!(removed, 1);
843 assert_eq!(hot_events.len(), 1); 816 assert_eq!(hot_events.len(), 1);
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index d0090c8..b86d298 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -526,31 +526,46 @@ mod tests {
526 #[test] 526 #[test]
527 fn test_normalize_url_with_wss_scheme() { 527 fn test_normalize_url_with_wss_scheme() {
528 let url = "wss://relay.example.com"; 528 let url = "wss://relay.example.com";
529 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); 529 assert_eq!(
530 RelayConnection::normalize_url(url),
531 "wss://relay.example.com"
532 );
530 } 533 }
531 534
532 #[test] 535 #[test]
533 fn test_normalize_url_with_ws_scheme() { 536 fn test_normalize_url_with_ws_scheme() {
534 let url = "ws://relay.example.com"; 537 let url = "ws://relay.example.com";
535 assert_eq!(RelayConnection::normalize_url(url), "ws://relay.example.com"); 538 assert_eq!(
539 RelayConnection::normalize_url(url),
540 "ws://relay.example.com"
541 );
536 } 542 }
537 543
538 #[test] 544 #[test]
539 fn test_normalize_url_without_scheme() { 545 fn test_normalize_url_without_scheme() {
540 let url = "relay.example.com"; 546 let url = "relay.example.com";
541 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); 547 assert_eq!(
548 RelayConnection::normalize_url(url),
549 "wss://relay.example.com"
550 );
542 } 551 }
543 552
544 #[test] 553 #[test]
545 fn test_normalize_url_without_scheme_with_port() { 554 fn test_normalize_url_without_scheme_with_port() {
546 let url = "relay.example.com:8080"; 555 let url = "relay.example.com:8080";
547 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com:8080"); 556 assert_eq!(
557 RelayConnection::normalize_url(url),
558 "wss://relay.example.com:8080"
559 );
548 } 560 }
549 561
550 #[test] 562 #[test]
551 fn test_normalize_url_with_path() { 563 fn test_normalize_url_with_path() {
552 let url = "relay.example.com/nostr"; 564 let url = "relay.example.com/nostr";
553 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com/nostr"); 565 assert_eq!(
566 RelayConnection::normalize_url(url),
567 "wss://relay.example.com/nostr"
568 );
554 } 569 }
555 570
556 #[test] 571 #[test]
@@ -587,6 +602,9 @@ mod tests {
587 fn test_normalize_url_real_world_example() { 602 fn test_normalize_url_real_world_example() {
588 // Test the exact case from the bug report 603 // Test the exact case from the bug report
589 let url = "git.shakespeare.diy"; 604 let url = "git.shakespeare.diy";
590 assert_eq!(RelayConnection::normalize_url(url), "wss://git.shakespeare.diy"); 605 assert_eq!(
606 RelayConnection::normalize_url(url),
607 "wss://git.shakespeare.diy"
608 );
591 } 609 }
592} 610}