upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/config.rs14
-rw-r--r--src/sync/mod.rs133
-rw-r--r--src/sync/rejected_index.rs20
3 files changed, 130 insertions, 37 deletions
diff --git a/src/config.rs b/src/config.rs
index c4a7b6c..2343c88 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -136,6 +136,18 @@ pub struct Config {
136 /// Primarily useful for testing that sync works without negentropy support. 136 /// Primarily useful for testing that sync works without negentropy support.
137 #[arg(long, env = "NGIT_SYNC_DISABLE_NEGENTROPY", default_value_t = false)] 137 #[arg(long, env = "NGIT_SYNC_DISABLE_NEGENTROPY", default_value_t = false)]
138 pub sync_disable_negentropy: bool, 138 pub sync_disable_negentropy: bool,
139
140 /// Hot cache duration in seconds for rejected announcements (default: 120 = 2 minutes)
141 /// Stores full event objects for immediate re-processing when dependencies resolve.
142 /// Too short (<30s): Miss events from slow relays
143 /// Too long (>5min): Waste memory
144 #[arg(long, env = "NGIT_REJECTED_HOT_CACHE_DURATION_SECS", default_value_t = 120)]
145 pub rejected_hot_cache_duration_secs: u64,
146
147 /// Cold index expiry in seconds for rejected announcements (default: 604800 = 7 days)
148 /// Stores metadata only to prevent repeated downloads of rejected events.
149 #[arg(long, env = "NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS", default_value_t = 604800)]
150 pub rejected_cold_index_expiry_secs: u64,
139} 151}
140 152
141impl Config { 153impl Config {
@@ -258,6 +270,8 @@ impl Config {
258 sync_disconnect_check_interval_secs: 60, 270 sync_disconnect_check_interval_secs: 60,
259 sync_base_backoff_secs: 5, 271 sync_base_backoff_secs: 5,
260 sync_disable_negentropy: false, 272 sync_disable_negentropy: false,
273 rejected_hot_cache_duration_secs: 120,
274 rejected_cold_index_expiry_secs: 604800,
261 } 275 }
262 } 276 }
263} 277}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 55bea17..fe336d1 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -74,9 +74,10 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing 74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing
75/// to avoid repeatedly fetching and rejecting the same events. 75/// to avoid repeatedly fetching and rejecting the same events.
76/// 76///
77/// NOTE: This is a temporary simple implementation. PR2 will replace this with the 77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs:
78/// two-tier RejectedEventsIndex from rejected_index.rs (hot cache + cold index). 78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing)
79type RejectedEventsIndexSimple = Arc<RwLock<HashSet<EventId>>>; 79/// - Cold index: Metadata for 7 days (prevents repeated downloads)
80use rejected_index::RejectedEventsIndex;
80 81
81// ============================================================================= 82// =============================================================================
82// Supporting Data Structures 83// Supporting Data Structures
@@ -444,8 +445,8 @@ pub struct SyncManager {
444 relay_sync_index: RelaySyncIndex, 445 relay_sync_index: RelaySyncIndex,
445 /// In-flight subscription batches 446 /// In-flight subscription batches
446 pending_sync_index: PendingSyncIndex, 447 pending_sync_index: PendingSyncIndex,
447 /// Rejected announcement event IDs (30617/30618) - excluded from sync 448 /// Rejected announcement events (30617/30618) - two-tier storage for re-processing
448 rejected_events_index: RejectedEventsIndexSimple, 449 rejected_events_index: Arc<RejectedEventsIndex>,
449 /// Active relay connections - keyed by relay URL 450 /// Active relay connections - keyed by relay URL
450 connections: HashMap<String, RelayConnection>, 451 connections: HashMap<String, RelayConnection>,
451 /// Health tracker for relay connection state 452 /// Health tracker for relay connection state
@@ -498,7 +499,10 @@ impl SyncManager {
498 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 499 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
499 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 500 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
500 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 501 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
501 rejected_events_index: Arc::new(RwLock::new(HashSet::new())), 502 rejected_events_index: Arc::new(RejectedEventsIndex::new(
503 Duration::from_secs(config.rejected_hot_cache_duration_secs),
504 Duration::from_secs(config.rejected_cold_index_expiry_secs),
505 )),
502 connections: HashMap::new(), 506 connections: HashMap::new(),
503 health_tracker: Arc::new(RelayHealthTracker::new(config)), 507 health_tracker: Arc::new(RelayHealthTracker::new(config)),
504 next_batch_id: 0, 508 next_batch_id: 0,
@@ -1351,8 +1355,7 @@ impl SyncManager {
1351 RelayEvent::Event(event, subscription_id) => { 1355 RelayEvent::Event(event, subscription_id) => {
1352 // Skip events we've already rejected (announcements only) 1356 // Skip events we've already rejected (announcements only)
1353 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 1357 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
1354 let rejected = rejected_events_index.read().await; 1358 if rejected_events_index.contains(&event.id) {
1355 if rejected.contains(&event.id) {
1356 tracing::trace!( 1359 tracing::trace!(
1357 event_id = %event.id, 1360 event_id = %event.id,
1358 kind = %event.kind.as_u16(), 1361 kind = %event.kind.as_u16(),
@@ -2001,7 +2004,7 @@ impl SyncManager {
2001 database: &SharedDatabase, 2004 database: &SharedDatabase,
2002 write_policy: &Nip34WritePolicy, 2005 write_policy: &Nip34WritePolicy,
2003 local_relay: &LocalRelay, 2006 local_relay: &LocalRelay,
2004 rejected_events_index: &RejectedEventsIndexSimple, 2007 rejected_events_index: &Arc<RejectedEventsIndex>,
2005 ) -> ProcessResult { 2008 ) -> ProcessResult {
2006 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; 2009 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult};
2007 use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 2010 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -2071,13 +2074,42 @@ impl SyncManager {
2071 2074
2072 // Track rejected announcement events to avoid re-fetching them 2075 // Track rejected announcement events to avoid re-fetching them
2073 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 2076 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
2074 let mut rejected = rejected_events_index.write().await; 2077 // Extract identifier from 'd' tag
2075 rejected.insert(event.id); 2078 if let Some(identifier) = event
2076 tracing::debug!( 2079 .tags
2077 event_id = %event.id, 2080 .iter()
2078 kind = %event.kind.as_u16(), 2081 .find(|t| t.kind() == nostr_sdk::TagKind::d())
2079 "Added rejected announcement to exclusion list" 2082 .and_then(|t| t.content())
2080 ); 2083 {
2084 // Determine rejection reason based on message
2085 let reason = if message.contains("doesn't list this service") {
2086 rejected_index::RejectionReason::DoesNotListService
2087 } else if message.contains("maintainer") {
2088 rejected_index::RejectionReason::MaintainerNotYetValid
2089 } else {
2090 rejected_index::RejectionReason::Other
2091 };
2092
2093 rejected_events_index.add_announcement(
2094 event.clone(),
2095 event.pubkey,
2096 identifier.to_string(),
2097 reason,
2098 );
2099
2100 tracing::debug!(
2101 event_id = %event.id,
2102 kind = %event.kind.as_u16(),
2103 identifier = %identifier,
2104 "Added rejected announcement to two-tier index"
2105 );
2106 } else {
2107 tracing::warn!(
2108 event_id = %event.id,
2109 kind = %event.kind.as_u16(),
2110 "Announcement missing 'd' tag, cannot track in rejected index"
2111 );
2112 }
2081 } 2113 }
2082 2114
2083 ProcessResult::Rejected 2115 ProcessResult::Rejected
@@ -2608,7 +2640,7 @@ impl SyncManager {
2608 2640
2609 // Get event IDs to exclude: purgatory + rejected announcements 2641 // Get event IDs to exclude: purgatory + rejected announcements
2610 let purgatory_ids = self.purgatory.event_ids(); 2642 let purgatory_ids = self.purgatory.event_ids();
2611 let rejected_ids = self.rejected_events_index.read().await.clone(); 2643 let rejected_ids = self.rejected_events_index.get_all_event_ids();
2612 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); 2644 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect();
2613 2645
2614 for (idx, result) in diff_results { 2646 for (idx, result) in diff_results {
@@ -2888,40 +2920,48 @@ mod tests {
2888 2920
2889 #[tokio::test] 2921 #[tokio::test]
2890 async fn test_rejected_events_index_tracks_announcements() { 2922 async fn test_rejected_events_index_tracks_announcements() {
2891 // Create a rejected events index 2923 // Create a rejected events index with 2 minute hot cache, 7 day cold index
2892 let rejected_index: RejectedEventsIndexSimple = Arc::new(RwLock::new(HashSet::new())); 2924 let rejected_index = Arc::new(RejectedEventsIndex::new(
2925 Duration::from_secs(120),
2926 Duration::from_secs(604800),
2927 ));
2893 2928
2894 // Create test announcement event (kind 30617) 2929 // Create test announcement event (kind 30617) with 'd' tag
2895 let keys = Keys::generate(); 2930 let keys = Keys::generate();
2896 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content") 2931 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content")
2932 .tag(nostr_sdk::Tag::custom(
2933 nostr_sdk::TagKind::d(),
2934 vec!["test-repo"],
2935 ))
2897 .sign_with_keys(&keys) 2936 .sign_with_keys(&keys)
2898 .unwrap(); 2937 .unwrap();
2899 2938
2900 // Verify index is empty 2939 // Verify index is empty
2901 { 2940 assert_eq!(rejected_index.hot_cache_len(), 0);
2902 let rejected = rejected_index.read().await; 2941 assert_eq!(rejected_index.cold_index_len(), 0);
2903 assert_eq!(rejected.len(), 0);
2904 }
2905 2942
2906 // Simulate rejection by adding to index 2943 // Simulate rejection by adding to index
2907 { 2944 rejected_index.add_announcement(
2908 let mut rejected = rejected_index.write().await; 2945 announcement.clone(),
2909 rejected.insert(announcement.id); 2946 announcement.pubkey,
2910 } 2947 "test-repo".to_string(),
2948 rejected_index::RejectionReason::DoesNotListService,
2949 );
2911 2950
2912 // Verify event is tracked 2951 // Verify event is tracked in both tiers
2913 { 2952 assert!(rejected_index.contains(&announcement.id));
2914 let rejected = rejected_index.read().await; 2953 assert_eq!(rejected_index.hot_cache_len(), 1);
2915 assert!(rejected.contains(&announcement.id)); 2954 assert_eq!(rejected_index.cold_index_len(), 1);
2916 assert_eq!(rejected.len(), 1);
2917 }
2918 } 2955 }
2919 2956
2920 #[tokio::test] 2957 #[tokio::test]
2921 async fn test_rejected_events_excluded_from_negentropy() { 2958 async fn test_rejected_events_excluded_from_negentropy() {
2922 // Create indices 2959 // Create indices
2923 let purgatory_ids: HashSet<EventId> = HashSet::new(); 2960 let purgatory_ids: HashSet<EventId> = HashSet::new();
2924 let mut rejected_ids = HashSet::new(); 2961 let rejected_index = RejectedEventsIndex::new(
2962 Duration::from_secs(120),
2963 Duration::from_secs(604800),
2964 );
2925 2965
2926 // Create test event IDs 2966 // Create test event IDs
2927 let rejected_id = EventId::from_hex( 2967 let rejected_id = EventId::from_hex(
@@ -2933,7 +2973,28 @@ mod tests {
2933 ) 2973 )
2934 .unwrap(); 2974 .unwrap();
2935 2975
2936 rejected_ids.insert(rejected_id); 2976 // Add rejected event to index
2977 let keys = Keys::generate();
2978 let rejected_event = EventBuilder::new(Kind::GitRepoAnnouncement, "rejected")
2979 .tag(nostr_sdk::Tag::custom(
2980 nostr_sdk::TagKind::d(),
2981 vec!["rejected-repo"],
2982 ))
2983 .sign_with_keys(&keys)
2984 .unwrap();
2985
2986 // Override the event ID for testing (we need a specific ID)
2987 // Since we can't override the ID, let's use the actual event ID
2988 let rejected_id = rejected_event.id;
2989 rejected_index.add_announcement(
2990 rejected_event,
2991 keys.public_key(),
2992 "rejected-repo".to_string(),
2993 rejected_index::RejectionReason::DoesNotListService,
2994 );
2995
2996 // Get rejected IDs from index
2997 let rejected_ids = rejected_index.get_all_event_ids();
2937 2998
2938 // Simulate negentropy reconciliation result 2999 // Simulate negentropy reconciliation result
2939 let mut remote_ids = HashSet::new(); 3000 let mut remote_ids = HashSet::new();
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index f89783a..80d6b5b 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -85,7 +85,7 @@
85//! ``` 85//! ```
86 86
87use nostr_sdk::{Event, EventId, PublicKey}; 87use nostr_sdk::{Event, EventId, PublicKey};
88use std::collections::HashMap; 88use std::collections::{HashMap, HashSet};
89use std::sync::{Arc, RwLock}; 89use std::sync::{Arc, RwLock};
90use std::time::{Duration, Instant}; 90use std::time::{Duration, Instant};
91 91
@@ -396,6 +396,24 @@ impl RejectedEventsIndex {
396 pub fn cold_index_len(&self) -> usize { 396 pub fn cold_index_len(&self) -> usize {
397 self.cold_index.len() 397 self.cold_index.len()
398 } 398 }
399
400 /// Get all rejected event IDs (from both hot cache and cold index)
401 ///
402 /// Used for excluding rejected events from negentropy sync.
403 /// Note: This creates a snapshot - events may be added/removed concurrently.
404 pub fn get_all_event_ids(&self) -> HashSet<EventId> {
405 let mut ids = HashSet::new();
406
407 // Add from hot cache
408 let hot_entries = self.hot_cache.entries.read().unwrap();
409 ids.extend(hot_entries.keys().cloned());
410
411 // Add from cold index
412 let cold_entries = self.cold_index.entries.read().unwrap();
413 ids.extend(cold_entries.keys().cloned());
414
415 ids
416 }
399} 417}
400 418
401#[cfg(test)] 419#[cfg(test)]