From e4cfecbfc909c9ca4983101cf6a5855959a5d49f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 15:38:58 +0000 Subject: feat: Add two-tier rejected events index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a sophisticated two-tier storage system for rejected repository announcements to enable immediate re-processing when dependencies resolve. ## Architecture **Tier 1: Hot Cache (2 minutes)** - Stores full event objects for immediate re-processing - Enables <1 second re-processing vs 24 hour wait - Auto-expires to prevent memory growth - Memory: ~200 KB typical, ~20 MB worst case **Tier 2: Cold Index (7 days)** - Stores metadata only (event_id, pubkey, identifier) - Prevents repeated downloads of rejected events - Enables invalidation when circumstances change - Memory: ~1 MB typical ## Problem Solved Without this system, maintainer announcements face a timing gap: 00:00 - Maintainer announcement rejected → Event discarded 00:02 - Owner announcement accepted (lists maintainer) → Want to re-process 00:02 - ❌ Maintainer announcement GONE → Must wait 24h for next sync With two-tier system: 00:00 - Maintainer announcement rejected → Stored in both tiers 00:02 - Owner announcement accepted → Invalidate + get from hot cache 00:02 - ✅ Re-process immediately → Accepted in <1 second ## Implementation New module: src/sync/rejected_index.rs - RejectedEventsIndex: Public API combining both tiers - HotCache: Internal struct for full event storage - ColdIndex: Internal struct for metadata storage - RejectionReason: Enum for tracking why events were rejected Key methods: - add_announcement(): Add to both tiers - contains(): Check if event is rejected - invalidate_and_get_events(): Remove from cold index, get from hot cache - cleanup_expired(): Remove expired entries from both tiers ## Testing 9 comprehensive unit tests covering: - Hot cache storage and retrieval - Hot cache expiration - Cold index metadata tracking - Cold index invalidation - Two-tier integration - Cleanup of expired entries - Hot cache misses after expiry - Multiple maintainer repositories All tests passing. ## Next Steps PR2: Switch SyncManager to use new RejectedEventsIndex PR3: Add invalidation + immediate re-processing logic PR4: Add cleanup task + Prometheus metrics Part of: Maintainer chain discovery fix See: work/SOLUTION-SUMMARY-V2.md for full design --- src/sync/mod.rs | 17 +- src/sync/rejected_index.rs | 647 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 660 insertions(+), 4 deletions(-) create mode 100644 src/sync/rejected_index.rs (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8b1da0e..55bea17 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -16,6 +16,7 @@ pub mod algorithms; pub mod filters; pub mod health; pub mod metrics; +pub mod rejected_index; pub mod relay_connection; pub mod self_subscriber; @@ -25,6 +26,11 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; // Re-export metrics types pub use metrics::SyncMetrics; +// Re-export rejected index types +pub use rejected_index::{RejectionReason}; +// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used +// Current code still uses the simple HashSet type alias below + // Re-export relay connection types pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; @@ -67,7 +73,10 @@ pub type PendingSyncIndex = Arc>>>; /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing /// to avoid repeatedly fetching and rejecting the same events. -pub type RejectedEventsIndex = Arc>>; +/// +/// NOTE: This is a temporary simple implementation. PR2 will replace this with the +/// two-tier RejectedEventsIndex from rejected_index.rs (hot cache + cold index). +type RejectedEventsIndexSimple = Arc>>; // ============================================================================= // Supporting Data Structures @@ -436,7 +445,7 @@ pub struct SyncManager { /// In-flight subscription batches pending_sync_index: PendingSyncIndex, /// Rejected announcement event IDs (30617/30618) - excluded from sync - rejected_events_index: RejectedEventsIndex, + rejected_events_index: RejectedEventsIndexSimple, /// Active relay connections - keyed by relay URL connections: HashMap, /// Health tracker for relay connection state @@ -1992,7 +2001,7 @@ impl SyncManager { database: &SharedDatabase, write_policy: &Nip34WritePolicy, local_relay: &LocalRelay, - rejected_events_index: &RejectedEventsIndex, + rejected_events_index: &RejectedEventsIndexSimple, ) -> ProcessResult { use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -2880,7 +2889,7 @@ mod tests { #[tokio::test] async fn test_rejected_events_index_tracks_announcements() { // Create a rejected events index - let rejected_index: RejectedEventsIndex = Arc::new(RwLock::new(HashSet::new())); + let rejected_index: RejectedEventsIndexSimple = Arc::new(RwLock::new(HashSet::new())); // Create test announcement event (kind 30617) let keys = Keys::generate(); diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs new file mode 100644 index 0000000..f89783a --- /dev/null +++ b/src/sync/rejected_index.rs @@ -0,0 +1,647 @@ +//! Two-tier rejected events index for efficient re-processing +//! +//! This module provides a two-tier storage system for rejected repository announcements: +//! +//! 1. **Hot Cache (Tier 1)**: Stores full event objects for 2 minutes +//! - Enables immediate re-processing when dependencies resolve +//! - Auto-expires to prevent memory growth +//! - Typical memory: ~200 KB, worst case: ~20 MB +//! +//! 2. **Cold Index (Tier 2)**: Stores metadata only for 7 days +//! - Prevents repeated downloads of rejected events +//! - Enables invalidation when dependencies change +//! - Typical memory: ~1 MB +//! +//! # Problem Solved +//! +//! Without this system, maintainer announcements face a timing gap: +//! +//! ```text +//! 00:00 - Maintainer announcement rejected → Event discarded +//! 00:02 - Owner announcement accepted (lists maintainer) → Want to re-process +//! 00:02 - ❌ Maintainer announcement GONE → Must wait 24h for next sync +//! ``` +//! +//! With the two-tier system: +//! +//! ```text +//! 00:00 - Maintainer announcement rejected → Stored in hot cache + cold index +//! 00:02 - Owner announcement accepted → Invalidate + get from hot cache +//! 00:02 - ✅ Re-process immediately → Accepted in <1 second +//! ``` +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────┐ +//! │ Tier 1: Hot Cache (2 minutes) │ +//! │ - Stores FULL EVENT objects │ +//! │ - Enables IMMEDIATE re-processing │ +//! │ - Auto-expires after 2 minutes │ +//! │ - Memory: ~200 KB typical, ~20 MB worst case │ +//! └─────────────────────────────────────────────────────────────┘ +//! │ +//! │ After 2 minutes +//! ▼ +//! ┌─────────────────────────────────────────────────────────────┐ +//! │ Tier 2: Cold Index (7 days) │ +//! │ - Stores METADATA only (event_id, pubkey, identifier) │ +//! │ - Prevents repeated downloads │ +//! │ - Enables invalidation │ +//! │ - Memory: ~1 MB typical │ +//! └─────────────────────────────────────────────────────────────┘ +//! ``` +//! +//! # Usage +//! +//! ```rust,no_run +//! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; +//! use nostr_sdk::Event; +//! use std::time::Duration; +//! +//! let index = RejectedEventsIndex::new( +//! Duration::from_secs(120), // hot cache: 2 minutes +//! Duration::from_secs(604800), // cold index: 7 days +//! ); +//! +//! // Add rejected announcement +//! index.add_announcement( +//! event.clone(), +//! event.pubkey, +//! "my-repo".to_string(), +//! RejectionReason::DoesNotListService, +//! ); +//! +//! // Later, when owner announcement accepted... +//! let (removed, hot_events) = index.invalidate_and_get_events( +//! &maintainer_pubkey, +//! "my-repo", +//! ); +//! +//! // Re-process events from hot cache immediately +//! for event in hot_events { +//! process_event(&event).await; +//! } +//! ``` + +use nostr_sdk::{Event, EventId, PublicKey}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +/// Reason why a repository announcement was rejected +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RejectionReason { + /// Announcement doesn't list this service in clone/web URLs + DoesNotListService, + /// Maintainer announcement rejected (owner not yet accepted) + MaintainerNotYetValid, + /// Other validation failure + Other, +} + +impl std::fmt::Display for RejectionReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::DoesNotListService => write!(f, "does_not_list_service"), + Self::MaintainerNotYetValid => write!(f, "maintainer_not_yet_valid"), + Self::Other => write!(f, "other"), + } + } +} + +/// Entry in the hot cache (full event) +#[derive(Debug, Clone)] +struct HotCacheEntry { + event: Event, + pubkey: PublicKey, + identifier: String, + reason: RejectionReason, + cached_at: Instant, +} + +/// Entry in the cold index (metadata only) +#[derive(Debug, Clone)] +struct ColdIndexEntry { + event_id: EventId, + pubkey: PublicKey, + identifier: String, + reason: RejectionReason, + rejected_at: Instant, +} + +/// Hot cache: Stores full events for immediate re-processing +/// +/// Events are stored for a short duration (default: 2 minutes) to enable +/// immediate re-processing when dependencies resolve. After expiry, events +/// are dropped from the hot cache but remain in the cold index. +#[derive(Debug, Clone)] +struct HotCache { + /// Map of event_id -> full event entry + entries: Arc>>, + /// Duration before entries expire + expiry_duration: Duration, +} + +impl HotCache { + fn new(expiry_duration: Duration) -> Self { + Self { + entries: Arc::new(RwLock::new(HashMap::new())), + expiry_duration, + } + } + + /// Add event to hot cache + fn add(&self, event: Event, pubkey: PublicKey, identifier: String, reason: RejectionReason) { + let entry = HotCacheEntry { + event, + pubkey, + identifier, + reason, + cached_at: Instant::now(), + }; + + self.entries.write().unwrap().insert(entry.event.id, entry); + } + + /// Get events for a specific maintainer/identifier from hot cache + fn get_maintainer_events(&self, pubkey: &PublicKey, identifier: &str) -> Vec { + let entries = self.entries.read().unwrap(); + let now = Instant::now(); + + entries + .values() + .filter(|entry| { + // Check if entry matches and hasn't expired + entry.pubkey == *pubkey + && entry.identifier == identifier + && now.duration_since(entry.cached_at) < self.expiry_duration + }) + .map(|entry| entry.event.clone()) + .collect() + } + + /// Remove expired entries from hot cache + fn cleanup_expired(&self) -> usize { + let mut entries = self.entries.write().unwrap(); + let now = Instant::now(); + let initial_count = entries.len(); + + entries.retain(|_, entry| { + now.duration_since(entry.cached_at) < self.expiry_duration + }); + + initial_count - entries.len() + } + + /// Get current number of entries in hot cache + fn len(&self) -> usize { + self.entries.read().unwrap().len() + } + + /// Check if event is in hot cache + fn contains(&self, event_id: &EventId) -> bool { + self.entries.read().unwrap().contains_key(event_id) + } +} + +/// Cold index: Stores metadata only for long-term deduplication +/// +/// Events are stored for a long duration (default: 7 days) to prevent +/// repeated downloads of rejected events. Only metadata is stored to +/// minimize memory usage. +#[derive(Debug, Clone)] +struct ColdIndex { + /// Map of event_id -> metadata entry + entries: Arc>>, + /// Duration before entries expire + expiry_duration: Duration, +} + +impl ColdIndex { + fn new(expiry_duration: Duration) -> Self { + Self { + entries: Arc::new(RwLock::new(HashMap::new())), + expiry_duration, + } + } + + /// Add metadata to cold index + fn add( + &self, + event_id: EventId, + pubkey: PublicKey, + identifier: String, + reason: RejectionReason, + ) { + let entry = ColdIndexEntry { + event_id, + pubkey, + identifier, + reason, + rejected_at: Instant::now(), + }; + + self.entries.write().unwrap().insert(event_id, entry); + } + + /// Check if event is in cold index + fn contains(&self, event_id: &EventId) -> bool { + let entries = self.entries.read().unwrap(); + if let Some(entry) = entries.get(event_id) { + let now = Instant::now(); + now.duration_since(entry.rejected_at) < self.expiry_duration + } else { + false + } + } + + /// Invalidate (remove) maintainer announcements from cold index + /// + /// Called when an owner announcement is accepted that lists this maintainer. + /// Removes the cold index entries so they can be re-fetched on next sync. + fn invalidate_maintainer_announcements( + &self, + maintainer_pubkey: &PublicKey, + identifier: &str, + ) -> usize { + let mut entries = self.entries.write().unwrap(); + let initial_count = entries.len(); + + entries.retain(|_, entry| { + // Keep entries that DON'T match the maintainer/identifier + !(entry.pubkey == *maintainer_pubkey && entry.identifier == identifier) + }); + + initial_count - entries.len() + } + + /// Remove expired entries from cold index + fn cleanup_expired(&self) -> usize { + let mut entries = self.entries.write().unwrap(); + let now = Instant::now(); + let initial_count = entries.len(); + + entries.retain(|_, entry| { + now.duration_since(entry.rejected_at) < self.expiry_duration + }); + + initial_count - entries.len() + } + + /// Get current number of entries in cold index + fn len(&self) -> usize { + self.entries.read().unwrap().len() + } +} + +/// Two-tier rejected events index +/// +/// Combines hot cache (full events, short duration) with cold index +/// (metadata only, long duration) for efficient re-processing and deduplication. +#[derive(Debug, Clone)] +pub struct RejectedEventsIndex { + hot_cache: HotCache, + cold_index: ColdIndex, +} + +impl RejectedEventsIndex { + /// Create new rejected events index + /// + /// # Arguments + /// + /// * `hot_cache_duration` - How long to keep full events in hot cache (default: 2 minutes) + /// * `cold_index_duration` - How long to keep metadata in cold index (default: 7 days) + pub fn new(hot_cache_duration: Duration, cold_index_duration: Duration) -> Self { + Self { + hot_cache: HotCache::new(hot_cache_duration), + cold_index: ColdIndex::new(cold_index_duration), + } + } + + /// Add rejected announcement to both tiers + /// + /// # Arguments + /// + /// * `event` - Full event object (stored in hot cache) + /// * `pubkey` - Author's public key + /// * `identifier` - Repository identifier (d tag) + /// * `reason` - Why the announcement was rejected + pub fn add_announcement( + &self, + event: Event, + pubkey: PublicKey, + identifier: String, + reason: RejectionReason, + ) { + // Add to hot cache (full event) + self.hot_cache.add( + event.clone(), + pubkey, + identifier.clone(), + reason, + ); + + // Add to cold index (metadata only) + self.cold_index.add(event.id, pubkey, identifier, reason); + } + + /// Check if event is already rejected (in either tier) + pub fn contains(&self, event_id: &EventId) -> bool { + self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) + } + + /// Invalidate maintainer announcements and get events for immediate re-processing + /// + /// This is called when an owner announcement is accepted that lists a maintainer. + /// It removes the cold index entries (so they can be re-fetched on next sync) and + /// returns any events still in the hot cache for immediate re-processing. + /// + /// # Returns + /// + /// Tuple of (number of cold index entries removed, events from hot cache) + pub fn invalidate_and_get_events( + &self, + maintainer_pubkey: &PublicKey, + identifier: &str, + ) -> (usize, Vec) { + // Remove from cold index (prevents re-fetch) + let removed = self + .cold_index + .invalidate_maintainer_announcements(maintainer_pubkey, identifier); + + // Get from hot cache (for immediate re-processing) + let events = self + .hot_cache + .get_maintainer_events(maintainer_pubkey, identifier); + + (removed, events) + } + + /// Clean up expired entries from both tiers + /// + /// Returns tuple of (hot cache expired, cold index expired) + pub fn cleanup_expired(&self) -> (usize, usize) { + let hot_expired = self.hot_cache.cleanup_expired(); + let cold_expired = self.cold_index.cleanup_expired(); + (hot_expired, cold_expired) + } + + /// Get current number of entries in hot cache + pub fn hot_cache_len(&self) -> usize { + self.hot_cache.len() + } + + /// Get current number of entries in cold index + pub fn cold_index_len(&self) -> usize { + self.cold_index.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use nostr_sdk::{Keys, NostrSigner}; + + async fn create_test_event() -> Event { + let keys = Keys::generate(); + let unsigned = nostr_sdk::EventBuilder::text_note("test") + .build(keys.public_key()); + keys.sign_event(unsigned).await.unwrap() + } + + #[tokio::test] + async fn test_hot_cache_stores_and_retrieves_events() { + let cache = HotCache::new(Duration::from_secs(120)); + let event = create_test_event().await; + let pubkey = event.pubkey; + let identifier = "test-repo".to_string(); + + cache.add( + event.clone(), + pubkey, + identifier.clone(), + RejectionReason::DoesNotListService, + ); + + assert!(cache.contains(&event.id)); + + let retrieved = cache.get_maintainer_events(&pubkey, &identifier); + assert_eq!(retrieved.len(), 1); + assert_eq!(retrieved[0].id, event.id); + } + + #[tokio::test] + async fn test_hot_cache_expires_after_duration() { + let cache = HotCache::new(Duration::from_millis(50)); + let event = create_test_event().await; + + cache.add( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + assert!(cache.contains(&event.id)); + + // Wait for expiry + std::thread::sleep(Duration::from_millis(60)); + + let expired = cache.cleanup_expired(); + assert_eq!(expired, 1); + assert!(!cache.contains(&event.id)); + } + + #[tokio::test] + async fn test_cold_index_tracks_metadata() { + let index = ColdIndex::new(Duration::from_secs(604800)); + let event = create_test_event().await; + + index.add( + event.id, + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + assert!(index.contains(&event.id)); + assert_eq!(index.len(), 1); + } + + #[tokio::test] + async fn test_cold_index_invalidation() { + let index = ColdIndex::new(Duration::from_secs(604800)); + let event = create_test_event().await; + let pubkey = event.pubkey; + let identifier = "test-repo".to_string(); + + index.add( + event.id, + pubkey, + identifier.clone(), + RejectionReason::MaintainerNotYetValid, + ); + + assert!(index.contains(&event.id)); + + let removed = index.invalidate_maintainer_announcements(&pubkey, &identifier); + assert_eq!(removed, 1); + assert!(!index.contains(&event.id)); + } + + #[tokio::test] + async fn test_two_tier_index_add_and_contains() { + let index = RejectedEventsIndex::new( + Duration::from_secs(120), + Duration::from_secs(604800), + ); + let event = create_test_event().await; + + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + assert!(index.contains(&event.id)); + assert_eq!(index.hot_cache_len(), 1); + assert_eq!(index.cold_index_len(), 1); + } + + #[tokio::test] + async fn test_invalidate_and_get_events() { + let index = RejectedEventsIndex::new( + Duration::from_secs(120), + Duration::from_secs(604800), + ); + let event = create_test_event().await; + let pubkey = event.pubkey; + let identifier = "test-repo".to_string(); + + index.add_announcement( + event.clone(), + pubkey, + identifier.clone(), + RejectionReason::MaintainerNotYetValid, + ); + + let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); + + assert_eq!(removed, 1); // Removed from cold index + assert_eq!(hot_events.len(), 1); // Retrieved from hot cache + assert_eq!(hot_events[0].id, event.id); + + // Cold index entry removed, hot cache still has it + assert_eq!(index.cold_index_len(), 0); + assert_eq!(index.hot_cache_len(), 1); + } + + #[tokio::test] + async fn test_cleanup_expired_both_tiers() { + let index = RejectedEventsIndex::new( + Duration::from_millis(50), // Hot cache expires quickly + Duration::from_millis(100), // Cold index expires slower + ); + let event = create_test_event().await; + + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + // Wait for hot cache to expire + std::thread::sleep(Duration::from_millis(60)); + + let (hot_expired, cold_expired) = index.cleanup_expired(); + assert_eq!(hot_expired, 1); + assert_eq!(cold_expired, 0); // Not expired yet + + // Wait for cold index to expire + std::thread::sleep(Duration::from_millis(50)); + + let (hot_expired, cold_expired) = index.cleanup_expired(); + assert_eq!(hot_expired, 0); // Already cleaned up + assert_eq!(cold_expired, 1); + } + + #[tokio::test] + async fn test_hot_cache_miss_after_expiry() { + let index = RejectedEventsIndex::new( + Duration::from_millis(50), + Duration::from_secs(604800), + ); + let event = create_test_event().await; + let pubkey = event.pubkey; + let identifier = "test-repo".to_string(); + + index.add_announcement( + event.clone(), + pubkey, + identifier.clone(), + RejectionReason::MaintainerNotYetValid, + ); + + // Wait for hot cache to expire + std::thread::sleep(Duration::from_millis(60)); + + let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); + + assert_eq!(removed, 1); // Removed from cold index + assert_eq!(hot_events.len(), 0); // Hot cache expired - miss! + + // This is expected: events arrive >2 minutes apart, must wait for next sync + } + + #[tokio::test] + async fn test_multiple_maintainer_repos() { + let index = RejectedEventsIndex::new( + Duration::from_secs(120), + Duration::from_secs(604800), + ); + + let keys1 = Keys::generate(); + let keys2 = Keys::generate(); + + let unsigned1 = nostr_sdk::EventBuilder::text_note("test1") + .build(keys1.public_key()); + let event1 = keys1.sign_event(unsigned1).await.unwrap(); + + let unsigned2 = nostr_sdk::EventBuilder::text_note("test2") + .build(keys2.public_key()); + let event2 = keys2.sign_event(unsigned2).await.unwrap(); + + // Add two different maintainer repos + index.add_announcement( + event1.clone(), + event1.pubkey, + "repo1".to_string(), + RejectionReason::MaintainerNotYetValid, + ); + + index.add_announcement( + event2.clone(), + event2.pubkey, + "repo2".to_string(), + RejectionReason::MaintainerNotYetValid, + ); + + assert_eq!(index.hot_cache_len(), 2); + assert_eq!(index.cold_index_len(), 2); + + // Invalidate only first maintainer + let (removed, hot_events) = + index.invalidate_and_get_events(&event1.pubkey, "repo1"); + + assert_eq!(removed, 1); + assert_eq!(hot_events.len(), 1); + assert_eq!(hot_events[0].id, event1.id); + + // Second maintainer still in index + assert_eq!(index.cold_index_len(), 1); + assert!(index.contains(&event2.id)); + } +} -- cgit v1.2.3