upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs40
1 files changed, 39 insertions, 1 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index f296c0f..93b0e38 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -367,12 +367,14 @@ async fn run_daily_timer(
367/// Run the combined health and metrics checker 367/// Run the combined health and metrics checker
368/// 368///
369/// This function runs in a loop with a 2-second interval, performing three tasks: 369/// This function runs in a loop with a 2-second interval, performing three tasks:
370/// Background task for cleaning up expired entries from the rejected events index 370/// Background task for cleaning up expired entries from the rejected events indexes
371/// 371///
372/// This task runs two cleanup operations at different intervals: 372/// This task runs two cleanup operations at different intervals:
373/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache 373/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache
374/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index 374/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index
375/// 375///
376/// Cleans up both the announcements index and the states index.
377///
376/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). 378/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly).
377/// The cold index cleanup runs daily since metadata is small and expires slowly. 379/// The cold index cleanup runs daily since metadata is small and expires slowly.
378async fn run_rejected_index_cleanup( 380async fn run_rejected_index_cleanup(
@@ -397,6 +399,8 @@ async fn run_rejected_index_cleanup(
397 tokio::select! { 399 tokio::select! {
398 _ = hot_cache_timer.tick() => { 400 _ = hot_cache_timer.tick() => {
399 let manager = sync_manager.lock().await; 401 let manager = sync_manager.lock().await;
402
403 // Clean up announcements index
400 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); 404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
401 if hot_expired > 0 { 405 if hot_expired > 0 {
402 tracing::debug!( 406 tracing::debug!(
@@ -404,9 +408,20 @@ async fn run_rejected_index_cleanup(
404 hot_expired 408 hot_expired
405 ); 409 );
406 } 410 }
411
412 // Clean up states index
413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired();
414 if states_hot_expired > 0 {
415 tracing::debug!(
416 "Cleaned up {} expired entries from rejected states hot cache",
417 states_hot_expired
418 );
419 }
407 } 420 }
408 _ = cold_index_timer.tick() => { 421 _ = cold_index_timer.tick() => {
409 let manager = sync_manager.lock().await; 422 let manager = sync_manager.lock().await;
423
424 // Clean up announcements index
410 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); 425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
411 if cold_expired > 0 { 426 if cold_expired > 0 {
412 tracing::info!( 427 tracing::info!(
@@ -414,6 +429,15 @@ async fn run_rejected_index_cleanup(
414 cold_expired 429 cold_expired
415 ); 430 );
416 } 431 }
432
433 // Clean up states index
434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired();
435 if states_cold_expired > 0 {
436 tracing::info!(
437 "Cleaned up {} expired entries from rejected states cold index",
438 states_cold_expired
439 );
440 }
417 } 441 }
418 _ = shutdown_rx.recv() => { 442 _ = shutdown_rx.recv() => {
419 tracing::info!("Rejected index cleanup received shutdown signal"); 443 tracing::info!("Rejected index cleanup received shutdown signal");
@@ -507,6 +531,8 @@ pub struct SyncManager {
507 pending_sync_index: PendingSyncIndex, 531 pending_sync_index: PendingSyncIndex,
508 /// Rejected announcement events (30617/30618) - two-tier storage for re-processing 532 /// Rejected announcement events (30617/30618) - two-tier storage for re-processing
509 rejected_events_index: Arc<RejectedEventsIndex>, 533 rejected_events_index: Arc<RejectedEventsIndex>,
534 /// Rejected state events (30618) - two-tier storage for re-processing
535 rejected_states_index: Arc<RejectedEventsIndex>,
510 /// Active relay connections - keyed by relay URL 536 /// Active relay connections - keyed by relay URL
511 connections: HashMap<String, RelayConnection>, 537 connections: HashMap<String, RelayConnection>,
512 /// Health tracker for relay connection state 538 /// Health tracker for relay connection state
@@ -571,6 +597,18 @@ impl SyncManager {
571 Duration::from_secs(config.rejected_cold_index_expiry_secs), 597 Duration::from_secs(config.rejected_cold_index_expiry_secs),
572 ) 598 )
573 }), 599 }),
600 rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics {
601 RejectedEventsIndex::with_metrics(
602 Duration::from_secs(config.rejected_hot_cache_duration_secs),
603 Duration::from_secs(config.rejected_cold_index_expiry_secs),
604 metrics.clone(),
605 )
606 } else {
607 RejectedEventsIndex::new(
608 Duration::from_secs(config.rejected_hot_cache_duration_secs),
609 Duration::from_secs(config.rejected_cold_index_expiry_secs),
610 )
611 }),
574 connections: HashMap::new(), 612 connections: HashMap::new(),
575 health_tracker: Arc::new(RelayHealthTracker::new(config)), 613 health_tracker: Arc::new(RelayHealthTracker::new(config)),
576 next_batch_id: 0, 614 next_batch_id: 0,