From b101afa00bc28e1b55286145cb81e32a5b3decc9 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 14 Jan 2026 10:19:18 +0000 Subject: feat(sync): add rejected events cache persistence and integrate with shutdown/startup Implement save/restore functionality for rejected events cache and integrate persistence with relay shutdown/startup lifecycle. Both purgatory and rejected cache now survive relay restarts. Key features: - Serialize rejected events cache to JSON (rejected-events-cache.json) - Save both hot cache (2min, full events) and cold index (7day, metadata) - Restore with downtime adjustment (preserves remaining TTL) - Graceful degradation (missing/corrupted files don't crash) - File cleanup after successful restore - Automatic restoration in SyncManager::new() Integration: - Shutdown hook saves both purgatory and rejected cache - Startup hook restores both and re-queues repositories - Non-fatal errors (logs warnings, continues on failure) Files: - src/sync/rejected_index.rs: save_to_disk/restore_from_disk methods - src/sync/mod.rs: SyncManager integration and auto-restore - src/main.rs: Shutdown/startup hooks for both caches - tests/purgatory_persistence.rs: 17 integration tests Tests: 13 unit tests + 17 integration tests covering full lifecycle --- src/main.rs | 50 +++- src/sync/mod.rs | 71 ++++- src/sync/rejected_index.rs | 726 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 831 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index a6f1d9d..5e5b83a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc}; use anyhow::Result; use tokio::signal; -use tracing::{info, Level}; +use tracing::{error, info, warn, Level}; use tracing_subscriber::FmtSubscriber; use ngit_grasp::{ @@ -64,6 +64,22 @@ async fn main() -> Result<()> { ))); info!("Purgatory initialized for event coordination"); + // Restore purgatory state from disk if available + let purgatory_path = + PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json"); + + if purgatory_path.exists() { + match purgatory.restore_from_disk(&purgatory_path) { + Ok(()) => { + info!("Restored purgatory state from disk"); + // Re-queueing will happen later after sync system is created + } + Err(e) => { + warn!("Failed to restore purgatory state: {}, starting empty", e); + } + } + } + // Create Nostr relay with NIP-34 validation // Returns both the relay and database for direct queries in handlers if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { @@ -88,6 +104,7 @@ async fn main() -> Result<()> { relay_with_db.write_policy.clone(), relay_with_db.relay.clone(), &config, + PathBuf::from(config.effective_git_data_path()), metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), ); @@ -100,6 +117,21 @@ async fn main() -> Result<()> { info!("Proactive sync enabled (will discover relays from stored announcements)"); } + // Re-queue all restored purgatory repos for sync + let restored_identifiers = purgatory.get_all_identifiers(); + if !restored_identifiers.is_empty() { + info!( + "Re-queueing {} restored repositories for sync", + restored_identifiers.len() + ); + for identifier in restored_identifiers { + purgatory.enqueue_sync_immediate(&identifier); + } + } + + // Get a reference to the rejected events index for shutdown persistence + let shutdown_rejected_index = sync_manager.rejected_events_index(); + tokio::spawn(async move { sync_manager.run().await; }); @@ -190,6 +222,22 @@ async fn main() -> Result<()> { } } + // Save purgatory state to disk + let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json"); + if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) { + error!("Failed to save purgatory state: {}", e); + } else { + info!("Purgatory state saved to disk"); + } + + // Save rejected events cache to disk + let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json"); + if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) { + error!("Failed to save rejected events cache: {}", e); + } else { + info!("Rejected events cache saved to disk"); + } + // Cleanup placeholder refs on shutdown let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); if !placeholder_ids.is_empty() { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; use tokio::time::sleep; use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -581,6 +582,7 @@ impl SyncManager { /// * `write_policy` - Policy for validating events before storage /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) /// * `config` - Configuration for sync settings + /// * `data_path` - Path to git data directory (for persistence) /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) pub fn new( bootstrap_relay_url: Option, @@ -589,11 +591,42 @@ impl SyncManager { write_policy: Nip34WritePolicy, local_relay: LocalRelay, config: &Config, + data_path: PathBuf, sync_metrics: Option, ) -> Self { // Extract purgatory from write_policy for read-only access let purgatory = write_policy.purgatory().clone(); + // Create rejected events index + let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { + RejectedEventsIndex::with_metrics( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + metrics.clone(), + ) + } else { + RejectedEventsIndex::new( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + ) + }); + + // Attempt to restore rejected events index from disk + let rejected_index_path = data_path.join("rejected-events-cache.json"); + if rejected_index_path.exists() { + match rejected_events_index.restore_from_disk(&rejected_index_path) { + Ok(()) => { + tracing::info!("Restored rejected events index from disk"); + } + Err(e) => { + tracing::warn!( + "Failed to restore rejected events index: {}, starting empty", + e + ); + } + } + } + Self { bootstrap_relay_url, service_domain, @@ -605,18 +638,7 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), - rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { - RejectedEventsIndex::with_metrics( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - metrics.clone(), - ) - } else { - RejectedEventsIndex::new( - Duration::from_secs(config.rejected_hot_cache_duration_secs), - Duration::from_secs(config.rejected_cold_index_expiry_secs), - ) - }), + rejected_events_index, connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -637,6 +659,31 @@ impl SyncManager { self.next_batch_id } + /// Get a clone of the rejected events index Arc. + /// + /// This allows access to the rejected events index for persistence + /// even after the SyncManager has been moved into a task. + /// + /// # Returns + /// Arc clone of the rejected events index + pub fn rejected_events_index(&self) -> Arc { + self.rejected_events_index.clone() + } + + /// Save rejected events index to disk. + /// + /// This is called during shutdown to persist the rejected events cache, + /// allowing us to avoid re-downloading rejected events after restart. + /// + /// # Arguments + /// * `path` - Path to save the rejected index file + /// + /// # Returns + /// Ok(()) on success, Err if save fails + pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box> { + self.rejected_events_index.save_to_disk(path) + } + /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4d31901..f25f22a 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs @@ -86,12 +86,14 @@ //! ``` use nostr_sdk::{Event, EventId, PublicKey}; +use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; +use std::path::Path; use std::sync::{Arc, RwLock}; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; /// Type of event stored in the rejected events index -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum EventType { /// Repository announcement (kind 30617) Announcement, @@ -109,7 +111,7 @@ impl std::fmt::Display for EventType { } /// Reason why a repository announcement was rejected -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum RejectionReason { /// Announcement doesn't list this service in clone/web URLs DoesNotListService, @@ -141,6 +143,20 @@ struct HotCacheEntry { cached_at: Instant, } +/// Serializable version of HotCacheEntry for persistence +/// +/// Converts Instant to Duration offset from saved_at time +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SerializableHotCacheEntry { + event: Event, + pubkey: PublicKey, + identifier: String, + event_type: EventType, + reason: RejectionReason, + /// Duration since saved_at when this entry was cached + cached_at_offset_secs: u64, +} + /// Entry in the cold index (metadata only) /// /// Note: event_id is stored as the HashMap key, not in this struct @@ -154,6 +170,49 @@ struct ColdIndexEntry { rejected_at: Instant, } +/// Serializable version of ColdIndexEntry for persistence +/// +/// Converts Instant to Duration offset from saved_at time +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SerializableColdIndexEntry { + pubkey: PublicKey, + identifier: String, + event_type: EventType, + reason: RejectionReason, + /// Duration since saved_at when this entry was rejected + rejected_at_offset_secs: u64, +} + +/// Serializable state for hot cache +#[derive(Debug, Serialize, Deserialize)] +struct SerializableHotCache { + expiry_duration_secs: u64, + entries: HashMap, +} + +/// Serializable state for cold index +#[derive(Debug, Serialize, Deserialize)] +struct SerializableColdIndex { + expiry_duration_secs: u64, + entries: HashMap, +} + +/// Complete rejected cache state for persistence +/// +/// Stores both hot cache and cold index with version and timestamp information. +/// All Instant fields are converted to Duration offsets from saved_at. +#[derive(Debug, Serialize, Deserialize)] +struct RejectedCacheState { + /// Version for future compatibility + version: u32, + /// When this state was saved + saved_at: SystemTime, + /// Hot cache entries with full events + hot_cache: SerializableHotCache, + /// Cold index entries with metadata only + cold_index: SerializableColdIndex, +} + /// Hot cache: Stores full events for immediate re-processing /// /// Events are stored for a short duration (default: 2 minutes) to enable @@ -603,6 +662,168 @@ impl RejectedEventsIndex { ids } + + /// Save rejected events cache to disk + /// + /// Serializes both hot cache and cold index to JSON, converting Instant timestamps + /// to Duration offsets from the save time. This allows timestamps to be adjusted + /// for downtime when restored. + /// + /// # Arguments + /// + /// * `path` - File path to write the serialized state to + /// + /// # Returns + /// + /// Ok(()) on success, or an error if serialization or file write fails + pub fn save_to_disk(&self, path: &Path) -> Result<(), Box> { + let saved_at = SystemTime::now(); + let now = Instant::now(); + + // Lock both caches for consistent snapshot + let hot_entries = self.hot_cache.entries.read().unwrap(); + let cold_entries = self.cold_index.entries.read().unwrap(); + + // Convert hot cache entries to serializable format + let serializable_hot_entries: HashMap = hot_entries + .iter() + .map(|(event_id, entry)| { + let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs(); + + let serializable_entry = SerializableHotCacheEntry { + event: entry.event.clone(), + pubkey: entry.pubkey, + identifier: entry.identifier.clone(), + event_type: entry.event_type, + reason: entry.reason, + cached_at_offset_secs, + }; + + (*event_id, serializable_entry) + }) + .collect(); + + // Convert cold index entries to serializable format + let serializable_cold_entries: HashMap = cold_entries + .iter() + .map(|(event_id, entry)| { + let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs(); + + let serializable_entry = SerializableColdIndexEntry { + pubkey: entry.pubkey, + identifier: entry.identifier.clone(), + event_type: entry.event_type, + reason: entry.reason, + rejected_at_offset_secs, + }; + + (*event_id, serializable_entry) + }) + .collect(); + + // Create complete state + let state = RejectedCacheState { + version: 1, + saved_at, + hot_cache: SerializableHotCache { + expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(), + entries: serializable_hot_entries, + }, + cold_index: SerializableColdIndex { + expiry_duration_secs: self.cold_index.expiry_duration.as_secs(), + entries: serializable_cold_entries, + }, + }; + + // Serialize to JSON and write to file + let json = serde_json::to_string_pretty(&state)?; + std::fs::write(path, json)?; + + Ok(()) + } + + /// Restore rejected events cache from disk + /// + /// Loads the serialized state from disk and populates both hot cache and cold index. + /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain + /// correct expiry behavior. Deletes the state file after successful restore. + /// + /// # Arguments + /// + /// * `path` - File path to read the serialized state from + /// + /// # Returns + /// + /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails + pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box> { + // Load and parse JSON + let json = std::fs::read_to_string(path)?; + let state: RejectedCacheState = serde_json::from_str(&json)?; + + // Calculate downtime (how long the relay was offline) + let now_system = SystemTime::now(); + let downtime = now_system + .duration_since(state.saved_at) + .unwrap_or(Duration::ZERO); + + let now_instant = Instant::now(); + + // Lock both caches for restoration + let mut hot_entries = self.hot_cache.entries.write().unwrap(); + let mut cold_entries = self.cold_index.entries.write().unwrap(); + + // Restore hot cache entries + for (event_id, serializable_entry) in state.hot_cache.entries { + // Reconstruct cached_at by extending the offset by downtime + // Original offset (how long ago it was cached when saved) + let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs); + // Total offset including downtime + let total_offset = original_offset + downtime; + + // cached_at = now - total_offset + let cached_at = now_instant - total_offset; + + let entry = HotCacheEntry { + event: serializable_entry.event, + pubkey: serializable_entry.pubkey, + identifier: serializable_entry.identifier, + event_type: serializable_entry.event_type, + reason: serializable_entry.reason, + cached_at, + }; + + hot_entries.insert(event_id, entry); + } + + // Restore cold index entries + for (event_id, serializable_entry) in state.cold_index.entries { + // Reconstruct rejected_at by extending the offset by downtime + let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs); + let total_offset = original_offset + downtime; + + // rejected_at = now - total_offset + let rejected_at = now_instant - total_offset; + + let entry = ColdIndexEntry { + pubkey: serializable_entry.pubkey, + identifier: serializable_entry.identifier, + event_type: serializable_entry.event_type, + reason: serializable_entry.reason, + rejected_at, + }; + + cold_entries.insert(event_id, entry); + } + + // Release locks before deleting file + drop(hot_entries); + drop(cold_entries); + + // Delete the state file after successful restore + std::fs::remove_file(path)?; + + Ok(()) + } } #[cfg(test)] @@ -956,4 +1177,503 @@ mod tests { // Cold index now empty assert_eq!(index.cold_index_len(), 0); } + + // ======================================================================== + // Persistence Serialization Tests + // ======================================================================== + + #[tokio::test] + async fn test_save_and_restore_hot_cache_roundtrip() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let event = create_test_event().await; + let pubkey = event.pubkey; + let identifier = "test-repo".to_string(); + + // Add event to hot cache + index.add_announcement( + event.clone(), + pubkey, + identifier.clone(), + RejectionReason::DoesNotListService, + ); + + assert_eq!(index.hot_cache_len(), 1); + assert_eq!(index.cold_index_len(), 1); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + assert!(state_path.exists()); + + // Create new index and restore + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Verify state file was deleted after restore + assert!(!state_path.exists()); + + // Verify hot cache restored + assert_eq!(index2.hot_cache_len(), 1); + assert!(index2.hot_cache.contains(&event.id)); + + // Verify cold index restored + assert_eq!(index2.cold_index_len(), 1); + assert!(index2.cold_index.contains(&event.id)); + + // Verify we can retrieve the event + let events = index2 + .hot_cache + .get_maintainer_events(&pubkey, &identifier, None); + assert_eq!(events.len(), 1); + assert_eq!(events[0].id, event.id); + } + + #[tokio::test] + async fn test_save_and_restore_cold_index_only() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new( + Duration::from_millis(50), // Hot cache expires quickly + Duration::from_secs(604800), // Cold index lasts long + ); + let event = create_test_event().await; + + // Add event + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::MaintainerNotYetValid, + ); + + // Wait for hot cache to expire + std::thread::sleep(Duration::from_millis(60)); + index.cleanup_expired_for_type("announcement"); + + assert_eq!(index.hot_cache_len(), 0); + assert_eq!(index.cold_index_len(), 1); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + + // Restore into new index + let index2 = + RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Verify only cold index restored (hot cache was empty) + assert_eq!(index2.hot_cache_len(), 0); + assert_eq!(index2.cold_index_len(), 1); + assert!(index2.cold_index.contains(&event.id)); + } + + #[tokio::test] + async fn test_save_and_restore_both_hot_and_cold() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let keys = Keys::generate(); + + // Create two events + let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); + let event1 = keys.sign_event(unsigned1).await.unwrap(); + + let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); + let event2 = keys.sign_event(unsigned2).await.unwrap(); + + // Add both events + index.add_announcement( + event1.clone(), + event1.pubkey, + "repo1".to_string(), + RejectionReason::DoesNotListService, + ); + + index.add_state( + event2.clone(), + event2.pubkey, + "repo2".to_string(), + RejectionReason::Other, + ); + + assert_eq!(index.hot_cache_len(), 2); + assert_eq!(index.cold_index_len(), 2); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + + // Restore into new index + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Verify both caches restored + assert_eq!(index2.hot_cache_len(), 2); + assert_eq!(index2.cold_index_len(), 2); + assert!(index2.contains(&event1.id)); + assert!(index2.contains(&event2.id)); + } + + #[tokio::test] + async fn test_save_and_restore_empty_cache() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + + // Save empty cache + index.save_to_disk(&state_path).unwrap(); + assert!(state_path.exists()); + + // Restore into new index + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Verify empty state restored + assert_eq!(index2.hot_cache_len(), 0); + assert_eq!(index2.cold_index_len(), 0); + } + + #[tokio::test] + async fn test_restore_missing_file() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("nonexistent.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + + // Attempting to restore missing file should return error + let result = index.restore_from_disk(&state_path); + assert!(result.is_err()); + + // Index should remain empty + assert_eq!(index.hot_cache_len(), 0); + assert_eq!(index.cold_index_len(), 0); + } + + #[tokio::test] + async fn test_restore_corrupted_json() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("corrupted.json"); + + // Write corrupted JSON + std::fs::write(&state_path, "{ invalid json !!!").unwrap(); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + + // Attempting to restore corrupted file should return error + let result = index.restore_from_disk(&state_path); + assert!(result.is_err()); + + // Index should remain empty + assert_eq!(index.hot_cache_len(), 0); + assert_eq!(index.cold_index_len(), 0); + } + + #[tokio::test] + async fn test_file_cleanup_after_successful_restore() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let event = create_test_event().await; + + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + assert!(state_path.exists()); + + // Restore + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // File should be deleted after successful restore + assert!(!state_path.exists()); + } + + #[tokio::test] + async fn test_downtime_calculation_preserves_expiry() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let event = create_test_event().await; + + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + + // Simulate downtime by sleeping + std::thread::sleep(Duration::from_millis(100)); + + // Restore + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Event should still be in both caches (downtime accounted for) + assert_eq!(index2.hot_cache_len(), 1); + assert_eq!(index2.cold_index_len(), 1); + assert!(index2.contains(&event.id)); + } + + #[tokio::test] + async fn test_entries_expired_during_downtime() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + // Create index with very short expiry + let index = RejectedEventsIndex::new( + Duration::from_millis(100), // Hot cache: 100ms + Duration::from_millis(200), // Cold index: 200ms + ); + let event = create_test_event().await; + + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + + // Simulate downtime longer than hot cache expiry + std::thread::sleep(Duration::from_millis(150)); + + // Restore + let index2 = + RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200)); + index2.restore_from_disk(&state_path).unwrap(); + + // Hot cache entry should have expired during downtime + // Cold index should still have it (200ms expiry) + assert_eq!(index2.hot_cache_len(), 1); + assert_eq!(index2.cold_index_len(), 1); + + // But when we try to get it, hot cache will see it's expired + let events = index2 + .hot_cache + .get_maintainer_events(&event.pubkey, "test-repo", None); + assert_eq!(events.len(), 0); // Expired! + + // Cleanup should remove it + let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement"); + assert_eq!(hot_expired, 1); + assert_eq!(cold_expired, 0); // Not expired yet + } + + #[tokio::test] + async fn test_hot_cache_different_event_types() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let keys = Keys::generate(); + + // Create announcement event + let unsigned_ann = + nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); + let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); + + // Create state event + let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); + let event_state = keys.sign_event(unsigned_state).await.unwrap(); + + // Add both types + index.add_announcement( + event_ann.clone(), + event_ann.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + index.add_state( + event_state.clone(), + event_state.pubkey, + "test-repo".to_string(), + RejectionReason::Other, + ); + + // Save and restore + index.save_to_disk(&state_path).unwrap(); + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Verify both event types restored + assert_eq!(index2.hot_cache_len(), 2); + assert!(index2.contains(&event_ann.id)); + assert!(index2.contains(&event_state.id)); + + // Verify we can filter by type + let (removed, events) = index2.invalidate_and_get( + &event_ann.pubkey, + "test-repo", + Some(EventType::Announcement), + ); + assert_eq!(removed, 1); + assert_eq!(events.len(), 1); + assert_eq!(events[0].id, event_ann.id); + } + + #[tokio::test] + async fn test_cold_index_different_rejection_reasons() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let keys = Keys::generate(); + + // Create events with different rejection reasons + let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); + let event1 = keys.sign_event(unsigned1).await.unwrap(); + + let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); + let event2 = keys.sign_event(unsigned2).await.unwrap(); + + let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key()); + let event3 = keys.sign_event(unsigned3).await.unwrap(); + + // Add with different rejection reasons + index.add_announcement( + event1.clone(), + event1.pubkey, + "repo1".to_string(), + RejectionReason::DoesNotListService, + ); + + index.add_announcement( + event2.clone(), + event2.pubkey, + "repo2".to_string(), + RejectionReason::MaintainerNotYetValid, + ); + + index.add_announcement( + event3.clone(), + event3.pubkey, + "repo3".to_string(), + RejectionReason::Other, + ); + + // Save and restore + index.save_to_disk(&state_path).unwrap(); + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Verify all entries restored with their rejection reasons + assert_eq!(index2.cold_index_len(), 3); + assert!(index2.contains(&event1.id)); + assert!(index2.contains(&event2.id)); + assert!(index2.contains(&event3.id)); + } + + #[tokio::test] + async fn test_multiple_save_restore_cycles() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + // First cycle + let index1 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + let event1 = create_test_event().await; + + index1.add_announcement( + event1.clone(), + event1.pubkey, + "repo1".to_string(), + RejectionReason::DoesNotListService, + ); + + index1.save_to_disk(&state_path).unwrap(); + + // Second cycle - restore and add more + let index2 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + let event2 = create_test_event().await; + index2.add_announcement( + event2.clone(), + event2.pubkey, + "repo2".to_string(), + RejectionReason::MaintainerNotYetValid, + ); + + assert_eq!(index2.hot_cache_len(), 2); + index2.save_to_disk(&state_path).unwrap(); + + // Third cycle - restore again + let index3 = + RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); + index3.restore_from_disk(&state_path).unwrap(); + + // Verify both events survived multiple cycles + assert_eq!(index3.hot_cache_len(), 2); + assert!(index3.contains(&event1.id)); + assert!(index3.contains(&event2.id)); + } + + #[tokio::test] + async fn test_restore_preserves_remaining_ttl() { + let temp_dir = tempfile::tempdir().unwrap(); + let state_path = temp_dir.path().join("rejected_cache.json"); + + // Create index with 2 second hot cache expiry + let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); + let event = create_test_event().await; + + index.add_announcement( + event.clone(), + event.pubkey, + "test-repo".to_string(), + RejectionReason::DoesNotListService, + ); + + // Wait 200ms (small fraction of TTL) + std::thread::sleep(Duration::from_millis(200)); + + // Save to disk + index.save_to_disk(&state_path).unwrap(); + + // Immediately restore (minimal downtime) + let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); + index2.restore_from_disk(&state_path).unwrap(); + + // Event should still be retrievable (has ~1.8s remaining) + let events = index2 + .hot_cache + .get_maintainer_events(&event.pubkey, "test-repo", None); + assert_eq!(events.len(), 1); + + // Wait 2 seconds (total 2.2s > 2s expiry) + std::thread::sleep(Duration::from_secs(2)); + + // Now it should be expired + let events = index2 + .hot_cache + .get_maintainer_events(&event.pubkey, "test-repo", None); + assert_eq!(events.len(), 0); + } } -- cgit v1.2.3