diff options
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 40 |
1 files changed, 39 insertions, 1 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f296c0f..93b0e38 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -367,12 +367,14 @@ async fn run_daily_timer( | |||
| 367 | /// Run the combined health and metrics checker | 367 | /// Run the combined health and metrics checker |
| 368 | /// | 368 | /// |
| 369 | /// This function runs in a loop with a 2-second interval, performing three tasks: | 369 | /// This function runs in a loop with a 2-second interval, performing three tasks: |
| 370 | /// Background task for cleaning up expired entries from the rejected events index | 370 | /// Background task for cleaning up expired entries from the rejected events indexes |
| 371 | /// | 371 | /// |
| 372 | /// This task runs two cleanup operations at different intervals: | 372 | /// This task runs two cleanup operations at different intervals: |
| 373 | /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache | 373 | /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache |
| 374 | /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index | 374 | /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index |
| 375 | /// | 375 | /// |
| 376 | /// Cleans up both the announcements index and the states index. | ||
| 377 | /// | ||
| 376 | /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). | 378 | /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). |
| 377 | /// The cold index cleanup runs daily since metadata is small and expires slowly. | 379 | /// The cold index cleanup runs daily since metadata is small and expires slowly. |
| 378 | async fn run_rejected_index_cleanup( | 380 | async fn run_rejected_index_cleanup( |
| @@ -397,6 +399,8 @@ async fn run_rejected_index_cleanup( | |||
| 397 | tokio::select! { | 399 | tokio::select! { |
| 398 | _ = hot_cache_timer.tick() => { | 400 | _ = hot_cache_timer.tick() => { |
| 399 | let manager = sync_manager.lock().await; | 401 | let manager = sync_manager.lock().await; |
| 402 | |||
| 403 | // Clean up announcements index | ||
| 400 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); | 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); |
| 401 | if hot_expired > 0 { | 405 | if hot_expired > 0 { |
| 402 | tracing::debug!( | 406 | tracing::debug!( |
| @@ -404,9 +408,20 @@ async fn run_rejected_index_cleanup( | |||
| 404 | hot_expired | 408 | hot_expired |
| 405 | ); | 409 | ); |
| 406 | } | 410 | } |
| 411 | |||
| 412 | // Clean up states index | ||
| 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); | ||
| 414 | if states_hot_expired > 0 { | ||
| 415 | tracing::debug!( | ||
| 416 | "Cleaned up {} expired entries from rejected states hot cache", | ||
| 417 | states_hot_expired | ||
| 418 | ); | ||
| 419 | } | ||
| 407 | } | 420 | } |
| 408 | _ = cold_index_timer.tick() => { | 421 | _ = cold_index_timer.tick() => { |
| 409 | let manager = sync_manager.lock().await; | 422 | let manager = sync_manager.lock().await; |
| 423 | |||
| 424 | // Clean up announcements index | ||
| 410 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); | 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); |
| 411 | if cold_expired > 0 { | 426 | if cold_expired > 0 { |
| 412 | tracing::info!( | 427 | tracing::info!( |
| @@ -414,6 +429,15 @@ async fn run_rejected_index_cleanup( | |||
| 414 | cold_expired | 429 | cold_expired |
| 415 | ); | 430 | ); |
| 416 | } | 431 | } |
| 432 | |||
| 433 | // Clean up states index | ||
| 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); | ||
| 435 | if states_cold_expired > 0 { | ||
| 436 | tracing::info!( | ||
| 437 | "Cleaned up {} expired entries from rejected states cold index", | ||
| 438 | states_cold_expired | ||
| 439 | ); | ||
| 440 | } | ||
| 417 | } | 441 | } |
| 418 | _ = shutdown_rx.recv() => { | 442 | _ = shutdown_rx.recv() => { |
| 419 | tracing::info!("Rejected index cleanup received shutdown signal"); | 443 | tracing::info!("Rejected index cleanup received shutdown signal"); |
| @@ -507,6 +531,8 @@ pub struct SyncManager { | |||
| 507 | pending_sync_index: PendingSyncIndex, | 531 | pending_sync_index: PendingSyncIndex, |
| 508 | /// Rejected announcement events (30617/30618) - two-tier storage for re-processing | 532 | /// Rejected announcement events (30617/30618) - two-tier storage for re-processing |
| 509 | rejected_events_index: Arc<RejectedEventsIndex>, | 533 | rejected_events_index: Arc<RejectedEventsIndex>, |
| 534 | /// Rejected state events (30618) - two-tier storage for re-processing | ||
| 535 | rejected_states_index: Arc<RejectedEventsIndex>, | ||
| 510 | /// Active relay connections - keyed by relay URL | 536 | /// Active relay connections - keyed by relay URL |
| 511 | connections: HashMap<String, RelayConnection>, | 537 | connections: HashMap<String, RelayConnection>, |
| 512 | /// Health tracker for relay connection state | 538 | /// Health tracker for relay connection state |
| @@ -571,6 +597,18 @@ impl SyncManager { | |||
| 571 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | 597 | Duration::from_secs(config.rejected_cold_index_expiry_secs), |
| 572 | ) | 598 | ) |
| 573 | }), | 599 | }), |
| 600 | rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 601 | RejectedEventsIndex::with_metrics( | ||
| 602 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 603 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 604 | metrics.clone(), | ||
| 605 | ) | ||
| 606 | } else { | ||
| 607 | RejectedEventsIndex::new( | ||
| 608 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 609 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 610 | ) | ||
| 611 | }), | ||
| 574 | connections: HashMap::new(), | 612 | connections: HashMap::new(), |
| 575 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 613 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 576 | next_batch_id: 0, | 614 | next_batch_id: 0, |