From 895359aeb6746b98ff82944e4fca503f4a6e5439 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 16:27:38 +0000 Subject: feat(sync): add cleanup loops and metrics for rejected events index Add automatic cleanup and Prometheus metrics for the two-tier rejected events index that caches rejected announcements for re-processing. Cleanup loops: - Hot cache: Every 60 seconds (events expire after 2 minutes) - Cold index: Every 24 hours (metadata expires after 7 days) - Background task with graceful shutdown support New Prometheus metrics (7): - Gauges: hot_cache_current, cold_index_current - Counters: hits, misses, hot_expired, cold_expired, invalidated This completes the maintainer announcement re-processing feature, reducing wait time from 24 hours to <1 second when a maintainer's announcement arrives before the repository owner's announcement. Memory is bounded through automatic cleanup, and comprehensive metrics enable monitoring of hit rates, memory usage, and cleanup effectiveness. Changes: - src/sync/metrics.rs: Added 7 metrics with recording methods - src/sync/rejected_index.rs: Added optional metrics support - src/sync/mod.rs: Added cleanup background task Tests: 248 library tests passing, 3 integration tests passing --- src/sync/metrics.rs | 124 +++++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 86 +++++++++++++++++++++++++++++-- src/sync/rejected_index.rs | 80 ++++++++++++++++++++++++++++- 3 files changed, 284 insertions(+), 6 deletions(-) (limited to 'src/sync') diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 7907d8e..a175210 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs @@ -40,6 +40,22 @@ pub struct SyncMetrics { relays_connected_total: IntGauge, /// Relays marked as dead relays_dead_total: IntGauge, + + // === Rejected Announcements Index Metrics === + /// Current number of entries in hot cache + rejected_announcements_hot_cache_current: IntGauge, + /// Total hot cache hits (events re-processed from cache) + rejected_announcements_hot_cache_hits_total: IntCounter, + /// Total hot cache misses (events not in cache) + rejected_announcements_hot_cache_misses_total: IntCounter, + /// Total expired entries removed from hot cache + rejected_announcements_hot_cache_expired_total: IntCounter, + /// Current number of entries in cold index + rejected_announcements_cold_index_current: IntGauge, + /// Total cold index entries expired and removed + rejected_announcements_cold_index_expired_total: IntCounter, + /// Total invalidations (maintainer announcements invalidated) + rejected_announcements_invalidated_total: IntCounter, } impl SyncMetrics { @@ -113,6 +129,49 @@ impl SyncMetrics { ))?; registry.register(Box::new(relays_dead_total.clone()))?; + // Rejected announcements metrics + let rejected_announcements_hot_cache_current = IntGauge::with_opts(Opts::new( + "ngit_sync_rejected_announcements_hot_cache_current", + "Current number of entries in hot cache (full events, 2 min expiry)", + ))?; + registry.register(Box::new(rejected_announcements_hot_cache_current.clone()))?; + + let rejected_announcements_hot_cache_hits_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_announcements_hot_cache_hits_total", + "Total hot cache hits (events re-processed from cache)", + ))?; + registry.register(Box::new(rejected_announcements_hot_cache_hits_total.clone()))?; + + let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_announcements_hot_cache_misses_total", + "Total hot cache misses (events not in cache when invalidated)", + ))?; + registry.register(Box::new(rejected_announcements_hot_cache_misses_total.clone()))?; + + let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_announcements_hot_cache_expired_total", + "Total expired entries removed from hot cache", + ))?; + registry.register(Box::new(rejected_announcements_hot_cache_expired_total.clone()))?; + + let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( + "ngit_sync_rejected_announcements_cold_index_current", + "Current number of entries in cold index (metadata only, 7 day expiry)", + ))?; + registry.register(Box::new(rejected_announcements_cold_index_current.clone()))?; + + let rejected_announcements_cold_index_expired_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_announcements_cold_index_expired_total", + "Total expired entries removed from cold index", + ))?; + registry.register(Box::new(rejected_announcements_cold_index_expired_total.clone()))?; + + let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_announcements_invalidated_total", + "Total invalidations (maintainer announcements invalidated when owner accepted)", + ))?; + registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?; + Ok(Self { relay_connected, connection_attempts_total, @@ -122,6 +181,13 @@ impl SyncMetrics { relays_tracked_total, relays_connected_total, relays_dead_total, + rejected_announcements_hot_cache_current, + rejected_announcements_hot_cache_hits_total, + rejected_announcements_hot_cache_misses_total, + rejected_announcements_hot_cache_expired_total, + rejected_announcements_cold_index_current, + rejected_announcements_cold_index_expired_total, + rejected_announcements_invalidated_total, }) } @@ -293,6 +359,43 @@ impl SyncMetrics { pub fn get_dead_count(&self) -> i64 { self.relays_dead_total.get() } + + // === Rejected Announcements Recording Methods === + + /// Update hot cache current size gauge. + pub fn update_hot_cache_size(&self, size: usize) { + self.rejected_announcements_hot_cache_current.set(size as i64); + } + + /// Record hot cache hit (event re-processed from cache). + pub fn record_hot_cache_hit(&self) { + self.rejected_announcements_hot_cache_hits_total.inc(); + } + + /// Record hot cache miss (event not in cache when invalidated). + pub fn record_hot_cache_miss(&self) { + self.rejected_announcements_hot_cache_misses_total.inc(); + } + + /// Record hot cache expired entries. + pub fn record_hot_cache_expired(&self, count: usize) { + self.rejected_announcements_hot_cache_expired_total.inc_by(count as u64); + } + + /// Update cold index current size gauge. + pub fn update_cold_index_size(&self, size: usize) { + self.rejected_announcements_cold_index_current.set(size as i64); + } + + /// Record cold index expired entries. + pub fn record_cold_index_expired(&self, count: usize) { + self.rejected_announcements_cold_index_expired_total.inc_by(count as u64); + } + + /// Record invalidation (maintainer announcement invalidated). + pub fn record_invalidation(&self, count: usize) { + self.rejected_announcements_invalidated_total.inc_by(count as u64); + } } #[cfg(test)] @@ -395,4 +498,25 @@ mod tests { let metrics2 = SyncMetrics::register(®istry); assert!(metrics2.is_err()); } + + #[test] + fn test_rejected_announcements_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(®istry).unwrap(); + + // Test hot cache metrics + metrics.update_hot_cache_size(10); + metrics.record_hot_cache_hit(); + metrics.record_hot_cache_hit(); + metrics.record_hot_cache_miss(); + metrics.record_hot_cache_expired(5); + + // Test cold index metrics + metrics.update_cold_index_size(100); + metrics.record_cold_index_expired(10); + + // Test invalidation metrics + metrics.record_invalidation(3); + metrics.record_invalidation(2); + } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 35a8afb..f296c0f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -367,6 +367,66 @@ async fn run_daily_timer( /// Run the combined health and metrics checker /// /// This function runs in a loop with a 2-second interval, performing three tasks: +/// Background task for cleaning up expired entries from the rejected events index +/// +/// This task runs two cleanup operations at different intervals: +/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache +/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index +/// +/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). +/// The cold index cleanup runs daily since metadata is small and expires slowly. +async fn run_rejected_index_cleanup( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { + let hot_cache_interval = Duration::from_secs(60); + let cold_index_interval = Duration::from_secs(86400); // 24 hours + + tracing::info!( + "Rejected index cleanup started (hot cache: 60s, cold index: daily)" + ); + + let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); + let mut cold_index_timer = tokio::time::interval(cold_index_interval); + + // Tick immediately to set the initial delay + hot_cache_timer.tick().await; + cold_index_timer.tick().await; + + loop { + tokio::select! { + _ = hot_cache_timer.tick() => { + let manager = sync_manager.lock().await; + let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); + if hot_expired > 0 { + tracing::debug!( + "Cleaned up {} expired entries from rejected announcements hot cache", + hot_expired + ); + } + } + _ = cold_index_timer.tick() => { + let manager = sync_manager.lock().await; + let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); + if cold_expired > 0 { + tracing::info!( + "Cleaned up {} expired entries from rejected announcements cold index", + cold_expired + ); + } + } + _ = shutdown_rx.recv() => { + tracing::info!("Rejected index cleanup received shutdown signal"); + break; + } + } + } +} + +/// Background task for checking relay health and updating metrics +/// +/// This task runs every 2 seconds and performs three operations: +/// /// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones /// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired /// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker @@ -499,10 +559,18 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), - rejected_events_index: Arc::new(RejectedEventsIndex::new( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - )), + rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { + RejectedEventsIndex::with_metrics( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + metrics.clone(), + ) + } else { + RejectedEventsIndex::new( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + ) + }), connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -1154,7 +1222,15 @@ impl SyncManager { run_health_and_metrics_checker(checker_manager, checker_shutdown).await; }); - // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 10. Spawn rejected events index cleanup task + // Hot cache cleanup every 60s, cold index cleanup daily + let cleanup_manager = Arc::clone(&sync_manager); + let cleanup_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { + run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; + }); + + // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { // Wait for an event without holding the lock tokio::select! { diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 80d6b5b..4733d80 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs @@ -299,10 +299,22 @@ impl ColdIndex { /// /// Combines hot cache (full events, short duration) with cold index /// (metadata only, long duration) for efficient re-processing and deduplication. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct RejectedEventsIndex { hot_cache: HotCache, cold_index: ColdIndex, + metrics: Option, +} + +// Manual Debug impl to avoid requiring Debug on SyncMetrics +impl std::fmt::Debug for RejectedEventsIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RejectedEventsIndex") + .field("hot_cache", &self.hot_cache) + .field("cold_index", &self.cold_index) + .field("metrics", &self.metrics.is_some()) + .finish() + } } impl RejectedEventsIndex { @@ -316,6 +328,38 @@ impl RejectedEventsIndex { Self { hot_cache: HotCache::new(hot_cache_duration), cold_index: ColdIndex::new(cold_index_duration), + metrics: None, + } + } + + /// Create new rejected events index with metrics + /// + /// # Arguments + /// + /// * `hot_cache_duration` - How long to keep full events in hot cache (default: 2 minutes) + /// * `cold_index_duration` - How long to keep metadata in cold index (default: 7 days) + /// * `metrics` - Prometheus metrics for tracking index operations + pub fn with_metrics( + hot_cache_duration: Duration, + cold_index_duration: Duration, + metrics: super::metrics::SyncMetrics, + ) -> Self { + let index = Self { + hot_cache: HotCache::new(hot_cache_duration), + cold_index: ColdIndex::new(cold_index_duration), + metrics: Some(metrics), + }; + + // Initialize metrics with current sizes + index.update_metrics(); + index + } + + /// Update metrics with current sizes + fn update_metrics(&self) { + if let Some(ref metrics) = self.metrics { + metrics.update_hot_cache_size(self.hot_cache.len()); + metrics.update_cold_index_size(self.cold_index.len()); } } @@ -344,6 +388,9 @@ impl RejectedEventsIndex { // Add to cold index (metadata only) self.cold_index.add(event.id, pubkey, identifier, reason); + + // Update metrics + self.update_metrics(); } /// Check if event is already rejected (in either tier) @@ -375,6 +422,23 @@ impl RejectedEventsIndex { .hot_cache .get_maintainer_events(maintainer_pubkey, identifier); + // Track metrics + if let Some(ref metrics) = self.metrics { + if removed > 0 { + metrics.record_invalidation(removed); + } + if events.is_empty() { + metrics.record_hot_cache_miss(); + } else { + for _ in &events { + metrics.record_hot_cache_hit(); + } + } + } + + // Update size metrics + self.update_metrics(); + (removed, events) } @@ -384,6 +448,20 @@ impl RejectedEventsIndex { pub fn cleanup_expired(&self) -> (usize, usize) { let hot_expired = self.hot_cache.cleanup_expired(); let cold_expired = self.cold_index.cleanup_expired(); + + // Track metrics + if let Some(ref metrics) = self.metrics { + if hot_expired > 0 { + metrics.record_hot_cache_expired(hot_expired); + } + if cold_expired > 0 { + metrics.record_cold_index_expired(cold_expired); + } + } + + // Update size metrics + self.update_metrics(); + (hot_expired, cold_expired) } -- cgit v1.2.3