diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 08:35:12 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 08:35:12 +0000 |
| commit | 5eb736e1184e313efa65237bf1973dee21afb43f (patch) | |
| tree | 3884ab1a806c0b5f3335b72af7648adc1c011181 /src | |
| parent | c4142e99c2f21f0ce7ec2618a0bb1fd39c72ce07 (diff) | |
feat(sync): track and exclude rejected announcement events
Implement RejectedEventsIndex to prevent repeatedly fetching and
processing announcement events (kinds 30617/30618) that have been
rejected by the write policy.
Changes:
- Add RejectedEventsIndex to track rejected announcement EventIds
- Record rejections in process_event_static when announcements fail
write policy validation
- Exclude rejected events from negentropy sync (along with purgatory)
- Skip rejected events early in REQ+EOSE processing
- Add 2 tests verifying tracking and exclusion logic
Benefits:
- Reduced network traffic (no re-fetching of known-bad events)
- Lower CPU usage (no repeated validation)
- Faster sync (smaller negentropy diffs)
- Better observability (trace logging when skipping)
Scope limited to announcements as they are the primary source of
repeated rejection cycles during Layer 1 sync.
Closes: Reduces wasted bandwidth from continually fetching rejected events
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 124 |
1 files changed, 118 insertions, 6 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6da2644..ecfd020 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -64,6 +64,11 @@ pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>; | |||
| 64 | /// Key: relay URL | 64 | /// Key: relay URL |
| 65 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | 65 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; |
| 66 | 66 | ||
| 67 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. | ||
| 68 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing | ||
| 69 | /// to avoid repeatedly fetching and rejecting the same events. | ||
| 70 | pub type RejectedEventsIndex = Arc<RwLock<HashSet<EventId>>>; | ||
| 71 | |||
| 67 | // ============================================================================= | 72 | // ============================================================================= |
| 68 | // Supporting Data Structures | 73 | // Supporting Data Structures |
| 69 | // ============================================================================= | 74 | // ============================================================================= |
| @@ -389,6 +394,8 @@ pub struct SyncManager { | |||
| 389 | relay_sync_index: RelaySyncIndex, | 394 | relay_sync_index: RelaySyncIndex, |
| 390 | /// In-flight subscription batches | 395 | /// In-flight subscription batches |
| 391 | pending_sync_index: PendingSyncIndex, | 396 | pending_sync_index: PendingSyncIndex, |
| 397 | /// Rejected announcement event IDs (30617/30618) - excluded from sync | ||
| 398 | rejected_events_index: RejectedEventsIndex, | ||
| 392 | /// Active relay connections - keyed by relay URL | 399 | /// Active relay connections - keyed by relay URL |
| 393 | connections: HashMap<String, RelayConnection>, | 400 | connections: HashMap<String, RelayConnection>, |
| 394 | /// Health tracker for relay connection state | 401 | /// Health tracker for relay connection state |
| @@ -441,6 +448,7 @@ impl SyncManager { | |||
| 441 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 448 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 442 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 449 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 443 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 450 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 451 | rejected_events_index: Arc::new(RwLock::new(HashSet::new())), | ||
| 444 | connections: HashMap::new(), | 452 | connections: HashMap::new(), |
| 445 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 453 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 446 | next_batch_id: 0, | 454 | next_batch_id: 0, |
| @@ -1022,6 +1030,7 @@ impl SyncManager { | |||
| 1022 | let metrics_clone = self.metrics.clone(); | 1030 | let metrics_clone = self.metrics.clone(); |
| 1023 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | 1031 | let pending_sync_index = Arc::clone(&self.pending_sync_index); |
| 1024 | let health_tracker = Arc::clone(&self.health_tracker); | 1032 | let health_tracker = Arc::clone(&self.health_tracker); |
| 1033 | let rejected_events_index = Arc::clone(&self.rejected_events_index); | ||
| 1025 | 1034 | ||
| 1026 | tokio::spawn(async move { | 1035 | tokio::spawn(async move { |
| 1027 | let mut disconnect_sent = false; | 1036 | let mut disconnect_sent = false; |
| @@ -1029,12 +1038,27 @@ impl SyncManager { | |||
| 1029 | while let Some(relay_event) = event_rx.recv().await { | 1038 | while let Some(relay_event) = event_rx.recv().await { |
| 1030 | match relay_event { | 1039 | match relay_event { |
| 1031 | RelayEvent::Event(event, subscription_id) => { | 1040 | RelayEvent::Event(event, subscription_id) => { |
| 1041 | // Skip events we've already rejected (announcements only) | ||
| 1042 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | ||
| 1043 | let rejected = rejected_events_index.read().await; | ||
| 1044 | if rejected.contains(&event.id) { | ||
| 1045 | tracing::trace!( | ||
| 1046 | event_id = %event.id, | ||
| 1047 | kind = %event.kind.as_u16(), | ||
| 1048 | relay = %relay_url_clone, | ||
| 1049 | "Skipping previously rejected announcement event" | ||
| 1050 | ); | ||
| 1051 | continue; | ||
| 1052 | } | ||
| 1053 | } | ||
| 1054 | |||
| 1032 | let result = Self::process_event_static( | 1055 | let result = Self::process_event_static( |
| 1033 | &event, | 1056 | &event, |
| 1034 | &relay_url_clone, | 1057 | &relay_url_clone, |
| 1035 | &database, | 1058 | &database, |
| 1036 | &write_policy, | 1059 | &write_policy, |
| 1037 | &local_relay, | 1060 | &local_relay, |
| 1061 | &rejected_events_index, | ||
| 1038 | ) | 1062 | ) |
| 1039 | .await; | 1063 | .await; |
| 1040 | // Only record metric when event is actually saved | 1064 | // Only record metric when event is actually saved |
| @@ -1644,6 +1668,7 @@ impl SyncManager { | |||
| 1644 | database: &SharedDatabase, | 1668 | database: &SharedDatabase, |
| 1645 | write_policy: &Nip34WritePolicy, | 1669 | write_policy: &Nip34WritePolicy, |
| 1646 | local_relay: &LocalRelay, | 1670 | local_relay: &LocalRelay, |
| 1671 | rejected_events_index: &RejectedEventsIndex, | ||
| 1647 | ) -> ProcessResult { | 1672 | ) -> ProcessResult { |
| 1648 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; | 1673 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; |
| 1649 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 1674 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |
| @@ -1706,9 +1731,22 @@ impl SyncManager { | |||
| 1706 | tracing::debug!( | 1731 | tracing::debug!( |
| 1707 | event_id = %event.id, | 1732 | event_id = %event.id, |
| 1708 | relay = %relay_url, | 1733 | relay = %relay_url, |
| 1734 | kind = %event.kind.as_u16(), | ||
| 1709 | reason = %message, | 1735 | reason = %message, |
| 1710 | "Event rejected by write policy" | 1736 | "Event rejected by write policy" |
| 1711 | ); | 1737 | ); |
| 1738 | |||
| 1739 | // Track rejected announcement events to avoid re-fetching them | ||
| 1740 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | ||
| 1741 | let mut rejected = rejected_events_index.write().await; | ||
| 1742 | rejected.insert(event.id); | ||
| 1743 | tracing::debug!( | ||
| 1744 | event_id = %event.id, | ||
| 1745 | kind = %event.kind.as_u16(), | ||
| 1746 | "Added rejected announcement to exclusion list" | ||
| 1747 | ); | ||
| 1748 | } | ||
| 1749 | |||
| 1712 | ProcessResult::Rejected | 1750 | ProcessResult::Rejected |
| 1713 | } | 1751 | } |
| 1714 | } | 1752 | } |
| @@ -2231,28 +2269,30 @@ impl SyncManager { | |||
| 2231 | let mut all_remote_ids = Vec::new(); | 2269 | let mut all_remote_ids = Vec::new(); |
| 2232 | let mut failed_count = 0; | 2270 | let mut failed_count = 0; |
| 2233 | 2271 | ||
| 2234 | // Get event IDs currently in purgatory to avoid fetching them | 2272 | // Get event IDs to exclude: purgatory + rejected announcements |
| 2235 | let purgatory_ids = self.purgatory.event_ids(); | 2273 | let purgatory_ids = self.purgatory.event_ids(); |
| 2274 | let rejected_ids = self.rejected_events_index.read().await.clone(); | ||
| 2275 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); | ||
| 2236 | 2276 | ||
| 2237 | for (idx, result) in diff_results { | 2277 | for (idx, result) in diff_results { |
| 2238 | match result { | 2278 | match result { |
| 2239 | Ok(reconciliation) => { | 2279 | Ok(reconciliation) => { |
| 2240 | let remote_excluding_purgatory_ids: HashSet<EventId> = reconciliation | 2280 | let remote_excluding_ids: HashSet<EventId> = reconciliation |
| 2241 | .remote | 2281 | .remote |
| 2242 | .difference(&purgatory_ids) | 2282 | .difference(&excluded_ids) |
| 2243 | .cloned() | 2283 | .cloned() |
| 2244 | .collect(); | 2284 | .collect(); |
| 2245 | let remote_count = remote_excluding_purgatory_ids.len(); | 2285 | let remote_count = remote_excluding_ids.len(); |
| 2246 | tracing::debug!( | 2286 | tracing::debug!( |
| 2247 | relay = %relay_url, | 2287 | relay = %relay_url, |
| 2248 | filter_idx = idx, | 2288 | filter_idx = idx, |
| 2249 | remote_count = remote_count, | 2289 | remote_count = remote_count, |
| 2250 | local_count = reconciliation.local.len(), | 2290 | local_count = reconciliation.local.len(), |
| 2251 | remote_ids = ?remote_excluding_purgatory_ids, | 2291 | remote_ids = ?remote_excluding_ids, |
| 2252 | "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx | 2292 | "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx |
| 2253 | ); | 2293 | ); |
| 2254 | if remote_count > 0 { | 2294 | if remote_count > 0 { |
| 2255 | all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter()); | 2295 | all_remote_ids.extend(remote_excluding_ids.into_iter()); |
| 2256 | } | 2296 | } |
| 2257 | } | 2297 | } |
| 2258 | Err(e) => { | 2298 | Err(e) => { |
| @@ -2496,3 +2536,75 @@ impl SyncManager { | |||
| 2496 | tracing::info!("SyncManager shutdown complete"); | 2536 | tracing::info!("SyncManager shutdown complete"); |
| 2497 | } | 2537 | } |
| 2498 | } | 2538 | } |
| 2539 | |||
| 2540 | #[cfg(test)] | ||
| 2541 | mod tests { | ||
| 2542 | use super::*; | ||
| 2543 | use nostr_sdk::prelude::*; | ||
| 2544 | |||
| 2545 | #[tokio::test] | ||
| 2546 | async fn test_rejected_events_index_tracks_announcements() { | ||
| 2547 | // Create a rejected events index | ||
| 2548 | let rejected_index: RejectedEventsIndex = Arc::new(RwLock::new(HashSet::new())); | ||
| 2549 | |||
| 2550 | // Create test announcement event (kind 30617) | ||
| 2551 | let keys = Keys::generate(); | ||
| 2552 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content") | ||
| 2553 | .sign_with_keys(&keys) | ||
| 2554 | .unwrap(); | ||
| 2555 | |||
| 2556 | // Verify index is empty | ||
| 2557 | { | ||
| 2558 | let rejected = rejected_index.read().await; | ||
| 2559 | assert_eq!(rejected.len(), 0); | ||
| 2560 | } | ||
| 2561 | |||
| 2562 | // Simulate rejection by adding to index | ||
| 2563 | { | ||
| 2564 | let mut rejected = rejected_index.write().await; | ||
| 2565 | rejected.insert(announcement.id); | ||
| 2566 | } | ||
| 2567 | |||
| 2568 | // Verify event is tracked | ||
| 2569 | { | ||
| 2570 | let rejected = rejected_index.read().await; | ||
| 2571 | assert!(rejected.contains(&announcement.id)); | ||
| 2572 | assert_eq!(rejected.len(), 1); | ||
| 2573 | } | ||
| 2574 | } | ||
| 2575 | |||
| 2576 | #[tokio::test] | ||
| 2577 | async fn test_rejected_events_excluded_from_negentropy() { | ||
| 2578 | // Create indices | ||
| 2579 | let purgatory_ids: HashSet<EventId> = HashSet::new(); | ||
| 2580 | let mut rejected_ids = HashSet::new(); | ||
| 2581 | |||
| 2582 | // Create test event IDs | ||
| 2583 | let rejected_id = EventId::from_hex( | ||
| 2584 | "0000000000000000000000000000000000000000000000000000000000000001", | ||
| 2585 | ) | ||
| 2586 | .unwrap(); | ||
| 2587 | let valid_id = EventId::from_hex( | ||
| 2588 | "0000000000000000000000000000000000000000000000000000000000000002", | ||
| 2589 | ) | ||
| 2590 | .unwrap(); | ||
| 2591 | |||
| 2592 | rejected_ids.insert(rejected_id); | ||
| 2593 | |||
| 2594 | // Simulate negentropy reconciliation result | ||
| 2595 | let mut remote_ids = HashSet::new(); | ||
| 2596 | remote_ids.insert(rejected_id); | ||
| 2597 | remote_ids.insert(valid_id); | ||
| 2598 | |||
| 2599 | // Exclude rejected and purgatory events | ||
| 2600 | let excluded_ids: HashSet<EventId> = | ||
| 2601 | purgatory_ids.union(&rejected_ids).cloned().collect(); | ||
| 2602 | let filtered_ids: HashSet<EventId> = | ||
| 2603 | remote_ids.difference(&excluded_ids).cloned().collect(); | ||
| 2604 | |||
| 2605 | // Verify rejected event is excluded | ||
| 2606 | assert!(!filtered_ids.contains(&rejected_id)); | ||
| 2607 | assert!(filtered_ids.contains(&valid_id)); | ||
| 2608 | assert_eq!(filtered_ids.len(), 1); | ||
| 2609 | } | ||
| 2610 | } | ||