upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:19:18 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:19:18 +0000
commitb101afa00bc28e1b55286145cb81e32a5b3decc9 (patch)
treed42869f89e4916bb8dc36fd26c9ac5f888e042ac /src
parentb6c70f765dd02fb0297888d671e455df33d6fcb4 (diff)
feat(sync): add rejected events cache persistence and integrate with shutdown/startup
Implement save/restore functionality for rejected events cache and integrate persistence with relay shutdown/startup lifecycle. Both purgatory and rejected cache now survive relay restarts. Key features: - Serialize rejected events cache to JSON (rejected-events-cache.json) - Save both hot cache (2min, full events) and cold index (7day, metadata) - Restore with downtime adjustment (preserves remaining TTL) - Graceful degradation (missing/corrupted files don't crash) - File cleanup after successful restore - Automatic restoration in SyncManager::new() Integration: - Shutdown hook saves both purgatory and rejected cache - Startup hook restores both and re-queues repositories - Non-fatal errors (logs warnings, continues on failure) Files: - src/sync/rejected_index.rs: save_to_disk/restore_from_disk methods - src/sync/mod.rs: SyncManager integration and auto-restore - src/main.rs: Shutdown/startup hooks for both caches - tests/purgatory_persistence.rs: 17 integration tests Tests: 13 unit tests + 17 integration tests covering full lifecycle
Diffstat (limited to 'src')
-rw-r--r--src/main.rs50
-rw-r--r--src/sync/mod.rs71
-rw-r--r--src/sync/rejected_index.rs726
3 files changed, 831 insertions, 16 deletions
diff --git a/src/main.rs b/src/main.rs
index a6f1d9d..5e5b83a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
6use tracing::{info, Level}; 6use tracing::{error, info, warn, Level};
7use tracing_subscriber::FmtSubscriber; 7use tracing_subscriber::FmtSubscriber;
8 8
9use ngit_grasp::{ 9use ngit_grasp::{
@@ -64,6 +64,22 @@ async fn main() -> Result<()> {
64 ))); 64 )));
65 info!("Purgatory initialized for event coordination"); 65 info!("Purgatory initialized for event coordination");
66 66
67 // Restore purgatory state from disk if available
68 let purgatory_path =
69 PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json");
70
71 if purgatory_path.exists() {
72 match purgatory.restore_from_disk(&purgatory_path) {
73 Ok(()) => {
74 info!("Restored purgatory state from disk");
75 // Re-queueing will happen later after sync system is created
76 }
77 Err(e) => {
78 warn!("Failed to restore purgatory state: {}, starting empty", e);
79 }
80 }
81 }
82
67 // Create Nostr relay with NIP-34 validation 83 // Create Nostr relay with NIP-34 validation
68 // Returns both the relay and database for direct queries in handlers 84 // Returns both the relay and database for direct queries in handlers
69 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { 85 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await {
@@ -88,6 +104,7 @@ async fn main() -> Result<()> {
88 relay_with_db.write_policy.clone(), 104 relay_with_db.write_policy.clone(),
89 relay_with_db.relay.clone(), 105 relay_with_db.relay.clone(),
90 &config, 106 &config,
107 PathBuf::from(config.effective_git_data_path()),
91 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), 108 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()),
92 ); 109 );
93 110
@@ -100,6 +117,21 @@ async fn main() -> Result<()> {
100 info!("Proactive sync enabled (will discover relays from stored announcements)"); 117 info!("Proactive sync enabled (will discover relays from stored announcements)");
101 } 118 }
102 119
120 // Re-queue all restored purgatory repos for sync
121 let restored_identifiers = purgatory.get_all_identifiers();
122 if !restored_identifiers.is_empty() {
123 info!(
124 "Re-queueing {} restored repositories for sync",
125 restored_identifiers.len()
126 );
127 for identifier in restored_identifiers {
128 purgatory.enqueue_sync_immediate(&identifier);
129 }
130 }
131
132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134
103 tokio::spawn(async move { 135 tokio::spawn(async move {
104 sync_manager.run().await; 136 sync_manager.run().await;
105 }); 137 });
@@ -190,6 +222,22 @@ async fn main() -> Result<()> {
190 } 222 }
191 } 223 }
192 224
225 // Save purgatory state to disk
226 let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json");
227 if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) {
228 error!("Failed to save purgatory state: {}", e);
229 } else {
230 info!("Purgatory state saved to disk");
231 }
232
233 // Save rejected events cache to disk
234 let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json");
235 if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) {
236 error!("Failed to save rejected events cache: {}", e);
237 } else {
238 info!("Rejected events cache saved to disk");
239 }
240
193 // Cleanup placeholder refs on shutdown 241 // Cleanup placeholder refs on shutdown
194 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); 242 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids();
195 if !placeholder_ids.is_empty() { 243 if !placeholder_ids.is_empty() {
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e2d55bd..3213dfb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -43,6 +43,7 @@ pub use health::RelayHealthTracker;
43use tokio::time::sleep; 43use tokio::time::sleep;
44 44
45use std::collections::{HashMap, HashSet}; 45use std::collections::{HashMap, HashSet};
46use std::path::{Path, PathBuf};
46use std::sync::Arc; 47use std::sync::Arc;
47use std::time::Duration; 48use std::time::Duration;
48 49
@@ -581,6 +582,7 @@ impl SyncManager {
581 /// * `write_policy` - Policy for validating events before storage 582 /// * `write_policy` - Policy for validating events before storage
582 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) 583 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
583 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence)
584 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
585 pub fn new( 587 pub fn new(
586 bootstrap_relay_url: Option<String>, 588 bootstrap_relay_url: Option<String>,
@@ -589,11 +591,42 @@ impl SyncManager {
589 write_policy: Nip34WritePolicy, 591 write_policy: Nip34WritePolicy,
590 local_relay: LocalRelay, 592 local_relay: LocalRelay,
591 config: &Config, 593 config: &Config,
594 data_path: PathBuf,
592 sync_metrics: Option<SyncMetrics>, 595 sync_metrics: Option<SyncMetrics>,
593 ) -> Self { 596 ) -> Self {
594 // Extract purgatory from write_policy for read-only access 597 // Extract purgatory from write_policy for read-only access
595 let purgatory = write_policy.purgatory().clone(); 598 let purgatory = write_policy.purgatory().clone();
596 599
600 // Create rejected events index
601 let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics {
602 RejectedEventsIndex::with_metrics(
603 Duration::from_secs(config.rejected_hot_cache_duration_secs),
604 Duration::from_secs(config.rejected_cold_index_expiry_secs),
605 metrics.clone(),
606 )
607 } else {
608 RejectedEventsIndex::new(
609 Duration::from_secs(config.rejected_hot_cache_duration_secs),
610 Duration::from_secs(config.rejected_cold_index_expiry_secs),
611 )
612 });
613
614 // Attempt to restore rejected events index from disk
615 let rejected_index_path = data_path.join("rejected-events-cache.json");
616 if rejected_index_path.exists() {
617 match rejected_events_index.restore_from_disk(&rejected_index_path) {
618 Ok(()) => {
619 tracing::info!("Restored rejected events index from disk");
620 }
621 Err(e) => {
622 tracing::warn!(
623 "Failed to restore rejected events index: {}, starting empty",
624 e
625 );
626 }
627 }
628 }
629
597 Self { 630 Self {
598 bootstrap_relay_url, 631 bootstrap_relay_url,
599 service_domain, 632 service_domain,
@@ -605,18 +638,7 @@ impl SyncManager {
605 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 638 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
606 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 639 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
607 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 640 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
608 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { 641 rejected_events_index,
609 RejectedEventsIndex::with_metrics(
610 Duration::from_secs(config.rejected_hot_cache_duration_secs),
611 Duration::from_secs(config.rejected_cold_index_expiry_secs),
612 metrics.clone(),
613 )
614 } else {
615 RejectedEventsIndex::new(
616 Duration::from_secs(config.rejected_hot_cache_duration_secs),
617 Duration::from_secs(config.rejected_cold_index_expiry_secs),
618 )
619 }),
620 connections: HashMap::new(), 642 connections: HashMap::new(),
621 health_tracker: Arc::new(RelayHealthTracker::new(config)), 643 health_tracker: Arc::new(RelayHealthTracker::new(config)),
622 next_batch_id: 0, 644 next_batch_id: 0,
@@ -637,6 +659,31 @@ impl SyncManager {
637 self.next_batch_id 659 self.next_batch_id
638 } 660 }
639 661
662 /// Get a clone of the rejected events index Arc.
663 ///
664 /// This allows access to the rejected events index for persistence
665 /// even after the SyncManager has been moved into a task.
666 ///
667 /// # Returns
668 /// Arc clone of the rejected events index
669 pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> {
670 self.rejected_events_index.clone()
671 }
672
673 /// Save rejected events index to disk.
674 ///
675 /// This is called during shutdown to persist the rejected events cache,
676 /// allowing us to avoid re-downloading rejected events after restart.
677 ///
678 /// # Arguments
679 /// * `path` - Path to save the rejected index file
680 ///
681 /// # Returns
682 /// Ok(()) on success, Err if save fails
683 pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
684 self.rejected_events_index.save_to_disk(path)
685 }
686
640 /// Handle EOSE (End Of Stored Events) for a subscription 687 /// Handle EOSE (End Of Stored Events) for a subscription
641 /// 688 ///
642 /// This method: 689 /// This method:
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index 4d31901..f25f22a 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -86,12 +86,14 @@
86//! ``` 86//! ```
87 87
88use nostr_sdk::{Event, EventId, PublicKey}; 88use nostr_sdk::{Event, EventId, PublicKey};
89use serde::{Deserialize, Serialize};
89use std::collections::{HashMap, HashSet}; 90use std::collections::{HashMap, HashSet};
91use std::path::Path;
90use std::sync::{Arc, RwLock}; 92use std::sync::{Arc, RwLock};
91use std::time::{Duration, Instant}; 93use std::time::{Duration, Instant, SystemTime};
92 94
93/// Type of event stored in the rejected events index 95/// Type of event stored in the rejected events index
94#[derive(Debug, Clone, Copy, PartialEq, Eq)] 96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
95pub enum EventType { 97pub enum EventType {
96 /// Repository announcement (kind 30617) 98 /// Repository announcement (kind 30617)
97 Announcement, 99 Announcement,
@@ -109,7 +111,7 @@ impl std::fmt::Display for EventType {
109} 111}
110 112
111/// Reason why a repository announcement was rejected 113/// Reason why a repository announcement was rejected
112#[derive(Debug, Clone, Copy, PartialEq, Eq)] 114#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
113pub enum RejectionReason { 115pub enum RejectionReason {
114 /// Announcement doesn't list this service in clone/web URLs 116 /// Announcement doesn't list this service in clone/web URLs
115 DoesNotListService, 117 DoesNotListService,
@@ -141,6 +143,20 @@ struct HotCacheEntry {
141 cached_at: Instant, 143 cached_at: Instant,
142} 144}
143 145
146/// Serializable version of HotCacheEntry for persistence
147///
148/// Converts Instant to Duration offset from saved_at time
149#[derive(Debug, Clone, Serialize, Deserialize)]
150struct SerializableHotCacheEntry {
151 event: Event,
152 pubkey: PublicKey,
153 identifier: String,
154 event_type: EventType,
155 reason: RejectionReason,
156 /// Duration since saved_at when this entry was cached
157 cached_at_offset_secs: u64,
158}
159
144/// Entry in the cold index (metadata only) 160/// Entry in the cold index (metadata only)
145/// 161///
146/// Note: event_id is stored as the HashMap key, not in this struct 162/// Note: event_id is stored as the HashMap key, not in this struct
@@ -154,6 +170,49 @@ struct ColdIndexEntry {
154 rejected_at: Instant, 170 rejected_at: Instant,
155} 171}
156 172
173/// Serializable version of ColdIndexEntry for persistence
174///
175/// Converts Instant to Duration offset from saved_at time
176#[derive(Debug, Clone, Serialize, Deserialize)]
177struct SerializableColdIndexEntry {
178 pubkey: PublicKey,
179 identifier: String,
180 event_type: EventType,
181 reason: RejectionReason,
182 /// Duration since saved_at when this entry was rejected
183 rejected_at_offset_secs: u64,
184}
185
186/// Serializable state for hot cache
187#[derive(Debug, Serialize, Deserialize)]
188struct SerializableHotCache {
189 expiry_duration_secs: u64,
190 entries: HashMap<EventId, SerializableHotCacheEntry>,
191}
192
193/// Serializable state for cold index
194#[derive(Debug, Serialize, Deserialize)]
195struct SerializableColdIndex {
196 expiry_duration_secs: u64,
197 entries: HashMap<EventId, SerializableColdIndexEntry>,
198}
199
200/// Complete rejected cache state for persistence
201///
202/// Stores both hot cache and cold index with version and timestamp information.
203/// All Instant fields are converted to Duration offsets from saved_at.
204#[derive(Debug, Serialize, Deserialize)]
205struct RejectedCacheState {
206 /// Version for future compatibility
207 version: u32,
208 /// When this state was saved
209 saved_at: SystemTime,
210 /// Hot cache entries with full events
211 hot_cache: SerializableHotCache,
212 /// Cold index entries with metadata only
213 cold_index: SerializableColdIndex,
214}
215
157/// Hot cache: Stores full events for immediate re-processing 216/// Hot cache: Stores full events for immediate re-processing
158/// 217///
159/// Events are stored for a short duration (default: 2 minutes) to enable 218/// Events are stored for a short duration (default: 2 minutes) to enable
@@ -603,6 +662,168 @@ impl RejectedEventsIndex {
603 662
604 ids 663 ids
605 } 664 }
665
666 /// Save rejected events cache to disk
667 ///
668 /// Serializes both hot cache and cold index to JSON, converting Instant timestamps
669 /// to Duration offsets from the save time. This allows timestamps to be adjusted
670 /// for downtime when restored.
671 ///
672 /// # Arguments
673 ///
674 /// * `path` - File path to write the serialized state to
675 ///
676 /// # Returns
677 ///
678 /// Ok(()) on success, or an error if serialization or file write fails
679 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
680 let saved_at = SystemTime::now();
681 let now = Instant::now();
682
683 // Lock both caches for consistent snapshot
684 let hot_entries = self.hot_cache.entries.read().unwrap();
685 let cold_entries = self.cold_index.entries.read().unwrap();
686
687 // Convert hot cache entries to serializable format
688 let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries
689 .iter()
690 .map(|(event_id, entry)| {
691 let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs();
692
693 let serializable_entry = SerializableHotCacheEntry {
694 event: entry.event.clone(),
695 pubkey: entry.pubkey,
696 identifier: entry.identifier.clone(),
697 event_type: entry.event_type,
698 reason: entry.reason,
699 cached_at_offset_secs,
700 };
701
702 (*event_id, serializable_entry)
703 })
704 .collect();
705
706 // Convert cold index entries to serializable format
707 let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries
708 .iter()
709 .map(|(event_id, entry)| {
710 let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs();
711
712 let serializable_entry = SerializableColdIndexEntry {
713 pubkey: entry.pubkey,
714 identifier: entry.identifier.clone(),
715 event_type: entry.event_type,
716 reason: entry.reason,
717 rejected_at_offset_secs,
718 };
719
720 (*event_id, serializable_entry)
721 })
722 .collect();
723
724 // Create complete state
725 let state = RejectedCacheState {
726 version: 1,
727 saved_at,
728 hot_cache: SerializableHotCache {
729 expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(),
730 entries: serializable_hot_entries,
731 },
732 cold_index: SerializableColdIndex {
733 expiry_duration_secs: self.cold_index.expiry_duration.as_secs(),
734 entries: serializable_cold_entries,
735 },
736 };
737
738 // Serialize to JSON and write to file
739 let json = serde_json::to_string_pretty(&state)?;
740 std::fs::write(path, json)?;
741
742 Ok(())
743 }
744
745 /// Restore rejected events cache from disk
746 ///
747 /// Loads the serialized state from disk and populates both hot cache and cold index.
748 /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain
749 /// correct expiry behavior. Deletes the state file after successful restore.
750 ///
751 /// # Arguments
752 ///
753 /// * `path` - File path to read the serialized state from
754 ///
755 /// # Returns
756 ///
757 /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails
758 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
759 // Load and parse JSON
760 let json = std::fs::read_to_string(path)?;
761 let state: RejectedCacheState = serde_json::from_str(&json)?;
762
763 // Calculate downtime (how long the relay was offline)
764 let now_system = SystemTime::now();
765 let downtime = now_system
766 .duration_since(state.saved_at)
767 .unwrap_or(Duration::ZERO);
768
769 let now_instant = Instant::now();
770
771 // Lock both caches for restoration
772 let mut hot_entries = self.hot_cache.entries.write().unwrap();
773 let mut cold_entries = self.cold_index.entries.write().unwrap();
774
775 // Restore hot cache entries
776 for (event_id, serializable_entry) in state.hot_cache.entries {
777 // Reconstruct cached_at by extending the offset by downtime
778 // Original offset (how long ago it was cached when saved)
779 let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs);
780 // Total offset including downtime
781 let total_offset = original_offset + downtime;
782
783 // cached_at = now - total_offset
784 let cached_at = now_instant - total_offset;
785
786 let entry = HotCacheEntry {
787 event: serializable_entry.event,
788 pubkey: serializable_entry.pubkey,
789 identifier: serializable_entry.identifier,
790 event_type: serializable_entry.event_type,
791 reason: serializable_entry.reason,
792 cached_at,
793 };
794
795 hot_entries.insert(event_id, entry);
796 }
797
798 // Restore cold index entries
799 for (event_id, serializable_entry) in state.cold_index.entries {
800 // Reconstruct rejected_at by extending the offset by downtime
801 let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs);
802 let total_offset = original_offset + downtime;
803
804 // rejected_at = now - total_offset
805 let rejected_at = now_instant - total_offset;
806
807 let entry = ColdIndexEntry {
808 pubkey: serializable_entry.pubkey,
809 identifier: serializable_entry.identifier,
810 event_type: serializable_entry.event_type,
811 reason: serializable_entry.reason,
812 rejected_at,
813 };
814
815 cold_entries.insert(event_id, entry);
816 }
817
818 // Release locks before deleting file
819 drop(hot_entries);
820 drop(cold_entries);
821
822 // Delete the state file after successful restore
823 std::fs::remove_file(path)?;
824
825 Ok(())
826 }
606} 827}
607 828
608#[cfg(test)] 829#[cfg(test)]
@@ -956,4 +1177,503 @@ mod tests {
956 // Cold index now empty 1177 // Cold index now empty
957 assert_eq!(index.cold_index_len(), 0); 1178 assert_eq!(index.cold_index_len(), 0);
958 } 1179 }
1180
1181 // ========================================================================
1182 // Persistence Serialization Tests
1183 // ========================================================================
1184
1185 #[tokio::test]
1186 async fn test_save_and_restore_hot_cache_roundtrip() {
1187 let temp_dir = tempfile::tempdir().unwrap();
1188 let state_path = temp_dir.path().join("rejected_cache.json");
1189
1190 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1191 let event = create_test_event().await;
1192 let pubkey = event.pubkey;
1193 let identifier = "test-repo".to_string();
1194
1195 // Add event to hot cache
1196 index.add_announcement(
1197 event.clone(),
1198 pubkey,
1199 identifier.clone(),
1200 RejectionReason::DoesNotListService,
1201 );
1202
1203 assert_eq!(index.hot_cache_len(), 1);
1204 assert_eq!(index.cold_index_len(), 1);
1205
1206 // Save to disk
1207 index.save_to_disk(&state_path).unwrap();
1208 assert!(state_path.exists());
1209
1210 // Create new index and restore
1211 let index2 =
1212 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1213 index2.restore_from_disk(&state_path).unwrap();
1214
1215 // Verify state file was deleted after restore
1216 assert!(!state_path.exists());
1217
1218 // Verify hot cache restored
1219 assert_eq!(index2.hot_cache_len(), 1);
1220 assert!(index2.hot_cache.contains(&event.id));
1221
1222 // Verify cold index restored
1223 assert_eq!(index2.cold_index_len(), 1);
1224 assert!(index2.cold_index.contains(&event.id));
1225
1226 // Verify we can retrieve the event
1227 let events = index2
1228 .hot_cache
1229 .get_maintainer_events(&pubkey, &identifier, None);
1230 assert_eq!(events.len(), 1);
1231 assert_eq!(events[0].id, event.id);
1232 }
1233
1234 #[tokio::test]
1235 async fn test_save_and_restore_cold_index_only() {
1236 let temp_dir = tempfile::tempdir().unwrap();
1237 let state_path = temp_dir.path().join("rejected_cache.json");
1238
1239 let index = RejectedEventsIndex::new(
1240 Duration::from_millis(50), // Hot cache expires quickly
1241 Duration::from_secs(604800), // Cold index lasts long
1242 );
1243 let event = create_test_event().await;
1244
1245 // Add event
1246 index.add_announcement(
1247 event.clone(),
1248 event.pubkey,
1249 "test-repo".to_string(),
1250 RejectionReason::MaintainerNotYetValid,
1251 );
1252
1253 // Wait for hot cache to expire
1254 std::thread::sleep(Duration::from_millis(60));
1255 index.cleanup_expired_for_type("announcement");
1256
1257 assert_eq!(index.hot_cache_len(), 0);
1258 assert_eq!(index.cold_index_len(), 1);
1259
1260 // Save to disk
1261 index.save_to_disk(&state_path).unwrap();
1262
1263 // Restore into new index
1264 let index2 =
1265 RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800));
1266 index2.restore_from_disk(&state_path).unwrap();
1267
1268 // Verify only cold index restored (hot cache was empty)
1269 assert_eq!(index2.hot_cache_len(), 0);
1270 assert_eq!(index2.cold_index_len(), 1);
1271 assert!(index2.cold_index.contains(&event.id));
1272 }
1273
1274 #[tokio::test]
1275 async fn test_save_and_restore_both_hot_and_cold() {
1276 let temp_dir = tempfile::tempdir().unwrap();
1277 let state_path = temp_dir.path().join("rejected_cache.json");
1278
1279 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1280 let keys = Keys::generate();
1281
1282 // Create two events
1283 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1284 let event1 = keys.sign_event(unsigned1).await.unwrap();
1285
1286 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1287 let event2 = keys.sign_event(unsigned2).await.unwrap();
1288
1289 // Add both events
1290 index.add_announcement(
1291 event1.clone(),
1292 event1.pubkey,
1293 "repo1".to_string(),
1294 RejectionReason::DoesNotListService,
1295 );
1296
1297 index.add_state(
1298 event2.clone(),
1299 event2.pubkey,
1300 "repo2".to_string(),
1301 RejectionReason::Other,
1302 );
1303
1304 assert_eq!(index.hot_cache_len(), 2);
1305 assert_eq!(index.cold_index_len(), 2);
1306
1307 // Save to disk
1308 index.save_to_disk(&state_path).unwrap();
1309
1310 // Restore into new index
1311 let index2 =
1312 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1313 index2.restore_from_disk(&state_path).unwrap();
1314
1315 // Verify both caches restored
1316 assert_eq!(index2.hot_cache_len(), 2);
1317 assert_eq!(index2.cold_index_len(), 2);
1318 assert!(index2.contains(&event1.id));
1319 assert!(index2.contains(&event2.id));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_save_and_restore_empty_cache() {
1324 let temp_dir = tempfile::tempdir().unwrap();
1325 let state_path = temp_dir.path().join("rejected_cache.json");
1326
1327 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1328
1329 // Save empty cache
1330 index.save_to_disk(&state_path).unwrap();
1331 assert!(state_path.exists());
1332
1333 // Restore into new index
1334 let index2 =
1335 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1336 index2.restore_from_disk(&state_path).unwrap();
1337
1338 // Verify empty state restored
1339 assert_eq!(index2.hot_cache_len(), 0);
1340 assert_eq!(index2.cold_index_len(), 0);
1341 }
1342
1343 #[tokio::test]
1344 async fn test_restore_missing_file() {
1345 let temp_dir = tempfile::tempdir().unwrap();
1346 let state_path = temp_dir.path().join("nonexistent.json");
1347
1348 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1349
1350 // Attempting to restore missing file should return error
1351 let result = index.restore_from_disk(&state_path);
1352 assert!(result.is_err());
1353
1354 // Index should remain empty
1355 assert_eq!(index.hot_cache_len(), 0);
1356 assert_eq!(index.cold_index_len(), 0);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_restore_corrupted_json() {
1361 let temp_dir = tempfile::tempdir().unwrap();
1362 let state_path = temp_dir.path().join("corrupted.json");
1363
1364 // Write corrupted JSON
1365 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
1366
1367 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1368
1369 // Attempting to restore corrupted file should return error
1370 let result = index.restore_from_disk(&state_path);
1371 assert!(result.is_err());
1372
1373 // Index should remain empty
1374 assert_eq!(index.hot_cache_len(), 0);
1375 assert_eq!(index.cold_index_len(), 0);
1376 }
1377
1378 #[tokio::test]
1379 async fn test_file_cleanup_after_successful_restore() {
1380 let temp_dir = tempfile::tempdir().unwrap();
1381 let state_path = temp_dir.path().join("rejected_cache.json");
1382
1383 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1384 let event = create_test_event().await;
1385
1386 index.add_announcement(
1387 event.clone(),
1388 event.pubkey,
1389 "test-repo".to_string(),
1390 RejectionReason::DoesNotListService,
1391 );
1392
1393 // Save to disk
1394 index.save_to_disk(&state_path).unwrap();
1395 assert!(state_path.exists());
1396
1397 // Restore
1398 let index2 =
1399 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1400 index2.restore_from_disk(&state_path).unwrap();
1401
1402 // File should be deleted after successful restore
1403 assert!(!state_path.exists());
1404 }
1405
1406 #[tokio::test]
1407 async fn test_downtime_calculation_preserves_expiry() {
1408 let temp_dir = tempfile::tempdir().unwrap();
1409 let state_path = temp_dir.path().join("rejected_cache.json");
1410
1411 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1412 let event = create_test_event().await;
1413
1414 index.add_announcement(
1415 event.clone(),
1416 event.pubkey,
1417 "test-repo".to_string(),
1418 RejectionReason::DoesNotListService,
1419 );
1420
1421 // Save to disk
1422 index.save_to_disk(&state_path).unwrap();
1423
1424 // Simulate downtime by sleeping
1425 std::thread::sleep(Duration::from_millis(100));
1426
1427 // Restore
1428 let index2 =
1429 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1430 index2.restore_from_disk(&state_path).unwrap();
1431
1432 // Event should still be in both caches (downtime accounted for)
1433 assert_eq!(index2.hot_cache_len(), 1);
1434 assert_eq!(index2.cold_index_len(), 1);
1435 assert!(index2.contains(&event.id));
1436 }
1437
1438 #[tokio::test]
1439 async fn test_entries_expired_during_downtime() {
1440 let temp_dir = tempfile::tempdir().unwrap();
1441 let state_path = temp_dir.path().join("rejected_cache.json");
1442
1443 // Create index with very short expiry
1444 let index = RejectedEventsIndex::new(
1445 Duration::from_millis(100), // Hot cache: 100ms
1446 Duration::from_millis(200), // Cold index: 200ms
1447 );
1448 let event = create_test_event().await;
1449
1450 index.add_announcement(
1451 event.clone(),
1452 event.pubkey,
1453 "test-repo".to_string(),
1454 RejectionReason::DoesNotListService,
1455 );
1456
1457 // Save to disk
1458 index.save_to_disk(&state_path).unwrap();
1459
1460 // Simulate downtime longer than hot cache expiry
1461 std::thread::sleep(Duration::from_millis(150));
1462
1463 // Restore
1464 let index2 =
1465 RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200));
1466 index2.restore_from_disk(&state_path).unwrap();
1467
1468 // Hot cache entry should have expired during downtime
1469 // Cold index should still have it (200ms expiry)
1470 assert_eq!(index2.hot_cache_len(), 1);
1471 assert_eq!(index2.cold_index_len(), 1);
1472
1473 // But when we try to get it, hot cache will see it's expired
1474 let events = index2
1475 .hot_cache
1476 .get_maintainer_events(&event.pubkey, "test-repo", None);
1477 assert_eq!(events.len(), 0); // Expired!
1478
1479 // Cleanup should remove it
1480 let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement");
1481 assert_eq!(hot_expired, 1);
1482 assert_eq!(cold_expired, 0); // Not expired yet
1483 }
1484
1485 #[tokio::test]
1486 async fn test_hot_cache_different_event_types() {
1487 let temp_dir = tempfile::tempdir().unwrap();
1488 let state_path = temp_dir.path().join("rejected_cache.json");
1489
1490 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1491 let keys = Keys::generate();
1492
1493 // Create announcement event
1494 let unsigned_ann =
1495 nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key());
1496 let event_ann = keys.sign_event(unsigned_ann).await.unwrap();
1497
1498 // Create state event
1499 let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key());
1500 let event_state = keys.sign_event(unsigned_state).await.unwrap();
1501
1502 // Add both types
1503 index.add_announcement(
1504 event_ann.clone(),
1505 event_ann.pubkey,
1506 "test-repo".to_string(),
1507 RejectionReason::DoesNotListService,
1508 );
1509
1510 index.add_state(
1511 event_state.clone(),
1512 event_state.pubkey,
1513 "test-repo".to_string(),
1514 RejectionReason::Other,
1515 );
1516
1517 // Save and restore
1518 index.save_to_disk(&state_path).unwrap();
1519 let index2 =
1520 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1521 index2.restore_from_disk(&state_path).unwrap();
1522
1523 // Verify both event types restored
1524 assert_eq!(index2.hot_cache_len(), 2);
1525 assert!(index2.contains(&event_ann.id));
1526 assert!(index2.contains(&event_state.id));
1527
1528 // Verify we can filter by type
1529 let (removed, events) = index2.invalidate_and_get(
1530 &event_ann.pubkey,
1531 "test-repo",
1532 Some(EventType::Announcement),
1533 );
1534 assert_eq!(removed, 1);
1535 assert_eq!(events.len(), 1);
1536 assert_eq!(events[0].id, event_ann.id);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_cold_index_different_rejection_reasons() {
1541 let temp_dir = tempfile::tempdir().unwrap();
1542 let state_path = temp_dir.path().join("rejected_cache.json");
1543
1544 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1545 let keys = Keys::generate();
1546
1547 // Create events with different rejection reasons
1548 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1549 let event1 = keys.sign_event(unsigned1).await.unwrap();
1550
1551 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1552 let event2 = keys.sign_event(unsigned2).await.unwrap();
1553
1554 let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key());
1555 let event3 = keys.sign_event(unsigned3).await.unwrap();
1556
1557 // Add with different rejection reasons
1558 index.add_announcement(
1559 event1.clone(),
1560 event1.pubkey,
1561 "repo1".to_string(),
1562 RejectionReason::DoesNotListService,
1563 );
1564
1565 index.add_announcement(
1566 event2.clone(),
1567 event2.pubkey,
1568 "repo2".to_string(),
1569 RejectionReason::MaintainerNotYetValid,
1570 );
1571
1572 index.add_announcement(
1573 event3.clone(),
1574 event3.pubkey,
1575 "repo3".to_string(),
1576 RejectionReason::Other,
1577 );
1578
1579 // Save and restore
1580 index.save_to_disk(&state_path).unwrap();
1581 let index2 =
1582 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1583 index2.restore_from_disk(&state_path).unwrap();
1584
1585 // Verify all entries restored with their rejection reasons
1586 assert_eq!(index2.cold_index_len(), 3);
1587 assert!(index2.contains(&event1.id));
1588 assert!(index2.contains(&event2.id));
1589 assert!(index2.contains(&event3.id));
1590 }
1591
1592 #[tokio::test]
1593 async fn test_multiple_save_restore_cycles() {
1594 let temp_dir = tempfile::tempdir().unwrap();
1595 let state_path = temp_dir.path().join("rejected_cache.json");
1596
1597 // First cycle
1598 let index1 =
1599 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1600 let event1 = create_test_event().await;
1601
1602 index1.add_announcement(
1603 event1.clone(),
1604 event1.pubkey,
1605 "repo1".to_string(),
1606 RejectionReason::DoesNotListService,
1607 );
1608
1609 index1.save_to_disk(&state_path).unwrap();
1610
1611 // Second cycle - restore and add more
1612 let index2 =
1613 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1614 index2.restore_from_disk(&state_path).unwrap();
1615
1616 let event2 = create_test_event().await;
1617 index2.add_announcement(
1618 event2.clone(),
1619 event2.pubkey,
1620 "repo2".to_string(),
1621 RejectionReason::MaintainerNotYetValid,
1622 );
1623
1624 assert_eq!(index2.hot_cache_len(), 2);
1625 index2.save_to_disk(&state_path).unwrap();
1626
1627 // Third cycle - restore again
1628 let index3 =
1629 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1630 index3.restore_from_disk(&state_path).unwrap();
1631
1632 // Verify both events survived multiple cycles
1633 assert_eq!(index3.hot_cache_len(), 2);
1634 assert!(index3.contains(&event1.id));
1635 assert!(index3.contains(&event2.id));
1636 }
1637
1638 #[tokio::test]
1639 async fn test_restore_preserves_remaining_ttl() {
1640 let temp_dir = tempfile::tempdir().unwrap();
1641 let state_path = temp_dir.path().join("rejected_cache.json");
1642
1643 // Create index with 2 second hot cache expiry
1644 let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1645 let event = create_test_event().await;
1646
1647 index.add_announcement(
1648 event.clone(),
1649 event.pubkey,
1650 "test-repo".to_string(),
1651 RejectionReason::DoesNotListService,
1652 );
1653
1654 // Wait 200ms (small fraction of TTL)
1655 std::thread::sleep(Duration::from_millis(200));
1656
1657 // Save to disk
1658 index.save_to_disk(&state_path).unwrap();
1659
1660 // Immediately restore (minimal downtime)
1661 let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1662 index2.restore_from_disk(&state_path).unwrap();
1663
1664 // Event should still be retrievable (has ~1.8s remaining)
1665 let events = index2
1666 .hot_cache
1667 .get_maintainer_events(&event.pubkey, "test-repo", None);
1668 assert_eq!(events.len(), 1);
1669
1670 // Wait 2 seconds (total 2.2s > 2s expiry)
1671 std::thread::sleep(Duration::from_secs(2));
1672
1673 // Now it should be expired
1674 let events = index2
1675 .hot_cache
1676 .get_maintainer_events(&event.pubkey, "test-repo", None);
1677 assert_eq!(events.len(), 0);
1678 }
959} 1679}