upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 08:35:12 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 08:35:12 +0000
commit5eb736e1184e313efa65237bf1973dee21afb43f (patch)
tree3884ab1a806c0b5f3335b72af7648adc1c011181
parentc4142e99c2f21f0ce7ec2618a0bb1fd39c72ce07 (diff)
feat(sync): track and exclude rejected announcement events
Implement RejectedEventsIndex to prevent repeatedly fetching and processing announcement events (kinds 30617/30618) that have been rejected by the write policy. Changes: - Add RejectedEventsIndex to track rejected announcement EventIds - Record rejections in process_event_static when announcements fail write policy validation - Exclude rejected events from negentropy sync (along with purgatory) - Skip rejected events early in REQ+EOSE processing - Add 2 tests verifying tracking and exclusion logic Benefits: - Reduced network traffic (no re-fetching of known-bad events) - Lower CPU usage (no repeated validation) - Faster sync (smaller negentropy diffs) - Better observability (trace logging when skipping) Scope limited to announcements as they are the primary source of repeated rejection cycles during Layer 1 sync. Closes: Reduces wasted bandwidth from continually fetching rejected events
-rw-r--r--src/sync/mod.rs124
1 files changed, 118 insertions, 6 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6da2644..ecfd020 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -64,6 +64,11 @@ pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
64/// Key: relay URL 64/// Key: relay URL
65pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; 65pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
66 66
67/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync.
68/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing
69/// to avoid repeatedly fetching and rejecting the same events.
70pub type RejectedEventsIndex = Arc<RwLock<HashSet<EventId>>>;
71
67// ============================================================================= 72// =============================================================================
68// Supporting Data Structures 73// Supporting Data Structures
69// ============================================================================= 74// =============================================================================
@@ -389,6 +394,8 @@ pub struct SyncManager {
389 relay_sync_index: RelaySyncIndex, 394 relay_sync_index: RelaySyncIndex,
390 /// In-flight subscription batches 395 /// In-flight subscription batches
391 pending_sync_index: PendingSyncIndex, 396 pending_sync_index: PendingSyncIndex,
397 /// Rejected announcement event IDs (30617/30618) - excluded from sync
398 rejected_events_index: RejectedEventsIndex,
392 /// Active relay connections - keyed by relay URL 399 /// Active relay connections - keyed by relay URL
393 connections: HashMap<String, RelayConnection>, 400 connections: HashMap<String, RelayConnection>,
394 /// Health tracker for relay connection state 401 /// Health tracker for relay connection state
@@ -441,6 +448,7 @@ impl SyncManager {
441 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 448 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
442 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 449 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
443 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 450 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
451 rejected_events_index: Arc::new(RwLock::new(HashSet::new())),
444 connections: HashMap::new(), 452 connections: HashMap::new(),
445 health_tracker: Arc::new(RelayHealthTracker::new(config)), 453 health_tracker: Arc::new(RelayHealthTracker::new(config)),
446 next_batch_id: 0, 454 next_batch_id: 0,
@@ -1022,6 +1030,7 @@ impl SyncManager {
1022 let metrics_clone = self.metrics.clone(); 1030 let metrics_clone = self.metrics.clone();
1023 let pending_sync_index = Arc::clone(&self.pending_sync_index); 1031 let pending_sync_index = Arc::clone(&self.pending_sync_index);
1024 let health_tracker = Arc::clone(&self.health_tracker); 1032 let health_tracker = Arc::clone(&self.health_tracker);
1033 let rejected_events_index = Arc::clone(&self.rejected_events_index);
1025 1034
1026 tokio::spawn(async move { 1035 tokio::spawn(async move {
1027 let mut disconnect_sent = false; 1036 let mut disconnect_sent = false;
@@ -1029,12 +1038,27 @@ impl SyncManager {
1029 while let Some(relay_event) = event_rx.recv().await { 1038 while let Some(relay_event) = event_rx.recv().await {
1030 match relay_event { 1039 match relay_event {
1031 RelayEvent::Event(event, subscription_id) => { 1040 RelayEvent::Event(event, subscription_id) => {
1041 // Skip events we've already rejected (announcements only)
1042 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
1043 let rejected = rejected_events_index.read().await;
1044 if rejected.contains(&event.id) {
1045 tracing::trace!(
1046 event_id = %event.id,
1047 kind = %event.kind.as_u16(),
1048 relay = %relay_url_clone,
1049 "Skipping previously rejected announcement event"
1050 );
1051 continue;
1052 }
1053 }
1054
1032 let result = Self::process_event_static( 1055 let result = Self::process_event_static(
1033 &event, 1056 &event,
1034 &relay_url_clone, 1057 &relay_url_clone,
1035 &database, 1058 &database,
1036 &write_policy, 1059 &write_policy,
1037 &local_relay, 1060 &local_relay,
1061 &rejected_events_index,
1038 ) 1062 )
1039 .await; 1063 .await;
1040 // Only record metric when event is actually saved 1064 // Only record metric when event is actually saved
@@ -1644,6 +1668,7 @@ impl SyncManager {
1644 database: &SharedDatabase, 1668 database: &SharedDatabase,
1645 write_policy: &Nip34WritePolicy, 1669 write_policy: &Nip34WritePolicy,
1646 local_relay: &LocalRelay, 1670 local_relay: &LocalRelay,
1671 rejected_events_index: &RejectedEventsIndex,
1647 ) -> ProcessResult { 1672 ) -> ProcessResult {
1648 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; 1673 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult};
1649 use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 1674 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -1706,9 +1731,22 @@ impl SyncManager {
1706 tracing::debug!( 1731 tracing::debug!(
1707 event_id = %event.id, 1732 event_id = %event.id,
1708 relay = %relay_url, 1733 relay = %relay_url,
1734 kind = %event.kind.as_u16(),
1709 reason = %message, 1735 reason = %message,
1710 "Event rejected by write policy" 1736 "Event rejected by write policy"
1711 ); 1737 );
1738
1739 // Track rejected announcement events to avoid re-fetching them
1740 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
1741 let mut rejected = rejected_events_index.write().await;
1742 rejected.insert(event.id);
1743 tracing::debug!(
1744 event_id = %event.id,
1745 kind = %event.kind.as_u16(),
1746 "Added rejected announcement to exclusion list"
1747 );
1748 }
1749
1712 ProcessResult::Rejected 1750 ProcessResult::Rejected
1713 } 1751 }
1714 } 1752 }
@@ -2231,28 +2269,30 @@ impl SyncManager {
2231 let mut all_remote_ids = Vec::new(); 2269 let mut all_remote_ids = Vec::new();
2232 let mut failed_count = 0; 2270 let mut failed_count = 0;
2233 2271
2234 // Get event IDs currently in purgatory to avoid fetching them 2272 // Get event IDs to exclude: purgatory + rejected announcements
2235 let purgatory_ids = self.purgatory.event_ids(); 2273 let purgatory_ids = self.purgatory.event_ids();
2274 let rejected_ids = self.rejected_events_index.read().await.clone();
2275 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect();
2236 2276
2237 for (idx, result) in diff_results { 2277 for (idx, result) in diff_results {
2238 match result { 2278 match result {
2239 Ok(reconciliation) => { 2279 Ok(reconciliation) => {
2240 let remote_excluding_purgatory_ids: HashSet<EventId> = reconciliation 2280 let remote_excluding_ids: HashSet<EventId> = reconciliation
2241 .remote 2281 .remote
2242 .difference(&purgatory_ids) 2282 .difference(&excluded_ids)
2243 .cloned() 2283 .cloned()
2244 .collect(); 2284 .collect();
2245 let remote_count = remote_excluding_purgatory_ids.len(); 2285 let remote_count = remote_excluding_ids.len();
2246 tracing::debug!( 2286 tracing::debug!(
2247 relay = %relay_url, 2287 relay = %relay_url,
2248 filter_idx = idx, 2288 filter_idx = idx,
2249 remote_count = remote_count, 2289 remote_count = remote_count,
2250 local_count = reconciliation.local.len(), 2290 local_count = reconciliation.local.len(),
2251 remote_ids = ?remote_excluding_purgatory_ids, 2291 remote_ids = ?remote_excluding_ids,
2252 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx 2292 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx
2253 ); 2293 );
2254 if remote_count > 0 { 2294 if remote_count > 0 {
2255 all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter()); 2295 all_remote_ids.extend(remote_excluding_ids.into_iter());
2256 } 2296 }
2257 } 2297 }
2258 Err(e) => { 2298 Err(e) => {
@@ -2496,3 +2536,75 @@ impl SyncManager {
2496 tracing::info!("SyncManager shutdown complete"); 2536 tracing::info!("SyncManager shutdown complete");
2497 } 2537 }
2498} 2538}
2539
2540#[cfg(test)]
2541mod tests {
2542 use super::*;
2543 use nostr_sdk::prelude::*;
2544
2545 #[tokio::test]
2546 async fn test_rejected_events_index_tracks_announcements() {
2547 // Create a rejected events index
2548 let rejected_index: RejectedEventsIndex = Arc::new(RwLock::new(HashSet::new()));
2549
2550 // Create test announcement event (kind 30617)
2551 let keys = Keys::generate();
2552 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content")
2553 .sign_with_keys(&keys)
2554 .unwrap();
2555
2556 // Verify index is empty
2557 {
2558 let rejected = rejected_index.read().await;
2559 assert_eq!(rejected.len(), 0);
2560 }
2561
2562 // Simulate rejection by adding to index
2563 {
2564 let mut rejected = rejected_index.write().await;
2565 rejected.insert(announcement.id);
2566 }
2567
2568 // Verify event is tracked
2569 {
2570 let rejected = rejected_index.read().await;
2571 assert!(rejected.contains(&announcement.id));
2572 assert_eq!(rejected.len(), 1);
2573 }
2574 }
2575
2576 #[tokio::test]
2577 async fn test_rejected_events_excluded_from_negentropy() {
2578 // Create indices
2579 let purgatory_ids: HashSet<EventId> = HashSet::new();
2580 let mut rejected_ids = HashSet::new();
2581
2582 // Create test event IDs
2583 let rejected_id = EventId::from_hex(
2584 "0000000000000000000000000000000000000000000000000000000000000001",
2585 )
2586 .unwrap();
2587 let valid_id = EventId::from_hex(
2588 "0000000000000000000000000000000000000000000000000000000000000002",
2589 )
2590 .unwrap();
2591
2592 rejected_ids.insert(rejected_id);
2593
2594 // Simulate negentropy reconciliation result
2595 let mut remote_ids = HashSet::new();
2596 remote_ids.insert(rejected_id);
2597 remote_ids.insert(valid_id);
2598
2599 // Exclude rejected and purgatory events
2600 let excluded_ids: HashSet<EventId> =
2601 purgatory_ids.union(&rejected_ids).cloned().collect();
2602 let filtered_ids: HashSet<EventId> =
2603 remote_ids.difference(&excluded_ids).cloned().collect();
2604
2605 // Verify rejected event is excluded
2606 assert!(!filtered_ids.contains(&rejected_id));
2607 assert!(filtered_ids.contains(&valid_id));
2608 assert_eq!(filtered_ids.len(), 1);
2609 }
2610}