From a68e23733e78d33ca1d48b83414a8db63ca3d5fd Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 20:52:07 +0000 Subject: refactor(sync): consolidate to single rejected index with helper extraction Remove rejected_states_index and use single rejected_events_index for both announcement and state events. Extract duplicate re-processing logic into a consolidated helper function. Changes: - Eliminate duplicate RepositoryAnnouncement::from_event() call - Remove rejected_states_index field from SyncManager - Update cleanup loop to process both event types via single index - Add ReprocessingStats struct to track re-processing outcomes - Add reprocess_events_from_hot_cache() helper that handles: - Logging re-processing attempts with context - Calling process_event_static recursively - Tracking saved/duplicate/purgatory/rejected counts - Replace three nearly-identical re-processing loops with helper calls Consolidates phases 1, 5, and 6 of rejected events index refactoring. --- src/sync/mod.rs | 389 ++++++++++++++++++++++++++------------------------------ 1 file changed, 181 insertions(+), 208 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 07527c7..8b5e1c3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; pub use metrics::SyncMetrics; // Re-export rejected index types -pub use rejected_index::RejectionReason; +pub use rejected_index::{EventType, RejectionReason}; // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used // Current code still uses the simple HashSet type alias below @@ -211,6 +211,19 @@ pub enum ProcessResult { Rejected, } +/// Statistics from re-processing events from hot cache +#[derive(Debug, Clone, Default)] +pub struct ReprocessingStats { + /// Number of events successfully saved + pub saved: usize, + /// Number of events that were duplicates + pub duplicate: usize, + /// Number of events added to purgatory + pub purgatory: usize, + /// Number of events still rejected + pub rejected: usize, +} + /// Pagination state for a subscription in non-Negentropy historic sync #[derive(Debug, Clone)] pub struct PaginationState { @@ -366,16 +379,14 @@ async fn run_daily_timer( // Combined Health and Metrics Checker -/// Run the combined health and metrics checker -/// -/// This function runs in a loop with a 2-second interval, performing three tasks: -/// Background task for cleaning up expired entries from the rejected events indexes +/// Background task for cleaning up expired entries from the rejected events index /// /// This task runs two cleanup operations at different intervals: /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index /// -/// Cleans up both the announcements index and the states index. +/// A single `RejectedEventsIndex` handles both announcement and state events, +/// differentiated by `EventType`. Each cleanup pass processes both types. /// /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). /// The cold index cleanup runs daily since metadata is small and expires slowly. @@ -400,42 +411,31 @@ async fn run_rejected_index_cleanup( _ = hot_cache_timer.tick() => { let manager = sync_manager.lock().await; - // Clean up announcements index - let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); - if hot_expired > 0 { - tracing::debug!( - "Cleaned up {} expired entries from rejected announcements hot cache", - hot_expired - ); - } + // Clean up hot cache for both event types (single index handles both) + // Note: cleanup_expired_for_type updates metrics with type label + let (ann_hot_expired, _) = manager.rejected_events_index.cleanup_expired_for_type("announcement"); + let (state_hot_expired, _) = manager.rejected_events_index.cleanup_expired_for_type("state"); - // Clean up states index - let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); - if states_hot_expired > 0 { + if ann_hot_expired + state_hot_expired > 0 { tracing::debug!( - "Cleaned up {} expired entries from rejected states hot cache", - states_hot_expired + announcements = ann_hot_expired, + states = state_hot_expired, + "Cleaned up expired entries from rejected events hot cache" ); } } _ = cold_index_timer.tick() => { let manager = sync_manager.lock().await; - // Clean up announcements index - let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); - if cold_expired > 0 { - tracing::info!( - "Cleaned up {} expired entries from rejected announcements cold index", - cold_expired - ); - } + // Clean up cold index for both event types (single index handles both) + let (_, ann_cold_expired) = manager.rejected_events_index.cleanup_expired_for_type("announcement"); + let (_, state_cold_expired) = manager.rejected_events_index.cleanup_expired_for_type("state"); - // Clean up states index - let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); - if states_cold_expired > 0 { + if ann_cold_expired + state_cold_expired > 0 { tracing::info!( - "Cleaned up {} expired entries from rejected states cold index", - states_cold_expired + announcements = ann_cold_expired, + states = state_cold_expired, + "Cleaned up expired entries from rejected events cold index" ); } } @@ -529,10 +529,9 @@ pub struct SyncManager { relay_sync_index: RelaySyncIndex, /// In-flight subscription batches pending_sync_index: PendingSyncIndex, - /// Rejected announcement events (30617/30618) - two-tier storage for re-processing + /// Rejected events (30617/30618) - two-tier storage for re-processing + /// Handles both announcement and state events via EventType discriminator rejected_events_index: Arc, - /// Rejected state events (30618) - two-tier storage for re-processing - rejected_states_index: Arc, /// Active relay connections - keyed by relay URL connections: HashMap, /// Health tracker for relay connection state @@ -597,18 +596,6 @@ impl SyncManager { Duration::from_secs(config.rejected_cold_index_expiry_secs), ) }), - rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics { - RejectedEventsIndex::with_metrics( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - metrics.clone(), - ) - } else { - RejectedEventsIndex::new( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - ) - }), connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -2111,6 +2098,107 @@ impl SyncManager { ); } + /// Re-process events from hot cache after their dependencies become available + /// + /// This helper consolidates the common pattern of re-processing rejected events + /// when their missing dependencies (owner announcements, git data, etc.) arrive. + /// + /// # Arguments + /// * `events` - Events to re-process from hot cache + /// * `context` - Description for logging (e.g., "maintainer announcement", "state event") + /// * `pubkey` - Public key for logging context + /// * `identifier` - Repository identifier for logging context + /// * `relay_url` - Relay URL for process_event_static + /// * `database` - Shared database for event storage + /// * `write_policy` - Policy for validating events + /// * `local_relay` - Local relay for broadcasting events + /// * `rejected_events_index` - Index for tracking rejected events + /// + /// # Returns + /// Statistics about re-processing outcomes + #[allow(clippy::too_many_arguments)] + async fn reprocess_events_from_hot_cache( + events: Vec, + context: &str, + pubkey: &PublicKey, + identifier: &str, + relay_url: &str, + database: &SharedDatabase, + write_policy: &Nip34WritePolicy, + local_relay: &LocalRelay, + rejected_events_index: &Arc, + ) -> ReprocessingStats { + let mut stats = ReprocessingStats::default(); + + for event in events { + tracing::info!( + event_id = %event.id, + pubkey = %pubkey, + identifier = %identifier, + context = %context, + "Re-processing {} from hot cache", + context + ); + + // Recursive call to process_event_static + // This is safe because: + // 1. Event was removed from hot cache before this call + // 2. Second attempt uses new context (different code path) + // 3. If second attempt fails, stays in cold index only (no third attempt) + // Use Box::pin to avoid infinitely sized future + let reprocess_result = Box::pin(Self::process_event_static( + &event, + relay_url, + database, + write_policy, + local_relay, + rejected_events_index, + )) + .await; + + match reprocess_result { + ProcessResult::Saved => { + stats.saved += 1; + tracing::info!( + event_id = %event.id, + pubkey = %pubkey, + identifier = %identifier, + "{} accepted on re-processing", + context + ); + } + ProcessResult::Duplicate => { + stats.duplicate += 1; + tracing::debug!( + event_id = %event.id, + "{} already exists (duplicate)", + context + ); + } + ProcessResult::Purgatory => { + stats.purgatory += 1; + tracing::debug!( + event_id = %event.id, + "{} added to purgatory (waiting for git data)", + context + ); + } + ProcessResult::Rejected => { + stats.rejected += 1; + tracing::warn!( + event_id = %event.id, + pubkey = %pubkey, + identifier = %identifier, + "{} still rejected on re-processing", + context + ); + } + } + } + + stats + } + /// Process a single event from a relay (static version for spawned tasks) /// /// Processes events with dedup, policy check, database save, and broadcast: @@ -2173,16 +2261,16 @@ impl SyncManager { "Synced event saved and broadcast" ); - // When an owner announcement is accepted, re-process any maintainer announcements - // that were previously rejected because the owner announcement didn't exist yet. - // This handles the race condition where maintainer events arrive before owner events - // during relay synchronization. Maintainer events in the hot cache are re-processed - // immediately and should now pass validation. + // When a repository announcement is accepted, re-process any rejected events: + // 1. Maintainer announcements that were rejected because the owner announcement didn't exist yet + // 2. State events that were rejected because no announcement existed + // This handles race conditions where events arrive before their dependencies during relay sync. if event.kind == Kind::GitRepoAnnouncement { use crate::nostr::events::RepositoryAnnouncement; match RepositoryAnnouncement::from_event(event.clone()) { Ok(announcement) => { + // Re-process rejected maintainer announcements if !announcement.maintainers.is_empty() { tracing::debug!( event_id = %event.id, @@ -2197,9 +2285,10 @@ impl SyncManager { match PublicKey::from_hex(maintainer_hex) { Ok(maintainer_pubkey) => { let (removed, hot_events) = rejected_events_index - .invalidate_and_get_events( + .invalidate_and_get( &maintainer_pubkey, &announcement.identifier, + Some(rejected_index::EventType::Announcement), ); if removed > 0 { @@ -2213,56 +2302,19 @@ impl SyncManager { } // Re-process events from hot cache immediately - for maintainer_event in hot_events { - tracing::info!( - event_id = %maintainer_event.id, - maintainer = %maintainer_hex, - identifier = %announcement.identifier, - "Re-processing maintainer announcement from hot cache" - ); - - // Recursive call to process_event_static - // This is safe because: - // 1. Event was removed from hot cache before this call - // 2. Second attempt uses maintainer exception (different code path) - // 3. If second attempt fails, stays in cold index only (no third attempt) - // Use Box::pin to avoid infinitely sized future - let reprocess_result = - Box::pin(Self::process_event_static( - &maintainer_event, - relay_url, - database, - write_policy, - local_relay, - rejected_events_index, - )) - .await; - - match reprocess_result { - ProcessResult::Saved => { - tracing::info!( - event_id = %maintainer_event.id, - maintainer = %maintainer_hex, - identifier = %announcement.identifier, - "Maintainer announcement accepted on re-processing" - ); - } - ProcessResult::Duplicate => { - tracing::debug!( - event_id = %maintainer_event.id, - "Maintainer announcement already exists (duplicate)" - ); - } - other => { - tracing::warn!( - event_id = %maintainer_event.id, - maintainer = %maintainer_hex, - identifier = %announcement.identifier, - result = ?other, - "Maintainer announcement still rejected on re-processing" - ); - } - } + if !hot_events.is_empty() { + let _stats = Self::reprocess_events_from_hot_cache( + hot_events, + "maintainer announcement", + &maintainer_pubkey, + &announcement.identifier, + relay_url, + database, + write_policy, + local_relay, + rejected_events_index, + ) + .await; } } Err(e) => { @@ -2275,28 +2327,13 @@ impl SyncManager { } } } - } - Err(e) => { - tracing::warn!( - event_id = %event.id, - error = %e, - "Failed to parse repository announcement for maintainer invalidation" - ); - } - } - // When a repository announcement is accepted, re-process any state events - // that were previously rejected because no announcement existed. - // This handles the race condition where state events arrive before their - // announcements during relay synchronization. - match RepositoryAnnouncement::from_event(event.clone()) { - Ok(announcement) => { - // Get the announcement author's state events that were rejected - let (removed, hot_events) = rejected_events_index - .invalidate_and_get_state_events( - &event.pubkey, - &announcement.identifier, - ); + // Re-process rejected state events for this announcement + let (removed, hot_events) = rejected_events_index.invalidate_and_get( + &event.pubkey, + &announcement.identifier, + Some(rejected_index::EventType::State), + ); if removed > 0 { tracing::info!( @@ -2309,62 +2346,26 @@ impl SyncManager { } // Re-process state events from hot cache immediately - for state_event in hot_events { - tracing::info!( - event_id = %state_event.id, - pubkey = %event.pubkey, - identifier = %announcement.identifier, - "Re-processing state event from hot cache (announcement now exists)" - ); - - let reprocess_result = Box::pin(Self::process_event_static( - &state_event, + if !hot_events.is_empty() { + let _stats = Self::reprocess_events_from_hot_cache( + hot_events, + "state event", + &event.pubkey, + &announcement.identifier, relay_url, database, write_policy, local_relay, rejected_events_index, - )) + ) .await; - - match reprocess_result { - ProcessResult::Saved => { - tracing::info!( - event_id = %state_event.id, - pubkey = %event.pubkey, - identifier = %announcement.identifier, - "State event accepted on re-processing (announcement now exists)" - ); - } - ProcessResult::Purgatory => { - tracing::debug!( - event_id = %state_event.id, - "State event added to purgatory (waiting for git data)" - ); - } - ProcessResult::Duplicate => { - tracing::debug!( - event_id = %state_event.id, - "State event already exists (duplicate)" - ); - } - other => { - tracing::warn!( - event_id = %state_event.id, - pubkey = %event.pubkey, - identifier = %announcement.identifier, - result = ?other, - "State event still rejected on re-processing" - ); - } - } } } Err(e) => { tracing::warn!( event_id = %event.id, error = %e, - "Failed to parse repository announcement for state event invalidation" + "Failed to parse repository announcement for rejected event invalidation" ); } } @@ -2383,8 +2384,11 @@ impl SyncManager { .and_then(|t| t.content()) { // Get rejected state events for this pubkey + identifier - let (removed, hot_events) = rejected_events_index - .invalidate_and_get_state_events(&event.pubkey, identifier); + let (removed, hot_events) = rejected_events_index.invalidate_and_get( + &event.pubkey, + identifier, + Some(rejected_index::EventType::State), + ); if removed > 0 { tracing::info!( @@ -2397,50 +2401,19 @@ impl SyncManager { } // Re-process events from hot cache immediately - for state_event in hot_events { - tracing::info!( - event_id = %state_event.id, - pubkey = %event.pubkey, - identifier = %identifier, - "Re-processing state event from hot cache" - ); - - // Recursive call to process_event_static - let reprocess_result = Box::pin(Self::process_event_static( - &state_event, + if !hot_events.is_empty() { + let _stats = Self::reprocess_events_from_hot_cache( + hot_events, + "state event", + &event.pubkey, + identifier, relay_url, database, write_policy, local_relay, rejected_events_index, - )) + ) .await; - - match reprocess_result { - ProcessResult::Saved => { - tracing::info!( - event_id = %state_event.id, - pubkey = %event.pubkey, - identifier = %identifier, - "State event accepted on re-processing" - ); - } - ProcessResult::Duplicate => { - tracing::debug!( - event_id = %state_event.id, - "State event already exists (duplicate)" - ); - } - other => { - tracing::warn!( - event_id = %state_event.id, - pubkey = %event.pubkey, - identifier = %identifier, - result = ?other, - "State event still rejected on re-processing" - ); - } - } } } } -- cgit v1.2.3