From 9bd58faad6be254f0221820fa5e8516b8b15e19d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 20:51:49 +0000 Subject: refactor(sync): add EventType enum and unify rejected index methods Add EventType enum (Announcement, State) to distinguish event types within RejectedEventsIndex. This consolidates the two-tier index design into a single unified interface. Changes: - Add EventType enum with Announcement and State variants - Add event_type field to HotCacheEntry and ColdIndexEntry - Create unified invalidate_and_get() with optional event_type filter - Update cleanup_expired_for_type() to handle both types - Remove deprecated wrapper methods (invalidate_and_get_events, invalidate_and_get_state_events, cleanup_expired, cleanup_states_expired) Consolidates phases 2, 3, and 7 of rejected events index refactoring. --- src/sync/rejected_index.rs | 382 ++++++++++++++++++++++++++++++--------------- 1 file changed, 259 insertions(+), 123 deletions(-) (limited to 'src/sync') diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 403792a..4d31901 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs @@ -55,7 +55,7 @@ //! # Usage //! //! ```rust,ignore -//! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; +//! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason, EventType}; //! use nostr_sdk::{Event, PublicKey}; //! use std::time::Duration; //! @@ -73,9 +73,10 @@ //! ); //! //! // Later, when owner announcement accepted... -//! let (removed, hot_events) = index.invalidate_and_get_events( +//! let (removed, hot_events) = index.invalidate_and_get( //! &maintainer_pubkey, //! "my-repo", +//! Some(EventType::Announcement), //! ); //! //! // Re-process events from hot cache immediately @@ -89,6 +90,24 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +/// Type of event stored in the rejected events index +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EventType { + /// Repository announcement (kind 30617) + Announcement, + /// Repository state event (kind 30618) + State, +} + +impl std::fmt::Display for EventType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Announcement => write!(f, "announcement"), + Self::State => write!(f, "state"), + } + } +} + /// Reason why a repository announcement was rejected #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RejectionReason { @@ -116,6 +135,7 @@ struct HotCacheEntry { event: Event, pubkey: PublicKey, identifier: String, + event_type: EventType, #[allow(dead_code)] // Used for metrics/debugging in future reason: RejectionReason, cached_at: Instant, @@ -128,6 +148,7 @@ struct HotCacheEntry { struct ColdIndexEntry { pubkey: PublicKey, identifier: String, + event_type: EventType, #[allow(dead_code)] // Used for metrics/debugging in future reason: RejectionReason, rejected_at: Instant, @@ -155,11 +176,19 @@ impl HotCache { } /// Add event to hot cache - fn add(&self, event: Event, pubkey: PublicKey, identifier: String, reason: RejectionReason) { + fn add( + &self, + event: Event, + pubkey: PublicKey, + identifier: String, + event_type: EventType, + reason: RejectionReason, + ) { let entry = HotCacheEntry { event, pubkey, identifier, + event_type, reason, cached_at: Instant::now(), }; @@ -168,7 +197,15 @@ impl HotCache { } /// Get events for a specific maintainer/identifier from hot cache - fn get_maintainer_events(&self, pubkey: &PublicKey, identifier: &str) -> Vec { + /// + /// If `event_type` is `Some`, only returns events of that type. + /// If `event_type` is `None`, returns all event types. + fn get_maintainer_events( + &self, + pubkey: &PublicKey, + identifier: &str, + event_type: Option, + ) -> Vec { let entries = self.entries.read().unwrap(); let now = Instant::now(); @@ -176,8 +213,10 @@ impl HotCache { .values() .filter(|entry| { // Check if entry matches and hasn't expired + let matches_type = event_type.is_none_or(|et| entry.event_type == et); entry.pubkey == *pubkey && entry.identifier == identifier + && matches_type && now.duration_since(entry.cached_at) < self.expiry_duration }) .map(|entry| entry.event.clone()) @@ -233,11 +272,13 @@ impl ColdIndex { event_id: EventId, pubkey: PublicKey, identifier: String, + event_type: EventType, reason: RejectionReason, ) { let entry = ColdIndexEntry { pubkey, identifier, + event_type, reason, rejected_at: Instant::now(), }; @@ -256,21 +297,26 @@ impl ColdIndex { } } - /// Invalidate (remove) maintainer announcements from cold index + /// Invalidate (remove) entries from cold index /// /// Called when an owner announcement is accepted that lists this maintainer. /// Removes the cold index entries so they can be re-fetched on next sync. + /// + /// If `event_type` is `Some`, only removes entries of that type. + /// If `event_type` is `None`, removes all event types matching pubkey/identifier. fn invalidate_maintainer_announcements( &self, maintainer_pubkey: &PublicKey, identifier: &str, + event_type: Option, ) -> usize { let mut entries = self.entries.write().unwrap(); let initial_count = entries.len(); entries.retain(|_, entry| { - // Keep entries that DON'T match the maintainer/identifier - !(entry.pubkey == *maintainer_pubkey && entry.identifier == identifier) + // Keep entries that DON'T match the maintainer/identifier/type + let matches_type = event_type.is_none_or(|et| entry.event_type == et); + !(entry.pubkey == *maintainer_pubkey && entry.identifier == identifier && matches_type) }); initial_count - entries.len() @@ -348,24 +394,21 @@ impl RejectedEventsIndex { metrics: Some(metrics), }; - // Initialize metrics with current sizes - index.update_metrics(); + // Initialize metrics with current sizes for both event types + index.update_metrics_for_type("announcement"); + index.update_metrics_for_type("state"); index } - /// Update metrics with current sizes (for announcements) - fn update_metrics(&self) { - if let Some(ref metrics) = self.metrics { - metrics.update_hot_cache_size(self.hot_cache.len()); - metrics.update_cold_index_size(self.cold_index.len()); - } - } - - /// Update metrics with current sizes (for states) - fn update_states_metrics(&self) { + /// Update metrics with current sizes for a specific event type + /// + /// # Arguments + /// + /// * `event_type` - The event type label ("announcement" or "state") + fn update_metrics_for_type(&self, event_type: &str) { if let Some(ref metrics) = self.metrics { - metrics.update_states_hot_cache_size(self.hot_cache.len()); - metrics.update_states_cold_index_size(self.cold_index.len()); + metrics.update_rejected_hot_cache_size(event_type, self.hot_cache.len()); + metrics.update_rejected_cold_index_size(event_type, self.cold_index.len()); } } @@ -385,14 +428,25 @@ impl RejectedEventsIndex { reason: RejectionReason, ) { // Add to hot cache (full event) - self.hot_cache - .add(event.clone(), pubkey, identifier.clone(), reason); + self.hot_cache.add( + event.clone(), + pubkey, + identifier.clone(), + EventType::Announcement, + reason, + ); // Add to cold index (metadata only) - self.cold_index.add(event.id, pubkey, identifier, reason); + self.cold_index.add( + event.id, + pubkey, + identifier, + EventType::Announcement, + reason, + ); // Update metrics - self.update_metrics(); + self.update_metrics_for_type("announcement"); } /// Add rejected state event to both tiers @@ -411,14 +465,20 @@ impl RejectedEventsIndex { reason: RejectionReason, ) { // Add to hot cache (full event) - self.hot_cache - .add(event.clone(), pubkey, identifier.clone(), reason); + self.hot_cache.add( + event.clone(), + pubkey, + identifier.clone(), + EventType::State, + reason, + ); // Add to cold index (metadata only) - self.cold_index.add(event.id, pubkey, identifier, reason); + self.cold_index + .add(event.id, pubkey, identifier, EventType::State, reason); - // Update metrics (using states metrics) - self.update_states_metrics(); + // Update metrics + self.update_metrics_for_type("state"); } /// Check if event is already rejected (in either tier) @@ -426,136 +486,92 @@ impl RejectedEventsIndex { self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) } - /// Invalidate maintainer announcements and get events for immediate re-processing + /// Invalidate events and get them for immediate re-processing (unified method) + /// + /// This is called when a dependency is satisfied (e.g., owner announcement accepted, + /// or announcement accepted for state events). It removes the cold index entries + /// (so they can be re-fetched on next sync) and returns any events still in the + /// hot cache for immediate re-processing. + /// + /// # Arguments /// - /// This is called when an owner announcement is accepted that lists a maintainer. - /// It removes the cold index entries (so they can be re-fetched on next sync) and - /// returns any events still in the hot cache for immediate re-processing. + /// * `pubkey` - Public key to match (maintainer for announcements, author for states) + /// * `identifier` - Repository identifier (d tag) + /// * `event_type` - If `Some`, filter to that event type; if `None`, return all types /// /// # Returns /// /// Tuple of (number of cold index entries removed, events from hot cache) - pub fn invalidate_and_get_events( + pub fn invalidate_and_get( &self, - maintainer_pubkey: &PublicKey, + pubkey: &PublicKey, identifier: &str, + event_type: Option, ) -> (usize, Vec) { - // Remove from cold index (prevents re-fetch) + // Remove from cold index let removed = self .cold_index - .invalidate_maintainer_announcements(maintainer_pubkey, identifier); + .invalidate_maintainer_announcements(pubkey, identifier, event_type); // Get from hot cache (for immediate re-processing) let events = self .hot_cache - .get_maintainer_events(maintainer_pubkey, identifier); + .get_maintainer_events(pubkey, identifier, event_type); - // Track metrics + // Track metrics based on event type if let Some(ref metrics) = self.metrics { + let type_label = match event_type { + Some(EventType::State) => "state", + Some(EventType::Announcement) | None => "announcement", + }; + if removed > 0 { - metrics.record_invalidation(removed); + metrics.record_rejected_invalidation(type_label, removed); } if events.is_empty() { - metrics.record_hot_cache_miss(); + metrics.record_rejected_hot_cache_miss(type_label); } else { for _ in &events { - metrics.record_hot_cache_hit(); + metrics.record_rejected_hot_cache_hit(type_label); } } } - // Update size metrics - self.update_metrics(); + // Update size metrics based on event type + let type_label = match event_type { + Some(EventType::State) => "state", + Some(EventType::Announcement) | None => "announcement", + }; + self.update_metrics_for_type(type_label); (removed, events) } - /// Invalidate state events and get events for immediate re-processing + /// Clean up expired entries from both tiers /// - /// This is called when an announcement is accepted that authorizes state events. - /// It removes the cold index entries (so they can be re-fetched on next sync) and - /// returns any events still in the hot cache for immediate re-processing. + /// # Arguments /// - /// # Returns + /// * `event_type` - The event type label for metrics ("announcement" or "state") /// - /// Tuple of (number of cold index entries removed, events from hot cache) - pub fn invalidate_and_get_state_events( - &self, - maintainer_pubkey: &PublicKey, - identifier: &str, - ) -> (usize, Vec) { - // Remove from cold index (prevents re-fetch) - let removed = self - .cold_index - .invalidate_maintainer_announcements(maintainer_pubkey, identifier); - - // Get from hot cache (for immediate re-processing) - let events = self - .hot_cache - .get_maintainer_events(maintainer_pubkey, identifier); - - // Track metrics (using states metrics) - if let Some(ref metrics) = self.metrics { - if removed > 0 { - metrics.record_states_invalidation(removed); - } - if events.is_empty() { - metrics.record_states_hot_cache_miss(); - } else { - for _ in &events { - metrics.record_states_hot_cache_hit(); - } - } - } - - // Update size metrics (using states metrics) - self.update_states_metrics(); - - (removed, events) - } - - /// Clean up expired entries from both tiers (for announcements) + /// # Returns /// - /// Returns tuple of (hot cache expired, cold index expired) - pub fn cleanup_expired(&self) -> (usize, usize) { + /// Tuple of (hot cache expired, cold index expired) + pub fn cleanup_expired_for_type(&self, event_type: &str) -> (usize, usize) { let hot_expired = self.hot_cache.cleanup_expired(); let cold_expired = self.cold_index.cleanup_expired(); // Track metrics if let Some(ref metrics) = self.metrics { if hot_expired > 0 { - metrics.record_hot_cache_expired(hot_expired); + metrics.record_rejected_hot_cache_expired(event_type, hot_expired); } if cold_expired > 0 { - metrics.record_cold_index_expired(cold_expired); + metrics.record_rejected_cold_index_expired(event_type, cold_expired); } } // Update size metrics - self.update_metrics(); - - (hot_expired, cold_expired) - } - - /// Clean up expired entries from both tiers (for states) - /// - /// Returns tuple of (hot cache expired, cold index expired) - pub fn cleanup_states_expired(&self) -> (usize, usize) { - let hot_expired = self.hot_cache.cleanup_expired(); - let cold_expired = self.cold_index.cleanup_expired(); - - // Track metrics (using states metrics) - if let Some(ref metrics) = self.metrics { - if hot_expired > 0 { - metrics.record_states_hot_cache_expired(hot_expired); - } - if cold_expired > 0 { - metrics.record_states_cold_index_expired(cold_expired); - } - } - - // Update size metrics (using states metrics) - self.update_states_metrics(); + self.update_metrics_for_type(event_type); (hot_expired, cold_expired) } @@ -611,12 +627,13 @@ mod tests { event.clone(), pubkey, identifier.clone(), + EventType::Announcement, RejectionReason::DoesNotListService, ); assert!(cache.contains(&event.id)); - let retrieved = cache.get_maintainer_events(&pubkey, &identifier); + let retrieved = cache.get_maintainer_events(&pubkey, &identifier, None); assert_eq!(retrieved.len(), 1); assert_eq!(retrieved[0].id, event.id); } @@ -630,6 +647,7 @@ mod tests { event.clone(), event.pubkey, "test-repo".to_string(), + EventType::Announcement, RejectionReason::DoesNotListService, ); @@ -652,6 +670,7 @@ mod tests { event.id, event.pubkey, "test-repo".to_string(), + EventType::Announcement, RejectionReason::DoesNotListService, ); @@ -670,12 +689,17 @@ mod tests { event.id, pubkey, identifier.clone(), + EventType::Announcement, RejectionReason::MaintainerNotYetValid, ); assert!(index.contains(&event.id)); - let removed = index.invalidate_maintainer_announcements(&pubkey, &identifier); + let removed = index.invalidate_maintainer_announcements( + &pubkey, + &identifier, + Some(EventType::Announcement), + ); assert_eq!(removed, 1); assert!(!index.contains(&event.id)); } @@ -698,7 +722,7 @@ mod tests { } #[tokio::test] - async fn test_invalidate_and_get_events() { + async fn test_invalidate_and_get_announcements() { let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); let event = create_test_event().await; let pubkey = event.pubkey; @@ -711,7 +735,8 @@ mod tests { RejectionReason::MaintainerNotYetValid, ); - let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); + let (removed, hot_events) = + index.invalidate_and_get(&pubkey, &identifier, Some(EventType::Announcement)); assert_eq!(removed, 1); // Removed from cold index assert_eq!(hot_events.len(), 1); // Retrieved from hot cache @@ -740,14 +765,14 @@ mod tests { // Wait for hot cache to expire std::thread::sleep(Duration::from_millis(60)); - let (hot_expired, cold_expired) = index.cleanup_expired(); + let (hot_expired, cold_expired) = index.cleanup_expired_for_type("announcement"); assert_eq!(hot_expired, 1); assert_eq!(cold_expired, 0); // Not expired yet // Wait for cold index to expire std::thread::sleep(Duration::from_millis(50)); - let (hot_expired, cold_expired) = index.cleanup_expired(); + let (hot_expired, cold_expired) = index.cleanup_expired_for_type("announcement"); assert_eq!(hot_expired, 0); // Already cleaned up assert_eq!(cold_expired, 1); } @@ -770,7 +795,8 @@ mod tests { // Wait for hot cache to expire std::thread::sleep(Duration::from_millis(60)); - let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); + let (removed, hot_events) = + index.invalidate_and_get(&pubkey, &identifier, Some(EventType::Announcement)); assert_eq!(removed, 1); // Removed from cold index assert_eq!(hot_events.len(), 0); // Hot cache expired - miss! @@ -810,7 +836,8 @@ mod tests { assert_eq!(index.cold_index_len(), 2); // Invalidate only first maintainer - let (removed, hot_events) = index.invalidate_and_get_events(&event1.pubkey, "repo1"); + let (removed, hot_events) = + index.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement)); assert_eq!(removed, 1); assert_eq!(hot_events.len(), 1); @@ -820,4 +847,113 @@ mod tests { assert_eq!(index.cold_index_len(), 1); assert!(index.contains(&event2.id)); } + + #[tokio::test] + async fn test_invalidate_and_get_unified_with_event_type_filter() { + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let keys = Keys::generate(); + + // Create an announcement event + let unsigned_ann = + nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); + let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); + + // Create a state event + let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); + let event_state = keys.sign_event(unsigned_state).await.unwrap(); + + let pubkey = event_ann.pubkey; + let identifier = "test-repo".to_string(); + + // Add announcement and state for same pubkey/identifier + index.add_announcement( + event_ann.clone(), + pubkey, + identifier.clone(), + RejectionReason::MaintainerNotYetValid, + ); + + index.add_state( + event_state.clone(), + pubkey, + identifier.clone(), + RejectionReason::Other, + ); + + assert_eq!(index.hot_cache_len(), 2); + assert_eq!(index.cold_index_len(), 2); + + // Invalidate only announcements + let (removed, hot_events) = + index.invalidate_and_get(&pubkey, &identifier, Some(EventType::Announcement)); + + assert_eq!(removed, 1); // Only announcement removed from cold index + assert_eq!(hot_events.len(), 1); + assert_eq!(hot_events[0].id, event_ann.id); + + // State is still in cold index + assert_eq!(index.cold_index_len(), 1); + assert!(index.contains(&event_state.id)); + + // Now invalidate states + let (removed, hot_events) = + index.invalidate_and_get(&pubkey, &identifier, Some(EventType::State)); + + assert_eq!(removed, 1); + assert_eq!(hot_events.len(), 1); + assert_eq!(hot_events[0].id, event_state.id); + + // Cold index now empty + assert_eq!(index.cold_index_len(), 0); + } + + #[tokio::test] + async fn test_invalidate_and_get_unified_without_filter() { + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let keys = Keys::generate(); + + // Create an announcement event + let unsigned_ann = + nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); + let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); + + // Create a state event + let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); + let event_state = keys.sign_event(unsigned_state).await.unwrap(); + + let pubkey = event_ann.pubkey; + let identifier = "test-repo".to_string(); + + // Add announcement and state for same pubkey/identifier + index.add_announcement( + event_ann.clone(), + pubkey, + identifier.clone(), + RejectionReason::MaintainerNotYetValid, + ); + + index.add_state( + event_state.clone(), + pubkey, + identifier.clone(), + RejectionReason::Other, + ); + + assert_eq!(index.hot_cache_len(), 2); + assert_eq!(index.cold_index_len(), 2); + + // Invalidate all types (None filter) + let (removed, hot_events) = index.invalidate_and_get(&pubkey, &identifier, None); + + assert_eq!(removed, 2); // Both removed from cold index + assert_eq!(hot_events.len(), 2); // Both returned from hot cache + + // Both should be in the results + let event_ids: Vec<_> = hot_events.iter().map(|e| e.id).collect(); + assert!(event_ids.contains(&event_ann.id)); + assert!(event_ids.contains(&event_state.id)); + + // Cold index now empty + assert_eq!(index.cold_index_len(), 0); + } } -- cgit v1.2.3