diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 15:38:58 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 15:38:58 +0000 |
| commit | e4cfecbfc909c9ca4983101cf6a5855959a5d49f (patch) | |
| tree | d56e91539c4c8a283905a651caf4c95cb130a2a1 /src/sync | |
| parent | 29be4cb7d0fbd29325c995a76ba1b1f47beecca5 (diff) | |
feat: Add two-tier rejected events index
Implements a sophisticated two-tier storage system for rejected repository
announcements to enable immediate re-processing when dependencies resolve.
## Architecture
**Tier 1: Hot Cache (2 minutes)**
- Stores full event objects for immediate re-processing
- Enables <1 second re-processing vs 24 hour wait
- Auto-expires to prevent memory growth
- Memory: ~200 KB typical, ~20 MB worst case
**Tier 2: Cold Index (7 days)**
- Stores metadata only (event_id, pubkey, identifier)
- Prevents repeated downloads of rejected events
- Enables invalidation when circumstances change
- Memory: ~1 MB typical
## Problem Solved
Without this system, maintainer announcements face a timing gap:
00:00 - Maintainer announcement rejected → Event discarded
00:02 - Owner announcement accepted (lists maintainer) → Want to re-process
00:02 - ❌ Maintainer announcement GONE → Must wait 24h for next sync
With two-tier system:
00:00 - Maintainer announcement rejected → Stored in both tiers
00:02 - Owner announcement accepted → Invalidate + get from hot cache
00:02 - ✅ Re-process immediately → Accepted in <1 second
## Implementation
New module: src/sync/rejected_index.rs
- RejectedEventsIndex: Public API combining both tiers
- HotCache: Internal struct for full event storage
- ColdIndex: Internal struct for metadata storage
- RejectionReason: Enum for tracking why events were rejected
Key methods:
- add_announcement(): Add to both tiers
- contains(): Check if event is rejected
- invalidate_and_get_events(): Remove from cold index, get from hot cache
- cleanup_expired(): Remove expired entries from both tiers
## Testing
9 comprehensive unit tests covering:
- Hot cache storage and retrieval
- Hot cache expiration
- Cold index metadata tracking
- Cold index invalidation
- Two-tier integration
- Cleanup of expired entries
- Hot cache misses after expiry
- Multiple maintainer repositories
All tests passing.
## Next Steps
PR2: Switch SyncManager to use new RejectedEventsIndex
PR3: Add invalidation + immediate re-processing logic
PR4: Add cleanup task + Prometheus metrics
Part of: Maintainer chain discovery fix
See: work/SOLUTION-SUMMARY-V2.md for full design
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 17 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 647 |
2 files changed, 660 insertions, 4 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8b1da0e..55bea17 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -16,6 +16,7 @@ pub mod algorithms; | |||
| 16 | pub mod filters; | 16 | pub mod filters; |
| 17 | pub mod health; | 17 | pub mod health; |
| 18 | pub mod metrics; | 18 | pub mod metrics; |
| 19 | pub mod rejected_index; | ||
| 19 | pub mod relay_connection; | 20 | pub mod relay_connection; |
| 20 | pub mod self_subscriber; | 21 | pub mod self_subscriber; |
| 21 | 22 | ||
| @@ -25,6 +26,11 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; | |||
| 25 | // Re-export metrics types | 26 | // Re-export metrics types |
| 26 | pub use metrics::SyncMetrics; | 27 | pub use metrics::SyncMetrics; |
| 27 | 28 | ||
| 29 | // Re-export rejected index types | ||
| 30 | pub use rejected_index::{RejectionReason}; | ||
| 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used | ||
| 32 | // Current code still uses the simple HashSet type alias below | ||
| 33 | |||
| 28 | // Re-export relay connection types | 34 | // Re-export relay connection types |
| 29 | pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; | 35 | pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; |
| 30 | 36 | ||
| @@ -67,7 +73,10 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | |||
| 67 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. | 73 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. |
| 68 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing | 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing |
| 69 | /// to avoid repeatedly fetching and rejecting the same events. | 75 | /// to avoid repeatedly fetching and rejecting the same events. |
| 70 | pub type RejectedEventsIndex = Arc<RwLock<HashSet<EventId>>>; | 76 | /// |
| 77 | /// NOTE: This is a temporary simple implementation. PR2 will replace this with the | ||
| 78 | /// two-tier RejectedEventsIndex from rejected_index.rs (hot cache + cold index). | ||
| 79 | type RejectedEventsIndexSimple = Arc<RwLock<HashSet<EventId>>>; | ||
| 71 | 80 | ||
| 72 | // ============================================================================= | 81 | // ============================================================================= |
| 73 | // Supporting Data Structures | 82 | // Supporting Data Structures |
| @@ -436,7 +445,7 @@ pub struct SyncManager { | |||
| 436 | /// In-flight subscription batches | 445 | /// In-flight subscription batches |
| 437 | pending_sync_index: PendingSyncIndex, | 446 | pending_sync_index: PendingSyncIndex, |
| 438 | /// Rejected announcement event IDs (30617/30618) - excluded from sync | 447 | /// Rejected announcement event IDs (30617/30618) - excluded from sync |
| 439 | rejected_events_index: RejectedEventsIndex, | 448 | rejected_events_index: RejectedEventsIndexSimple, |
| 440 | /// Active relay connections - keyed by relay URL | 449 | /// Active relay connections - keyed by relay URL |
| 441 | connections: HashMap<String, RelayConnection>, | 450 | connections: HashMap<String, RelayConnection>, |
| 442 | /// Health tracker for relay connection state | 451 | /// Health tracker for relay connection state |
| @@ -1992,7 +2001,7 @@ impl SyncManager { | |||
| 1992 | database: &SharedDatabase, | 2001 | database: &SharedDatabase, |
| 1993 | write_policy: &Nip34WritePolicy, | 2002 | write_policy: &Nip34WritePolicy, |
| 1994 | local_relay: &LocalRelay, | 2003 | local_relay: &LocalRelay, |
| 1995 | rejected_events_index: &RejectedEventsIndex, | 2004 | rejected_events_index: &RejectedEventsIndexSimple, |
| 1996 | ) -> ProcessResult { | 2005 | ) -> ProcessResult { |
| 1997 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; | 2006 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; |
| 1998 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 2007 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |
| @@ -2880,7 +2889,7 @@ mod tests { | |||
| 2880 | #[tokio::test] | 2889 | #[tokio::test] |
| 2881 | async fn test_rejected_events_index_tracks_announcements() { | 2890 | async fn test_rejected_events_index_tracks_announcements() { |
| 2882 | // Create a rejected events index | 2891 | // Create a rejected events index |
| 2883 | let rejected_index: RejectedEventsIndex = Arc::new(RwLock::new(HashSet::new())); | 2892 | let rejected_index: RejectedEventsIndexSimple = Arc::new(RwLock::new(HashSet::new())); |
| 2884 | 2893 | ||
| 2885 | // Create test announcement event (kind 30617) | 2894 | // Create test announcement event (kind 30617) |
| 2886 | let keys = Keys::generate(); | 2895 | let keys = Keys::generate(); |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs new file mode 100644 index 0000000..f89783a --- /dev/null +++ b/src/sync/rejected_index.rs | |||
| @@ -0,0 +1,647 @@ | |||
| 1 | //! Two-tier rejected events index for efficient re-processing | ||
| 2 | //! | ||
| 3 | //! This module provides a two-tier storage system for rejected repository announcements: | ||
| 4 | //! | ||
| 5 | //! 1. **Hot Cache (Tier 1)**: Stores full event objects for 2 minutes | ||
| 6 | //! - Enables immediate re-processing when dependencies resolve | ||
| 7 | //! - Auto-expires to prevent memory growth | ||
| 8 | //! - Typical memory: ~200 KB, worst case: ~20 MB | ||
| 9 | //! | ||
| 10 | //! 2. **Cold Index (Tier 2)**: Stores metadata only for 7 days | ||
| 11 | //! - Prevents repeated downloads of rejected events | ||
| 12 | //! - Enables invalidation when dependencies change | ||
| 13 | //! - Typical memory: ~1 MB | ||
| 14 | //! | ||
| 15 | //! # Problem Solved | ||
| 16 | //! | ||
| 17 | //! Without this system, maintainer announcements face a timing gap: | ||
| 18 | //! | ||
| 19 | //! ```text | ||
| 20 | //! 00:00 - Maintainer announcement rejected → Event discarded | ||
| 21 | //! 00:02 - Owner announcement accepted (lists maintainer) → Want to re-process | ||
| 22 | //! 00:02 - ❌ Maintainer announcement GONE → Must wait 24h for next sync | ||
| 23 | //! ``` | ||
| 24 | //! | ||
| 25 | //! With the two-tier system: | ||
| 26 | //! | ||
| 27 | //! ```text | ||
| 28 | //! 00:00 - Maintainer announcement rejected → Stored in hot cache + cold index | ||
| 29 | //! 00:02 - Owner announcement accepted → Invalidate + get from hot cache | ||
| 30 | //! 00:02 - ✅ Re-process immediately → Accepted in <1 second | ||
| 31 | //! ``` | ||
| 32 | //! | ||
| 33 | //! # Architecture | ||
| 34 | //! | ||
| 35 | //! ```text | ||
| 36 | //! ┌─────────────────────────────────────────────────────────────┐ | ||
| 37 | //! │ Tier 1: Hot Cache (2 minutes) │ | ||
| 38 | //! │ - Stores FULL EVENT objects │ | ||
| 39 | //! │ - Enables IMMEDIATE re-processing │ | ||
| 40 | //! │ - Auto-expires after 2 minutes │ | ||
| 41 | //! │ - Memory: ~200 KB typical, ~20 MB worst case │ | ||
| 42 | //! └─────────────────────────────────────────────────────────────┘ | ||
| 43 | //! │ | ||
| 44 | //! │ After 2 minutes | ||
| 45 | //! ▼ | ||
| 46 | //! ┌─────────────────────────────────────────────────────────────┐ | ||
| 47 | //! │ Tier 2: Cold Index (7 days) │ | ||
| 48 | //! │ - Stores METADATA only (event_id, pubkey, identifier) │ | ||
| 49 | //! │ - Prevents repeated downloads │ | ||
| 50 | //! │ - Enables invalidation │ | ||
| 51 | //! │ - Memory: ~1 MB typical │ | ||
| 52 | //! └─────────────────────────────────────────────────────────────┘ | ||
| 53 | //! ``` | ||
| 54 | //! | ||
| 55 | //! # Usage | ||
| 56 | //! | ||
| 57 | //! ```rust,no_run | ||
| 58 | //! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; | ||
| 59 | //! use nostr_sdk::Event; | ||
| 60 | //! use std::time::Duration; | ||
| 61 | //! | ||
| 62 | //! let index = RejectedEventsIndex::new( | ||
| 63 | //! Duration::from_secs(120), // hot cache: 2 minutes | ||
| 64 | //! Duration::from_secs(604800), // cold index: 7 days | ||
| 65 | //! ); | ||
| 66 | //! | ||
| 67 | //! // Add rejected announcement | ||
| 68 | //! index.add_announcement( | ||
| 69 | //! event.clone(), | ||
| 70 | //! event.pubkey, | ||
| 71 | //! "my-repo".to_string(), | ||
| 72 | //! RejectionReason::DoesNotListService, | ||
| 73 | //! ); | ||
| 74 | //! | ||
| 75 | //! // Later, when owner announcement accepted... | ||
| 76 | //! let (removed, hot_events) = index.invalidate_and_get_events( | ||
| 77 | //! &maintainer_pubkey, | ||
| 78 | //! "my-repo", | ||
| 79 | //! ); | ||
| 80 | //! | ||
| 81 | //! // Re-process events from hot cache immediately | ||
| 82 | //! for event in hot_events { | ||
| 83 | //! process_event(&event).await; | ||
| 84 | //! } | ||
| 85 | //! ``` | ||
| 86 | |||
| 87 | use nostr_sdk::{Event, EventId, PublicKey}; | ||
| 88 | use std::collections::HashMap; | ||
| 89 | use std::sync::{Arc, RwLock}; | ||
| 90 | use std::time::{Duration, Instant}; | ||
| 91 | |||
| 92 | /// Reason why a repository announcement was rejected | ||
| 93 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 94 | pub enum RejectionReason { | ||
| 95 | /// Announcement doesn't list this service in clone/web URLs | ||
| 96 | DoesNotListService, | ||
| 97 | /// Maintainer announcement rejected (owner not yet accepted) | ||
| 98 | MaintainerNotYetValid, | ||
| 99 | /// Other validation failure | ||
| 100 | Other, | ||
| 101 | } | ||
| 102 | |||
| 103 | impl std::fmt::Display for RejectionReason { | ||
| 104 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 105 | match self { | ||
| 106 | Self::DoesNotListService => write!(f, "does_not_list_service"), | ||
| 107 | Self::MaintainerNotYetValid => write!(f, "maintainer_not_yet_valid"), | ||
| 108 | Self::Other => write!(f, "other"), | ||
| 109 | } | ||
| 110 | } | ||
| 111 | } | ||
| 112 | |||
| 113 | /// Entry in the hot cache (full event) | ||
| 114 | #[derive(Debug, Clone)] | ||
| 115 | struct HotCacheEntry { | ||
| 116 | event: Event, | ||
| 117 | pubkey: PublicKey, | ||
| 118 | identifier: String, | ||
| 119 | reason: RejectionReason, | ||
| 120 | cached_at: Instant, | ||
| 121 | } | ||
| 122 | |||
| 123 | /// Entry in the cold index (metadata only) | ||
| 124 | #[derive(Debug, Clone)] | ||
| 125 | struct ColdIndexEntry { | ||
| 126 | event_id: EventId, | ||
| 127 | pubkey: PublicKey, | ||
| 128 | identifier: String, | ||
| 129 | reason: RejectionReason, | ||
| 130 | rejected_at: Instant, | ||
| 131 | } | ||
| 132 | |||
| 133 | /// Hot cache: Stores full events for immediate re-processing | ||
| 134 | /// | ||
| 135 | /// Events are stored for a short duration (default: 2 minutes) to enable | ||
| 136 | /// immediate re-processing when dependencies resolve. After expiry, events | ||
| 137 | /// are dropped from the hot cache but remain in the cold index. | ||
| 138 | #[derive(Debug, Clone)] | ||
| 139 | struct HotCache { | ||
| 140 | /// Map of event_id -> full event entry | ||
| 141 | entries: Arc<RwLock<HashMap<EventId, HotCacheEntry>>>, | ||
| 142 | /// Duration before entries expire | ||
| 143 | expiry_duration: Duration, | ||
| 144 | } | ||
| 145 | |||
| 146 | impl HotCache { | ||
| 147 | fn new(expiry_duration: Duration) -> Self { | ||
| 148 | Self { | ||
| 149 | entries: Arc::new(RwLock::new(HashMap::new())), | ||
| 150 | expiry_duration, | ||
| 151 | } | ||
| 152 | } | ||
| 153 | |||
| 154 | /// Add event to hot cache | ||
| 155 | fn add(&self, event: Event, pubkey: PublicKey, identifier: String, reason: RejectionReason) { | ||
| 156 | let entry = HotCacheEntry { | ||
| 157 | event, | ||
| 158 | pubkey, | ||
| 159 | identifier, | ||
| 160 | reason, | ||
| 161 | cached_at: Instant::now(), | ||
| 162 | }; | ||
| 163 | |||
| 164 | self.entries.write().unwrap().insert(entry.event.id, entry); | ||
| 165 | } | ||
| 166 | |||
| 167 | /// Get events for a specific maintainer/identifier from hot cache | ||
| 168 | fn get_maintainer_events(&self, pubkey: &PublicKey, identifier: &str) -> Vec<Event> { | ||
| 169 | let entries = self.entries.read().unwrap(); | ||
| 170 | let now = Instant::now(); | ||
| 171 | |||
| 172 | entries | ||
| 173 | .values() | ||
| 174 | .filter(|entry| { | ||
| 175 | // Check if entry matches and hasn't expired | ||
| 176 | entry.pubkey == *pubkey | ||
| 177 | && entry.identifier == identifier | ||
| 178 | && now.duration_since(entry.cached_at) < self.expiry_duration | ||
| 179 | }) | ||
| 180 | .map(|entry| entry.event.clone()) | ||
| 181 | .collect() | ||
| 182 | } | ||
| 183 | |||
| 184 | /// Remove expired entries from hot cache | ||
| 185 | fn cleanup_expired(&self) -> usize { | ||
| 186 | let mut entries = self.entries.write().unwrap(); | ||
| 187 | let now = Instant::now(); | ||
| 188 | let initial_count = entries.len(); | ||
| 189 | |||
| 190 | entries.retain(|_, entry| { | ||
| 191 | now.duration_since(entry.cached_at) < self.expiry_duration | ||
| 192 | }); | ||
| 193 | |||
| 194 | initial_count - entries.len() | ||
| 195 | } | ||
| 196 | |||
| 197 | /// Get current number of entries in hot cache | ||
| 198 | fn len(&self) -> usize { | ||
| 199 | self.entries.read().unwrap().len() | ||
| 200 | } | ||
| 201 | |||
| 202 | /// Check if event is in hot cache | ||
| 203 | fn contains(&self, event_id: &EventId) -> bool { | ||
| 204 | self.entries.read().unwrap().contains_key(event_id) | ||
| 205 | } | ||
| 206 | } | ||
| 207 | |||
| 208 | /// Cold index: Stores metadata only for long-term deduplication | ||
| 209 | /// | ||
| 210 | /// Events are stored for a long duration (default: 7 days) to prevent | ||
| 211 | /// repeated downloads of rejected events. Only metadata is stored to | ||
| 212 | /// minimize memory usage. | ||
| 213 | #[derive(Debug, Clone)] | ||
| 214 | struct ColdIndex { | ||
| 215 | /// Map of event_id -> metadata entry | ||
| 216 | entries: Arc<RwLock<HashMap<EventId, ColdIndexEntry>>>, | ||
| 217 | /// Duration before entries expire | ||
| 218 | expiry_duration: Duration, | ||
| 219 | } | ||
| 220 | |||
| 221 | impl ColdIndex { | ||
| 222 | fn new(expiry_duration: Duration) -> Self { | ||
| 223 | Self { | ||
| 224 | entries: Arc::new(RwLock::new(HashMap::new())), | ||
| 225 | expiry_duration, | ||
| 226 | } | ||
| 227 | } | ||
| 228 | |||
| 229 | /// Add metadata to cold index | ||
| 230 | fn add( | ||
| 231 | &self, | ||
| 232 | event_id: EventId, | ||
| 233 | pubkey: PublicKey, | ||
| 234 | identifier: String, | ||
| 235 | reason: RejectionReason, | ||
| 236 | ) { | ||
| 237 | let entry = ColdIndexEntry { | ||
| 238 | event_id, | ||
| 239 | pubkey, | ||
| 240 | identifier, | ||
| 241 | reason, | ||
| 242 | rejected_at: Instant::now(), | ||
| 243 | }; | ||
| 244 | |||
| 245 | self.entries.write().unwrap().insert(event_id, entry); | ||
| 246 | } | ||
| 247 | |||
| 248 | /// Check if event is in cold index | ||
| 249 | fn contains(&self, event_id: &EventId) -> bool { | ||
| 250 | let entries = self.entries.read().unwrap(); | ||
| 251 | if let Some(entry) = entries.get(event_id) { | ||
| 252 | let now = Instant::now(); | ||
| 253 | now.duration_since(entry.rejected_at) < self.expiry_duration | ||
| 254 | } else { | ||
| 255 | false | ||
| 256 | } | ||
| 257 | } | ||
| 258 | |||
| 259 | /// Invalidate (remove) maintainer announcements from cold index | ||
| 260 | /// | ||
| 261 | /// Called when an owner announcement is accepted that lists this maintainer. | ||
| 262 | /// Removes the cold index entries so they can be re-fetched on next sync. | ||
| 263 | fn invalidate_maintainer_announcements( | ||
| 264 | &self, | ||
| 265 | maintainer_pubkey: &PublicKey, | ||
| 266 | identifier: &str, | ||
| 267 | ) -> usize { | ||
| 268 | let mut entries = self.entries.write().unwrap(); | ||
| 269 | let initial_count = entries.len(); | ||
| 270 | |||
| 271 | entries.retain(|_, entry| { | ||
| 272 | // Keep entries that DON'T match the maintainer/identifier | ||
| 273 | !(entry.pubkey == *maintainer_pubkey && entry.identifier == identifier) | ||
| 274 | }); | ||
| 275 | |||
| 276 | initial_count - entries.len() | ||
| 277 | } | ||
| 278 | |||
| 279 | /// Remove expired entries from cold index | ||
| 280 | fn cleanup_expired(&self) -> usize { | ||
| 281 | let mut entries = self.entries.write().unwrap(); | ||
| 282 | let now = Instant::now(); | ||
| 283 | let initial_count = entries.len(); | ||
| 284 | |||
| 285 | entries.retain(|_, entry| { | ||
| 286 | now.duration_since(entry.rejected_at) < self.expiry_duration | ||
| 287 | }); | ||
| 288 | |||
| 289 | initial_count - entries.len() | ||
| 290 | } | ||
| 291 | |||
| 292 | /// Get current number of entries in cold index | ||
| 293 | fn len(&self) -> usize { | ||
| 294 | self.entries.read().unwrap().len() | ||
| 295 | } | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Two-tier rejected events index | ||
| 299 | /// | ||
| 300 | /// Combines hot cache (full events, short duration) with cold index | ||
| 301 | /// (metadata only, long duration) for efficient re-processing and deduplication. | ||
| 302 | #[derive(Debug, Clone)] | ||
| 303 | pub struct RejectedEventsIndex { | ||
| 304 | hot_cache: HotCache, | ||
| 305 | cold_index: ColdIndex, | ||
| 306 | } | ||
| 307 | |||
| 308 | impl RejectedEventsIndex { | ||
| 309 | /// Create new rejected events index | ||
| 310 | /// | ||
| 311 | /// # Arguments | ||
| 312 | /// | ||
| 313 | /// * `hot_cache_duration` - How long to keep full events in hot cache (default: 2 minutes) | ||
| 314 | /// * `cold_index_duration` - How long to keep metadata in cold index (default: 7 days) | ||
| 315 | pub fn new(hot_cache_duration: Duration, cold_index_duration: Duration) -> Self { | ||
| 316 | Self { | ||
| 317 | hot_cache: HotCache::new(hot_cache_duration), | ||
| 318 | cold_index: ColdIndex::new(cold_index_duration), | ||
| 319 | } | ||
| 320 | } | ||
| 321 | |||
| 322 | /// Add rejected announcement to both tiers | ||
| 323 | /// | ||
| 324 | /// # Arguments | ||
| 325 | /// | ||
| 326 | /// * `event` - Full event object (stored in hot cache) | ||
| 327 | /// * `pubkey` - Author's public key | ||
| 328 | /// * `identifier` - Repository identifier (d tag) | ||
| 329 | /// * `reason` - Why the announcement was rejected | ||
| 330 | pub fn add_announcement( | ||
| 331 | &self, | ||
| 332 | event: Event, | ||
| 333 | pubkey: PublicKey, | ||
| 334 | identifier: String, | ||
| 335 | reason: RejectionReason, | ||
| 336 | ) { | ||
| 337 | // Add to hot cache (full event) | ||
| 338 | self.hot_cache.add( | ||
| 339 | event.clone(), | ||
| 340 | pubkey, | ||
| 341 | identifier.clone(), | ||
| 342 | reason, | ||
| 343 | ); | ||
| 344 | |||
| 345 | // Add to cold index (metadata only) | ||
| 346 | self.cold_index.add(event.id, pubkey, identifier, reason); | ||
| 347 | } | ||
| 348 | |||
| 349 | /// Check if event is already rejected (in either tier) | ||
| 350 | pub fn contains(&self, event_id: &EventId) -> bool { | ||
| 351 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) | ||
| 352 | } | ||
| 353 | |||
| 354 | /// Invalidate maintainer announcements and get events for immediate re-processing | ||
| 355 | /// | ||
| 356 | /// This is called when an owner announcement is accepted that lists a maintainer. | ||
| 357 | /// It removes the cold index entries (so they can be re-fetched on next sync) and | ||
| 358 | /// returns any events still in the hot cache for immediate re-processing. | ||
| 359 | /// | ||
| 360 | /// # Returns | ||
| 361 | /// | ||
| 362 | /// Tuple of (number of cold index entries removed, events from hot cache) | ||
| 363 | pub fn invalidate_and_get_events( | ||
| 364 | &self, | ||
| 365 | maintainer_pubkey: &PublicKey, | ||
| 366 | identifier: &str, | ||
| 367 | ) -> (usize, Vec<Event>) { | ||
| 368 | // Remove from cold index (prevents re-fetch) | ||
| 369 | let removed = self | ||
| 370 | .cold_index | ||
| 371 | .invalidate_maintainer_announcements(maintainer_pubkey, identifier); | ||
| 372 | |||
| 373 | // Get from hot cache (for immediate re-processing) | ||
| 374 | let events = self | ||
| 375 | .hot_cache | ||
| 376 | .get_maintainer_events(maintainer_pubkey, identifier); | ||
| 377 | |||
| 378 | (removed, events) | ||
| 379 | } | ||
| 380 | |||
| 381 | /// Clean up expired entries from both tiers | ||
| 382 | /// | ||
| 383 | /// Returns tuple of (hot cache expired, cold index expired) | ||
| 384 | pub fn cleanup_expired(&self) -> (usize, usize) { | ||
| 385 | let hot_expired = self.hot_cache.cleanup_expired(); | ||
| 386 | let cold_expired = self.cold_index.cleanup_expired(); | ||
| 387 | (hot_expired, cold_expired) | ||
| 388 | } | ||
| 389 | |||
| 390 | /// Get current number of entries in hot cache | ||
| 391 | pub fn hot_cache_len(&self) -> usize { | ||
| 392 | self.hot_cache.len() | ||
| 393 | } | ||
| 394 | |||
| 395 | /// Get current number of entries in cold index | ||
| 396 | pub fn cold_index_len(&self) -> usize { | ||
| 397 | self.cold_index.len() | ||
| 398 | } | ||
| 399 | } | ||
| 400 | |||
| 401 | #[cfg(test)] | ||
| 402 | mod tests { | ||
| 403 | use super::*; | ||
| 404 | use nostr_sdk::{Keys, NostrSigner}; | ||
| 405 | |||
| 406 | async fn create_test_event() -> Event { | ||
| 407 | let keys = Keys::generate(); | ||
| 408 | let unsigned = nostr_sdk::EventBuilder::text_note("test") | ||
| 409 | .build(keys.public_key()); | ||
| 410 | keys.sign_event(unsigned).await.unwrap() | ||
| 411 | } | ||
| 412 | |||
| 413 | #[tokio::test] | ||
| 414 | async fn test_hot_cache_stores_and_retrieves_events() { | ||
| 415 | let cache = HotCache::new(Duration::from_secs(120)); | ||
| 416 | let event = create_test_event().await; | ||
| 417 | let pubkey = event.pubkey; | ||
| 418 | let identifier = "test-repo".to_string(); | ||
| 419 | |||
| 420 | cache.add( | ||
| 421 | event.clone(), | ||
| 422 | pubkey, | ||
| 423 | identifier.clone(), | ||
| 424 | RejectionReason::DoesNotListService, | ||
| 425 | ); | ||
| 426 | |||
| 427 | assert!(cache.contains(&event.id)); | ||
| 428 | |||
| 429 | let retrieved = cache.get_maintainer_events(&pubkey, &identifier); | ||
| 430 | assert_eq!(retrieved.len(), 1); | ||
| 431 | assert_eq!(retrieved[0].id, event.id); | ||
| 432 | } | ||
| 433 | |||
| 434 | #[tokio::test] | ||
| 435 | async fn test_hot_cache_expires_after_duration() { | ||
| 436 | let cache = HotCache::new(Duration::from_millis(50)); | ||
| 437 | let event = create_test_event().await; | ||
| 438 | |||
| 439 | cache.add( | ||
| 440 | event.clone(), | ||
| 441 | event.pubkey, | ||
| 442 | "test-repo".to_string(), | ||
| 443 | RejectionReason::DoesNotListService, | ||
| 444 | ); | ||
| 445 | |||
| 446 | assert!(cache.contains(&event.id)); | ||
| 447 | |||
| 448 | // Wait for expiry | ||
| 449 | std::thread::sleep(Duration::from_millis(60)); | ||
| 450 | |||
| 451 | let expired = cache.cleanup_expired(); | ||
| 452 | assert_eq!(expired, 1); | ||
| 453 | assert!(!cache.contains(&event.id)); | ||
| 454 | } | ||
| 455 | |||
| 456 | #[tokio::test] | ||
| 457 | async fn test_cold_index_tracks_metadata() { | ||
| 458 | let index = ColdIndex::new(Duration::from_secs(604800)); | ||
| 459 | let event = create_test_event().await; | ||
| 460 | |||
| 461 | index.add( | ||
| 462 | event.id, | ||
| 463 | event.pubkey, | ||
| 464 | "test-repo".to_string(), | ||
| 465 | RejectionReason::DoesNotListService, | ||
| 466 | ); | ||
| 467 | |||
| 468 | assert!(index.contains(&event.id)); | ||
| 469 | assert_eq!(index.len(), 1); | ||
| 470 | } | ||
| 471 | |||
| 472 | #[tokio::test] | ||
| 473 | async fn test_cold_index_invalidation() { | ||
| 474 | let index = ColdIndex::new(Duration::from_secs(604800)); | ||
| 475 | let event = create_test_event().await; | ||
| 476 | let pubkey = event.pubkey; | ||
| 477 | let identifier = "test-repo".to_string(); | ||
| 478 | |||
| 479 | index.add( | ||
| 480 | event.id, | ||
| 481 | pubkey, | ||
| 482 | identifier.clone(), | ||
| 483 | RejectionReason::MaintainerNotYetValid, | ||
| 484 | ); | ||
| 485 | |||
| 486 | assert!(index.contains(&event.id)); | ||
| 487 | |||
| 488 | let removed = index.invalidate_maintainer_announcements(&pubkey, &identifier); | ||
| 489 | assert_eq!(removed, 1); | ||
| 490 | assert!(!index.contains(&event.id)); | ||
| 491 | } | ||
| 492 | |||
| 493 | #[tokio::test] | ||
| 494 | async fn test_two_tier_index_add_and_contains() { | ||
| 495 | let index = RejectedEventsIndex::new( | ||
| 496 | Duration::from_secs(120), | ||
| 497 | Duration::from_secs(604800), | ||
| 498 | ); | ||
| 499 | let event = create_test_event().await; | ||
| 500 | |||
| 501 | index.add_announcement( | ||
| 502 | event.clone(), | ||
| 503 | event.pubkey, | ||
| 504 | "test-repo".to_string(), | ||
| 505 | RejectionReason::DoesNotListService, | ||
| 506 | ); | ||
| 507 | |||
| 508 | assert!(index.contains(&event.id)); | ||
| 509 | assert_eq!(index.hot_cache_len(), 1); | ||
| 510 | assert_eq!(index.cold_index_len(), 1); | ||
| 511 | } | ||
| 512 | |||
| 513 | #[tokio::test] | ||
| 514 | async fn test_invalidate_and_get_events() { | ||
| 515 | let index = RejectedEventsIndex::new( | ||
| 516 | Duration::from_secs(120), | ||
| 517 | Duration::from_secs(604800), | ||
| 518 | ); | ||
| 519 | let event = create_test_event().await; | ||
| 520 | let pubkey = event.pubkey; | ||
| 521 | let identifier = "test-repo".to_string(); | ||
| 522 | |||
| 523 | index.add_announcement( | ||
| 524 | event.clone(), | ||
| 525 | pubkey, | ||
| 526 | identifier.clone(), | ||
| 527 | RejectionReason::MaintainerNotYetValid, | ||
| 528 | ); | ||
| 529 | |||
| 530 | let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); | ||
| 531 | |||
| 532 | assert_eq!(removed, 1); // Removed from cold index | ||
| 533 | assert_eq!(hot_events.len(), 1); // Retrieved from hot cache | ||
| 534 | assert_eq!(hot_events[0].id, event.id); | ||
| 535 | |||
| 536 | // Cold index entry removed, hot cache still has it | ||
| 537 | assert_eq!(index.cold_index_len(), 0); | ||
| 538 | assert_eq!(index.hot_cache_len(), 1); | ||
| 539 | } | ||
| 540 | |||
| 541 | #[tokio::test] | ||
| 542 | async fn test_cleanup_expired_both_tiers() { | ||
| 543 | let index = RejectedEventsIndex::new( | ||
| 544 | Duration::from_millis(50), // Hot cache expires quickly | ||
| 545 | Duration::from_millis(100), // Cold index expires slower | ||
| 546 | ); | ||
| 547 | let event = create_test_event().await; | ||
| 548 | |||
| 549 | index.add_announcement( | ||
| 550 | event.clone(), | ||
| 551 | event.pubkey, | ||
| 552 | "test-repo".to_string(), | ||
| 553 | RejectionReason::DoesNotListService, | ||
| 554 | ); | ||
| 555 | |||
| 556 | // Wait for hot cache to expire | ||
| 557 | std::thread::sleep(Duration::from_millis(60)); | ||
| 558 | |||
| 559 | let (hot_expired, cold_expired) = index.cleanup_expired(); | ||
| 560 | assert_eq!(hot_expired, 1); | ||
| 561 | assert_eq!(cold_expired, 0); // Not expired yet | ||
| 562 | |||
| 563 | // Wait for cold index to expire | ||
| 564 | std::thread::sleep(Duration::from_millis(50)); | ||
| 565 | |||
| 566 | let (hot_expired, cold_expired) = index.cleanup_expired(); | ||
| 567 | assert_eq!(hot_expired, 0); // Already cleaned up | ||
| 568 | assert_eq!(cold_expired, 1); | ||
| 569 | } | ||
| 570 | |||
| 571 | #[tokio::test] | ||
| 572 | async fn test_hot_cache_miss_after_expiry() { | ||
| 573 | let index = RejectedEventsIndex::new( | ||
| 574 | Duration::from_millis(50), | ||
| 575 | Duration::from_secs(604800), | ||
| 576 | ); | ||
| 577 | let event = create_test_event().await; | ||
| 578 | let pubkey = event.pubkey; | ||
| 579 | let identifier = "test-repo".to_string(); | ||
| 580 | |||
| 581 | index.add_announcement( | ||
| 582 | event.clone(), | ||
| 583 | pubkey, | ||
| 584 | identifier.clone(), | ||
| 585 | RejectionReason::MaintainerNotYetValid, | ||
| 586 | ); | ||
| 587 | |||
| 588 | // Wait for hot cache to expire | ||
| 589 | std::thread::sleep(Duration::from_millis(60)); | ||
| 590 | |||
| 591 | let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); | ||
| 592 | |||
| 593 | assert_eq!(removed, 1); // Removed from cold index | ||
| 594 | assert_eq!(hot_events.len(), 0); // Hot cache expired - miss! | ||
| 595 | |||
| 596 | // This is expected: events arrive >2 minutes apart, must wait for next sync | ||
| 597 | } | ||
| 598 | |||
| 599 | #[tokio::test] | ||
| 600 | async fn test_multiple_maintainer_repos() { | ||
| 601 | let index = RejectedEventsIndex::new( | ||
| 602 | Duration::from_secs(120), | ||
| 603 | Duration::from_secs(604800), | ||
| 604 | ); | ||
| 605 | |||
| 606 | let keys1 = Keys::generate(); | ||
| 607 | let keys2 = Keys::generate(); | ||
| 608 | |||
| 609 | let unsigned1 = nostr_sdk::EventBuilder::text_note("test1") | ||
| 610 | .build(keys1.public_key()); | ||
| 611 | let event1 = keys1.sign_event(unsigned1).await.unwrap(); | ||
| 612 | |||
| 613 | let unsigned2 = nostr_sdk::EventBuilder::text_note("test2") | ||
| 614 | .build(keys2.public_key()); | ||
| 615 | let event2 = keys2.sign_event(unsigned2).await.unwrap(); | ||
| 616 | |||
| 617 | // Add two different maintainer repos | ||
| 618 | index.add_announcement( | ||
| 619 | event1.clone(), | ||
| 620 | event1.pubkey, | ||
| 621 | "repo1".to_string(), | ||
| 622 | RejectionReason::MaintainerNotYetValid, | ||
| 623 | ); | ||
| 624 | |||
| 625 | index.add_announcement( | ||
| 626 | event2.clone(), | ||
| 627 | event2.pubkey, | ||
| 628 | "repo2".to_string(), | ||
| 629 | RejectionReason::MaintainerNotYetValid, | ||
| 630 | ); | ||
| 631 | |||
| 632 | assert_eq!(index.hot_cache_len(), 2); | ||
| 633 | assert_eq!(index.cold_index_len(), 2); | ||
| 634 | |||
| 635 | // Invalidate only first maintainer | ||
| 636 | let (removed, hot_events) = | ||
| 637 | index.invalidate_and_get_events(&event1.pubkey, "repo1"); | ||
| 638 | |||
| 639 | assert_eq!(removed, 1); | ||
| 640 | assert_eq!(hot_events.len(), 1); | ||
| 641 | assert_eq!(hot_events[0].id, event1.id); | ||
| 642 | |||
| 643 | // Second maintainer still in index | ||
| 644 | assert_eq!(index.cold_index_len(), 1); | ||
| 645 | assert!(index.contains(&event2.id)); | ||
| 646 | } | ||
| 647 | } | ||