From b28a356cb41077ccee12a9c52f4ef2054e76cac6 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 19:58:41 +0000 Subject: chore: cargo fmt --- src/sync/mod.rs | 185 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 91 insertions(+), 94 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed3b78c..07527c7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; pub use metrics::SyncMetrics; // Re-export rejected index types -pub use rejected_index::{RejectionReason}; +pub use rejected_index::RejectionReason; // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used // Current code still uses the simple HashSet type alias below @@ -73,7 +73,7 @@ pub type PendingSyncIndex = Arc>>>; /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing /// to avoid repeatedly fetching and rejecting the same events. -/// +/// /// Uses the two-tier RejectedEventsIndex from rejected_index.rs: /// - Hot cache: Full events for 2 minutes (enables immediate re-processing) /// - Cold index: Metadata for 7 days (prevents repeated downloads) @@ -113,7 +113,9 @@ impl ConnectionStatus { pub fn is_live_sync_active(&self) -> bool { matches!( self, - ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedHistoricSyncFailures + ConnectionStatus::Syncing + | ConnectionStatus::Connected + | ConnectionStatus::ConnectedHistoricSyncFailures ) } } @@ -384,9 +386,7 @@ async fn run_rejected_index_cleanup( let hot_cache_interval = Duration::from_secs(60); let cold_index_interval = Duration::from_secs(86400); // 24 hours - tracing::info!( - "Rejected index cleanup started (hot cache: 60s, cold index: daily)" - ); + tracing::info!("Rejected index cleanup started (hot cache: 60s, cold index: daily)"); let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); let mut cold_index_timer = tokio::time::interval(cold_index_interval); @@ -399,7 +399,7 @@ async fn run_rejected_index_cleanup( tokio::select! { _ = hot_cache_timer.tick() => { let manager = sync_manager.lock().await; - + // Clean up announcements index let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); if hot_expired > 0 { @@ -408,7 +408,7 @@ async fn run_rejected_index_cleanup( hot_expired ); } - + // Clean up states index let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); if states_hot_expired > 0 { @@ -420,7 +420,7 @@ async fn run_rejected_index_cleanup( } _ = cold_index_timer.tick() => { let manager = sync_manager.lock().await; - + // Clean up announcements index let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); if cold_expired > 0 { @@ -429,7 +429,7 @@ async fn run_rejected_index_cleanup( cold_expired ); } - + // Clean up states index let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); if states_cold_expired > 0 { @@ -799,8 +799,7 @@ impl SyncManager { if let (Some(requested), Some(received)) = (&batch.requested_event_ids, &batch.received_event_ids) { - let missing: Vec = - requested.difference(received).cloned().collect(); + let missing: Vec = requested.difference(received).cloned().collect(); if !missing.is_empty() { let requested_count = requested.len(); @@ -884,13 +883,11 @@ impl SyncManager { // Re-acquire lock and update batch with new subscriptions let mut pending = self.pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_for_retry) { - if let Some(batch) = - batches.iter_mut().find(|b| b.batch_id == batch_id) + if let Some(batch) = batches.iter_mut().find(|b| b.batch_id == batch_id) { batch.outstanding_subs.extend(new_sub_ids.clone()); // Update requested_event_ids to only include missing ones - batch.requested_event_ids = - Some(missing.iter().cloned().collect()); + batch.requested_event_ids = Some(missing.iter().cloned().collect()); // Clear received_event_ids for fresh tracking batch.received_event_ids = Some(HashSet::new()); // Increment retry counter @@ -921,14 +918,14 @@ impl SyncManager { // Re-acquire lock to extract the batch let mut pending = self.pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_for_retry) { - if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) - { + if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); - self.confirm_batch(&relay_url_for_retry, completed_batch).await; + self.confirm_batch(&relay_url_for_retry, completed_batch) + .await; } } return; @@ -1023,17 +1020,17 @@ impl SyncManager { "Batch completed but no RelayState found for relay" ); } - + // Release lock before checking if historic sync is complete drop(relay_index); - + // Spawn background task to check if historic sync is complete // This avoids blocking the confirm_batch flow for 6 seconds let relay_url = relay_url.to_string(); let pending_index = self.pending_sync_index.clone(); let relay_index = self.relay_sync_index.clone(); let metrics = self.metrics.clone(); - + tokio::spawn(async move { Self::check_and_complete_historic_sync_impl( &relay_url, @@ -1073,29 +1070,33 @@ impl SyncManager { // First check: Are there any pending batches? let has_pending = { let pending = pending_index.read().await; - pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) + pending + .get(relay_url) + .is_some_and(|batches| !batches.is_empty()) }; - + if has_pending { // Still syncing, don't transition yet return; } - + // Wait for self-subscriber batch window + buffer to catch any in-flight events // that might create new Layer 2/3 filters tokio::time::sleep(Duration::from_millis(6000)).await; - + // Second check: Are there still no pending batches? let has_pending = { let pending = pending_index.read().await; - pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) + pending + .get(relay_url) + .is_some_and(|batches| !batches.is_empty()) }; - + if has_pending { // New batches appeared during the wait - still syncing return; } - + // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded let mut relay_index_guard = relay_index.write().await; if let Some(state) = relay_index_guard.get_mut(relay_url) { @@ -1106,11 +1107,11 @@ impl SyncManager { } else { ConnectionStatus::Connected }; - + state.connection_status = new_status; state.historic_sync_completed = true; state.historic_sync_completed_at = Some(Timestamp::now()); - + tracing::info!( relay = %relay_url, repos_synced = state.repos.len(), @@ -1120,7 +1121,7 @@ impl SyncManager { "Historic sync complete - transitioned to {} status", if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } ); - + // Update metrics if let Some(ref metrics) = metrics { metrics.record_connection_status(relay_url, new_status); @@ -1362,8 +1363,8 @@ impl SyncManager { ); return; } - Some(ConnectionStatus::Syncing) - | Some(ConnectionStatus::Connected) + Some(ConnectionStatus::Syncing) + | Some(ConnectionStatus::Connected) | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { // Continue to subscribe - live sync is active, can accept new filters } @@ -1468,7 +1469,8 @@ impl SyncManager { match relay_event { RelayEvent::Event(event, subscription_id) => { // Skip events we've already rejected (announcements only) - if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) + if (event.kind == Kind::GitRepoAnnouncement + || event.kind == Kind::RepoState) && rejected_events_index.contains(&event.id) { tracing::trace!( @@ -1479,7 +1481,7 @@ impl SyncManager { ); continue; } - + let result = Self::process_event_static( &event, &relay_url_clone, @@ -1863,11 +1865,16 @@ impl SyncManager { // Create RelayConnection if not exists if !self.connections.contains_key(&relay_url) { // Get relay owner keys for NIP-42 authentication - let keys = self.config.relay_owner_keys() + let keys = self + .config + .relay_owner_keys() .expect("relay_owner_keys should be available"); - - let connection = - RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database), keys); + + let connection = RelayConnection::new_with_database( + relay_url.clone(), + Arc::clone(&self.database), + keys, + ); self.connections.insert(relay_url.clone(), connection); tracing::debug!(relay = %relay_url, "Registered new relay connection"); } @@ -1919,7 +1926,7 @@ impl SyncManager { state.connection_status = ConnectionStatus::Connecting; } } - + // Update metrics to show connecting status if let Some(ref metrics) = self.metrics { metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); @@ -1974,7 +1981,8 @@ impl SyncManager { if let Some(ref metrics) = self.metrics { metrics.record_connection_attempt(relay_url, false); metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); - metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + metrics + .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); } } } @@ -2172,7 +2180,7 @@ impl SyncManager { // immediately and should now pass validation. if event.kind == Kind::GitRepoAnnouncement { use crate::nostr::events::RepositoryAnnouncement; - + match RepositoryAnnouncement::from_event(event.clone()) { Ok(announcement) => { if !announcement.maintainers.is_empty() { @@ -2219,15 +2227,16 @@ impl SyncManager { // 2. Second attempt uses maintainer exception (different code path) // 3. If second attempt fails, stays in cold index only (no third attempt) // Use Box::pin to avoid infinitely sized future - let reprocess_result = Box::pin(Self::process_event_static( - &maintainer_event, - relay_url, - database, - write_policy, - local_relay, - rejected_events_index, - )) - .await; + let reprocess_result = + Box::pin(Self::process_event_static( + &maintainer_event, + relay_url, + database, + write_policy, + local_relay, + rejected_events_index, + )) + .await; match reprocess_result { ProcessResult::Saved => { @@ -2275,7 +2284,7 @@ impl SyncManager { ); } } - + // When a repository announcement is accepted, re-process any state events // that were previously rejected because no announcement existed. // This handles the race condition where state events arrive before their @@ -2284,7 +2293,10 @@ impl SyncManager { Ok(announcement) => { // Get the announcement author's state events that were rejected let (removed, hot_events) = rejected_events_index - .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); + .invalidate_and_get_state_events( + &event.pubkey, + &announcement.identifier, + ); if removed > 0 { tracing::info!( @@ -2357,7 +2369,7 @@ impl SyncManager { } } } - + // When a state event is accepted (git data arrived), re-process any other // rejected state events for the same repository. This handles the case where // multiple state events arrive but only one has git data initially. @@ -2454,7 +2466,7 @@ impl SyncManager { reason = %message, "Event rejected by write policy" ); - + // Track rejected announcement and state events to avoid re-fetching them if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { // Extract identifier from 'd' tag @@ -2465,12 +2477,14 @@ impl SyncManager { .and_then(|t| t.content()) { // Determine rejection reason based on message - let reason = if message.contains("doesn't list this service") - || message.contains("Announcement must list service") { + let reason = if message.contains("doesn't list this service") + || message.contains("Announcement must list service") + { rejected_index::RejectionReason::DoesNotListService - } else if message.contains("maintainer") + } else if message.contains("maintainer") || message.contains("no announcement exists") - || message.contains("not authorized") { + || message.contains("not authorized") + { rejected_index::RejectionReason::MaintainerNotYetValid } else { rejected_index::RejectionReason::Other @@ -2512,7 +2526,7 @@ impl SyncManager { ); } } - + ProcessResult::Rejected } } @@ -3042,7 +3056,8 @@ impl SyncManager { // Get event IDs to exclude: purgatory + rejected announcements let purgatory_ids = self.purgatory.event_ids(); let rejected_ids = self.rejected_events_index.get_all_event_ids(); - let excluded_ids: HashSet = purgatory_ids.union(&rejected_ids).cloned().collect(); + let excluded_ids: HashSet = + purgatory_ids.union(&rejected_ids).cloned().collect(); for (idx, result) in diff_results { match result { @@ -3166,8 +3181,7 @@ impl SyncManager { if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { batch.outstanding_subs.extend(subscription_ids.clone()); // Store requested event IDs for validation after EOSE - batch.requested_event_ids = - Some(all_remote_ids.iter().cloned().collect()); + batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect()); batch.received_event_ids = Some(HashSet::new()); } } @@ -3359,20 +3373,16 @@ mod tests { async fn test_rejected_events_excluded_from_negentropy() { // Create indices let purgatory_ids: HashSet = HashSet::new(); - let rejected_index = RejectedEventsIndex::new( - Duration::from_secs(120), - Duration::from_secs(604800), - ); + let rejected_index = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); // Create test event IDs - let _rejected_id = EventId::from_hex( - "0000000000000000000000000000000000000000000000000000000000000001", - ) - .unwrap(); - let valid_id = EventId::from_hex( - "0000000000000000000000000000000000000000000000000000000000000002", - ) - .unwrap(); + let _rejected_id = + EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001") + .unwrap(); + let valid_id = + EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000002") + .unwrap(); // Add rejected event to index let keys = Keys::generate(); @@ -3383,7 +3393,7 @@ mod tests { )) .sign_with_keys(&keys) .unwrap(); - + // Override the event ID for testing (we need a specific ID) // Since we can't override the ID, let's use the actual event ID let rejected_id = rejected_event.id; @@ -3403,8 +3413,7 @@ mod tests { remote_ids.insert(valid_id); // Exclude rejected and purgatory events - let excluded_ids: HashSet = - purgatory_ids.union(&rejected_ids).cloned().collect(); + let excluded_ids: HashSet = purgatory_ids.union(&rejected_ids).cloned().collect(); let filtered_ids: HashSet = remote_ids.difference(&excluded_ids).cloned().collect(); @@ -3422,22 +3431,14 @@ mod tests { // Requested 5 events from negentropy diff let mut requested: HashSet = HashSet::new(); for i in 1u8..=5 { - let id = EventId::from_hex(&format!( - "{:0>64}", - format!("{:x}", i) - )) - .unwrap(); + let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); requested.insert(id); } // Only received 3 events (simulating relay limit) let mut received: HashSet = HashSet::new(); for i in 1u8..=3 { - let id = EventId::from_hex(&format!( - "{:0>64}", - format!("{:x}", i) - )) - .unwrap(); + let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); received.insert(id); } @@ -3461,11 +3462,7 @@ mod tests { // Simulate scenario where all requested events are received let mut requested: HashSet = HashSet::new(); for i in 1u8..=3 { - let id = EventId::from_hex(&format!( - "{:0>64}", - format!("{:x}", i) - )) - .unwrap(); + let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); requested.insert(id); } -- cgit v1.2.3