From b101afa00bc28e1b55286145cb81e32a5b3decc9 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 14 Jan 2026 10:19:18 +0000 Subject: feat(sync): add rejected events cache persistence and integrate with shutdown/startup Implement save/restore functionality for rejected events cache and integrate persistence with relay shutdown/startup lifecycle. Both purgatory and rejected cache now survive relay restarts. Key features: - Serialize rejected events cache to JSON (rejected-events-cache.json) - Save both hot cache (2min, full events) and cold index (7day, metadata) - Restore with downtime adjustment (preserves remaining TTL) - Graceful degradation (missing/corrupted files don't crash) - File cleanup after successful restore - Automatic restoration in SyncManager::new() Integration: - Shutdown hook saves both purgatory and rejected cache - Startup hook restores both and re-queues repositories - Non-fatal errors (logs warnings, continues on failure) Files: - src/sync/rejected_index.rs: save_to_disk/restore_from_disk methods - src/sync/mod.rs: SyncManager integration and auto-restore - src/main.rs: Shutdown/startup hooks for both caches - tests/purgatory_persistence.rs: 17 integration tests Tests: 13 unit tests + 17 integration tests covering full lifecycle --- src/sync/mod.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 59 insertions(+), 12 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; use tokio::time::sleep; use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -581,6 +582,7 @@ impl SyncManager { /// * `write_policy` - Policy for validating events before storage /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) /// * `config` - Configuration for sync 
settings + /// * `data_path` - Path to git data directory (for persistence) /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) pub fn new( bootstrap_relay_url: Option, @@ -589,11 +591,42 @@ impl SyncManager { write_policy: Nip34WritePolicy, local_relay: LocalRelay, config: &Config, + data_path: PathBuf, sync_metrics: Option, ) -> Self { // Extract purgatory from write_policy for read-only access let purgatory = write_policy.purgatory().clone(); + // Create rejected events index + let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { + RejectedEventsIndex::with_metrics( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + metrics.clone(), + ) + } else { + RejectedEventsIndex::new( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + ) + }); + + // Attempt to restore rejected events index from disk + let rejected_index_path = data_path.join("rejected-events-cache.json"); + if rejected_index_path.exists() { + match rejected_events_index.restore_from_disk(&rejected_index_path) { + Ok(()) => { + tracing::info!("Restored rejected events index from disk"); + } + Err(e) => { + tracing::warn!( + "Failed to restore rejected events index: {}, starting empty", + e + ); + } + } + } + Self { bootstrap_relay_url, service_domain, @@ -605,18 +638,7 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), - rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { - RejectedEventsIndex::with_metrics( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - metrics.clone(), - ) - } else { - RejectedEventsIndex::new( - 
Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - ) - }), + rejected_events_index, connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -637,6 +659,31 @@ impl SyncManager { self.next_batch_id } + /// Get a clone of the rejected events index Arc. + /// + /// This allows access to the rejected events index for persistence + /// even after the SyncManager has been moved into a task. + /// + /// # Returns + /// Arc clone of the rejected events index + pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> { + self.rejected_events_index.clone() + } + + /// Save rejected events index to disk. + /// + /// This is called during shutdown to persist the rejected events cache, + /// allowing us to avoid re-downloading rejected events after restart. + /// + /// # Arguments + /// * `path` - Path to save the rejected index file + /// + /// # Returns + /// Ok(()) on success, Err if save fails + pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box> { + self.rejected_events_index.save_to_disk(path) + } + /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: -- cgit v1.2.3