From 5eb736e1184e313efa65237bf1973dee21afb43f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 08:35:12 +0000 Subject: feat(sync): track and exclude rejected announcement events Implement RejectedEventsIndex to prevent repeatedly fetching and processing announcement events (kinds 30617/30618) that have been rejected by the write policy. Changes: - Add RejectedEventsIndex to track rejected announcement EventIds - Record rejections in process_event_static when announcements fail write policy validation - Exclude rejected events from negentropy sync (along with purgatory) - Skip rejected events early in REQ+EOSE processing - Add 2 tests verifying tracking and exclusion logic Benefits: - Reduced network traffic (no re-fetching of known-bad events) - Lower CPU usage (no repeated validation) - Faster sync (smaller negentropy diffs) - Better observability (trace logging when skipping) Scope limited to announcements as they are the primary source of repeated rejection cycles during Layer 1 sync. Closes: Reduces wasted bandwidth from continually fetching rejected events --- src/sync/mod.rs | 124 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 118 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6da2644..ecfd020 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -64,6 +64,11 @@ pub type RelaySyncIndex = Arc>>; /// Key: relay URL pub type PendingSyncIndex = Arc>>>; +/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. +/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing +/// to avoid repeatedly fetching and rejecting the same events. +pub type RejectedEventsIndex = Arc>>; + // ============================================================================= // Supporting Data Structures // ============================================================================= @@ -389,6 +394,8 @@ pub struct SyncManager { relay_sync_index: RelaySyncIndex, /// In-flight subscription batches pending_sync_index: PendingSyncIndex, + /// Rejected announcement event IDs (30617/30618) - excluded from sync + rejected_events_index: RejectedEventsIndex, /// Active relay connections - keyed by relay URL connections: HashMap, /// Health tracker for relay connection state @@ -441,6 +448,7 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), + rejected_events_index: Arc::new(RwLock::new(HashSet::new())), connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -1022,6 +1030,7 @@ impl SyncManager { let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); let health_tracker = Arc::clone(&self.health_tracker); + let rejected_events_index = Arc::clone(&self.rejected_events_index); tokio::spawn(async move { let mut disconnect_sent = false; @@ -1029,12 +1038,27 @@ impl SyncManager { while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event, subscription_id) => { + // Skip events we've already rejected (announcements only) + if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { + let rejected = rejected_events_index.read().await; + if rejected.contains(&event.id) { + tracing::trace!( + event_id = %event.id, + kind = %event.kind.as_u16(), + relay = %relay_url_clone, + "Skipping previously rejected announcement event" + ); + continue; + } + } + let result = Self::process_event_static( &event, &relay_url_clone, &database, &write_policy, &local_relay, + &rejected_events_index, ) .await; // Only record metric when event is actually saved @@ -1644,6 +1668,7 @@ impl SyncManager { database: &SharedDatabase, write_policy: &Nip34WritePolicy, local_relay: &LocalRelay, + rejected_events_index: &RejectedEventsIndex, ) -> ProcessResult { use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -1706,9 +1731,22 @@ impl SyncManager { tracing::debug!( event_id = %event.id, relay = %relay_url, + kind = %event.kind.as_u16(), reason = %message, "Event rejected by write policy" ); + + // Track rejected announcement events to avoid re-fetching them + if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { + let mut rejected = rejected_events_index.write().await; + rejected.insert(event.id); + tracing::debug!( + event_id = %event.id, + kind = %event.kind.as_u16(), + "Added rejected announcement to exclusion list" + ); + } + ProcessResult::Rejected } } @@ -2231,28 +2269,30 @@ impl SyncManager { let mut all_remote_ids = Vec::new(); let mut failed_count = 0; - // Get event IDs currently in purgatory to avoid fetching them + // Get event IDs to exclude: purgatory + rejected announcements let purgatory_ids = self.purgatory.event_ids(); + let rejected_ids = self.rejected_events_index.read().await.clone(); + let excluded_ids: HashSet = purgatory_ids.union(&rejected_ids).cloned().collect(); for (idx, result) in diff_results { match result { Ok(reconciliation) => { - let remote_excluding_purgatory_ids: HashSet = reconciliation + let remote_excluding_ids: HashSet = reconciliation .remote - .difference(&purgatory_ids) + .difference(&excluded_ids) .cloned() .collect(); - let remote_count = remote_excluding_purgatory_ids.len(); + let remote_count = remote_excluding_ids.len(); tracing::debug!( relay = %relay_url, filter_idx = idx, remote_count = remote_count, local_count = reconciliation.local.len(), - remote_ids = ?remote_excluding_purgatory_ids, + remote_ids = ?remote_excluding_ids, "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx ); if remote_count > 0 { - all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter()); + all_remote_ids.extend(remote_excluding_ids.into_iter()); } } Err(e) => { @@ -2496,3 +2536,75 @@ impl SyncManager { tracing::info!("SyncManager shutdown complete"); } } + +#[cfg(test)] +mod tests { + use super::*; + use nostr_sdk::prelude::*; + + #[tokio::test] + async fn test_rejected_events_index_tracks_announcements() { + // Create a rejected events index + let rejected_index: RejectedEventsIndex = Arc::new(RwLock::new(HashSet::new())); + + // Create test announcement event (kind 30617) + let keys = Keys::generate(); + let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content") + .sign_with_keys(&keys) + .unwrap(); + + // Verify index is empty + { + let rejected = rejected_index.read().await; + assert_eq!(rejected.len(), 0); + } + + // Simulate rejection by adding to index + { + let mut rejected = rejected_index.write().await; + rejected.insert(announcement.id); + } + + // Verify event is tracked + { + let rejected = rejected_index.read().await; + assert!(rejected.contains(&announcement.id)); + assert_eq!(rejected.len(), 1); + } + } + + #[tokio::test] + async fn test_rejected_events_excluded_from_negentropy() { + // Create indices + let purgatory_ids: HashSet = HashSet::new(); + let mut rejected_ids = HashSet::new(); + + // Create test event IDs + let rejected_id = EventId::from_hex( + "0000000000000000000000000000000000000000000000000000000000000001", + ) + .unwrap(); + let valid_id = EventId::from_hex( + "0000000000000000000000000000000000000000000000000000000000000002", + ) + .unwrap(); + + rejected_ids.insert(rejected_id); + + // Simulate negentropy reconciliation result + let mut remote_ids = HashSet::new(); + remote_ids.insert(rejected_id); + remote_ids.insert(valid_id); + + // Exclude rejected and purgatory events + let excluded_ids: HashSet = + purgatory_ids.union(&rejected_ids).cloned().collect(); + let filtered_ids: HashSet = + remote_ids.difference(&excluded_ids).cloned().collect(); + + // Verify rejected event is excluded + assert!(!filtered_ids.contains(&rejected_id)); + assert!(filtered_ids.contains(&valid_id)); + assert_eq!(filtered_ids.len(), 1); + } +} -- cgit v1.2.3