upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 16:27:38 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 16:27:38 +0000
commit895359aeb6746b98ff82944e4fca503f4a6e5439 (patch)
treeea01bf45433282a365748dd3ba102879946d2426 /src/sync/mod.rs
parent83d29a446d96f87e5c947faf49fb33f18db4fc17 (diff)
feat(sync): add cleanup loops and metrics for rejected events index
Add automatic cleanup and Prometheus metrics for the two-tier rejected events index that caches rejected announcements for re-processing. Cleanup loops: - Hot cache: Every 60 seconds (events expire after 2 minutes) - Cold index: Every 24 hours (metadata expires after 7 days) - Background task with graceful shutdown support New Prometheus metrics (7): - Gauges: hot_cache_current, cold_index_current - Counters: hits, misses, hot_expired, cold_expired, invalidated This completes the maintainer announcement re-processing feature, reducing wait time from 24 hours to <1 second when a maintainer's announcement arrives before the repository owner's announcement. Memory is bounded through automatic cleanup, and comprehensive metrics enable monitoring of hit rates, memory usage, and cleanup effectiveness. Changes: - src/sync/metrics.rs: Added 7 metrics with recording methods - src/sync/rejected_index.rs: Added optional metrics support - src/sync/mod.rs: Added cleanup background task Tests: 248 library tests passing, 3 integration tests passing
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs86
1 files changed, 81 insertions, 5 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 35a8afb..f296c0f 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -367,6 +367,66 @@ async fn run_daily_timer(
367/// Run the combined health and metrics checker 367/// Run the combined health and metrics checker
368/// 368///
369/// This function runs in a loop with a 2-second interval, performing three tasks: 369/// This function runs in a loop with a 2-second interval, performing three tasks:
370/// Background task for cleaning up expired entries from the rejected events index
371///
372/// This task runs two cleanup operations at different intervals:
373/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache
374/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index
375///
376/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly).
377/// The cold index cleanup runs daily since metadata is small and expires slowly.
378async fn run_rejected_index_cleanup(
379 sync_manager: Arc<Mutex<SyncManager>>,
380 mut shutdown_rx: broadcast::Receiver<()>,
381) {
382 let hot_cache_interval = Duration::from_secs(60);
383 let cold_index_interval = Duration::from_secs(86400); // 24 hours
384
385 tracing::info!(
386 "Rejected index cleanup started (hot cache: 60s, cold index: daily)"
387 );
388
389 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval);
390 let mut cold_index_timer = tokio::time::interval(cold_index_interval);
391
392 // Tick immediately to set the initial delay
393 hot_cache_timer.tick().await;
394 cold_index_timer.tick().await;
395
396 loop {
397 tokio::select! {
398 _ = hot_cache_timer.tick() => {
399 let manager = sync_manager.lock().await;
400 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
401 if hot_expired > 0 {
402 tracing::debug!(
403 "Cleaned up {} expired entries from rejected announcements hot cache",
404 hot_expired
405 );
406 }
407 }
408 _ = cold_index_timer.tick() => {
409 let manager = sync_manager.lock().await;
410 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
411 if cold_expired > 0 {
412 tracing::info!(
413 "Cleaned up {} expired entries from rejected announcements cold index",
414 cold_expired
415 );
416 }
417 }
418 _ = shutdown_rx.recv() => {
419 tracing::info!("Rejected index cleanup received shutdown signal");
420 break;
421 }
422 }
423 }
424}
425
426/// Background task for checking relay health and updating metrics
427///
428/// This task runs every 2 seconds and performs three operations:
429///
370/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones 430/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones
371/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired 431/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired
372/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker 432/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker
@@ -499,10 +559,18 @@ impl SyncManager {
499 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 559 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
500 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 560 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
501 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 561 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
502 rejected_events_index: Arc::new(RejectedEventsIndex::new( 562 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics {
503 Duration::from_secs(config.rejected_hot_cache_duration_secs), 563 RejectedEventsIndex::with_metrics(
504 Duration::from_secs(config.rejected_cold_index_expiry_secs), 564 Duration::from_secs(config.rejected_hot_cache_duration_secs),
505 )), 565 Duration::from_secs(config.rejected_cold_index_expiry_secs),
566 metrics.clone(),
567 )
568 } else {
569 RejectedEventsIndex::new(
570 Duration::from_secs(config.rejected_hot_cache_duration_secs),
571 Duration::from_secs(config.rejected_cold_index_expiry_secs),
572 )
573 }),
506 connections: HashMap::new(), 574 connections: HashMap::new(),
507 health_tracker: Arc::new(RelayHealthTracker::new(config)), 575 health_tracker: Arc::new(RelayHealthTracker::new(config)),
508 next_batch_id: 0, 576 next_batch_id: 0,
@@ -1154,7 +1222,15 @@ impl SyncManager {
1154 run_health_and_metrics_checker(checker_manager, checker_shutdown).await; 1222 run_health_and_metrics_checker(checker_manager, checker_shutdown).await;
1155 }); 1223 });
1156 1224
1157 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1225 // 10. Spawn rejected events index cleanup task
1226 // Hot cache cleanup every 60s, cold index cleanup daily
1227 let cleanup_manager = Arc::clone(&sync_manager);
1228 let cleanup_shutdown = shutdown_tx.subscribe();
1229 tokio::spawn(async move {
1230 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1231 });
1232
1233 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1158 loop { 1234 loop {
1159 // Wait for an event without holding the lock 1235 // Wait for an event without holding the lock
1160 tokio::select! { 1236 tokio::select! {