From 895359aeb6746b98ff82944e4fca503f4a6e5439 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 16:27:38 +0000 Subject: feat(sync): add cleanup loops and metrics for rejected events index Add automatic cleanup and Prometheus metrics for the two-tier rejected events index that caches rejected announcements for re-processing. Cleanup loops: - Hot cache: Every 60 seconds (events expire after 2 minutes) - Cold index: Every 24 hours (metadata expires after 7 days) - Background task with graceful shutdown support New Prometheus metrics (7): - Gauges: hot_cache_current, cold_index_current - Counters: hits, misses, hot_expired, cold_expired, invalidated This completes the maintainer announcement re-processing feature, reducing wait time from 24 hours to <1 second when a maintainer's announcement arrives before the repository owner's announcement. Memory is bounded through automatic cleanup, and comprehensive metrics enable monitoring of hit rates, memory usage, and cleanup effectiveness. Changes: - src/sync/metrics.rs: Added 7 metrics with recording methods - src/sync/rejected_index.rs: Added optional metrics support - src/sync/mod.rs: Added cleanup background task Tests: 248 library tests passing, 3 integration tests passing --- src/sync/mod.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 5 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 35a8afb..f296c0f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -367,6 +367,66 @@ async fn run_daily_timer( /// Run the combined health and metrics checker /// /// This function runs in a loop with a 2-second interval, performing three tasks: +/// Background task for cleaning up expired entries from the rejected events index +/// +/// This task runs two cleanup operations at different intervals: +/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache +/// 2. 
**Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index
+///
+/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly).
+/// The cold index cleanup runs daily since metadata is small and expires slowly.
+///
+/// Both timers trigger the same `cleanup_expired()` call, which reports a
+/// `(hot, cold)` pair of eviction counts. Every non-zero count is logged on every
+/// pass, so evictions are never dropped silently regardless of which timer fired.
+///
+/// The task returns as soon as `shutdown_rx.recv()` resolves.
+async fn run_rejected_index_cleanup(
+    sync_manager: Arc<Mutex<SyncManager>>,
+    mut shutdown_rx: broadcast::Receiver<()>,
+) {
+    let hot_cache_interval = Duration::from_secs(60);
+    let cold_index_interval = Duration::from_secs(86400); // 24 hours
+
+    tracing::info!(
+        "Rejected index cleanup started (hot cache: 60s, cold index: daily)"
+    );
+
+    let mut hot_cache_timer = tokio::time::interval(hot_cache_interval);
+    let mut cold_index_timer = tokio::time::interval(cold_index_interval);
+
+    // The first tick of a tokio interval completes immediately; consume it here so
+    // the first real cleanup happens one full period after startup.
+    hot_cache_timer.tick().await;
+    cold_index_timer.tick().await;
+
+    loop {
+        tokio::select! {
+            _ = hot_cache_timer.tick() => {
+                cleanup_rejected_index_once(&sync_manager).await;
+            }
+            // NOTE(review): `cleanup_expired()` appears to sweep both tiers in one
+            // call, which makes this arm redundant today. It is kept so the
+            // documented daily schedule still holds if the sweep is later split
+            // per tier.
+            _ = cold_index_timer.tick() => {
+                cleanup_rejected_index_once(&sync_manager).await;
+            }
+            _ = shutdown_rx.recv() => {
+                tracing::info!("Rejected index cleanup received shutdown signal");
+                break;
+            }
+        }
+    }
+}
+
+/// Run one cleanup pass over the rejected events index and log what was evicted.
+///
+/// The manager lock is held only while cloning the index handle, so the sweep and
+/// the logging do not block other users of the `SyncManager`.
+async fn cleanup_rejected_index_once(sync_manager: &Mutex<SyncManager>) {
+    let index = {
+        let manager = sync_manager.lock().await;
+        Arc::clone(&manager.rejected_events_index)
+    };
+
+    let (hot_expired, cold_expired) = index.cleanup_expired();
+
+    if hot_expired > 0 {
+        tracing::debug!(
+            "Cleaned up {} expired entries from rejected announcements hot cache",
+            hot_expired
+        );
+    }
+    if cold_expired > 0 {
+        tracing::info!(
+            "Cleaned up {} expired entries from rejected announcements cold index",
+            cold_expired
+        );
+    }
+}
+
+/// Background task for checking relay health and updating metrics
+///
+/// This task runs every 2 seconds and performs three operations:
+///
 /// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones
 /// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired
 /// 3. 
**Metrics update**: Update Prometheus metrics with current health states from health_tracker @@ -499,10 +559,18 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), - rejected_events_index: Arc::new(RejectedEventsIndex::new( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - )), + rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { + RejectedEventsIndex::with_metrics( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + metrics.clone(), + ) + } else { + RejectedEventsIndex::new( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + ) + }), connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -1154,7 +1222,15 @@ impl SyncManager { run_health_and_metrics_checker(checker_manager, checker_shutdown).await; }); - // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 10. Spawn rejected events index cleanup task + // Hot cache cleanup every 60s, cold index cleanup daily + let cleanup_manager = Arc::clone(&sync_manager); + let cleanup_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { + run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; + }); + + // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { // Wait for an event without holding the lock tokio::select! { -- cgit v1.2.3