upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:46:30 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:46:30 +0000
commit4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch)
treed42869f89e4916bb8dc36fd26c9ac5f888e042ac /src
parent7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff)
parentb101afa00bc28e1b55286145cb81e32a5b3decc9 (diff)
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and rejected events cache. Events are now saved to disk on graceful shutdown and restored on startup, preventing data loss during relay restarts. Key features: - Purgatory state persisted to JSON (state events, PR events, expired events) - Rejected events cache persisted (hot cache + cold index) - Downtime adjustment preserves remaining TTL - Graceful degradation on missing/corrupted files - Automatic re-queueing of restored repositories - Comprehensive test coverage (45 tests)
Diffstat (limited to 'src')
-rw-r--r--src/main.rs50
-rw-r--r--src/purgatory/mod.rs925
-rw-r--r--src/purgatory/persistence.rs198
-rw-r--r--src/purgatory/types.rs20
-rw-r--r--src/sync/mod.rs71
-rw-r--r--src/sync/rejected_index.rs726
6 files changed, 1970 insertions, 20 deletions
diff --git a/src/main.rs b/src/main.rs
index a6f1d9d..5e5b83a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
6use tracing::{info, Level}; 6use tracing::{error, info, warn, Level};
7use tracing_subscriber::FmtSubscriber; 7use tracing_subscriber::FmtSubscriber;
8 8
9use ngit_grasp::{ 9use ngit_grasp::{
@@ -64,6 +64,22 @@ async fn main() -> Result<()> {
64 ))); 64 )));
65 info!("Purgatory initialized for event coordination"); 65 info!("Purgatory initialized for event coordination");
66 66
67 // Restore purgatory state from disk if available
68 let purgatory_path =
69 PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json");
70
71 if purgatory_path.exists() {
72 match purgatory.restore_from_disk(&purgatory_path) {
73 Ok(()) => {
74 info!("Restored purgatory state from disk");
75 // Re-queueing will happen later after sync system is created
76 }
77 Err(e) => {
78 warn!("Failed to restore purgatory state: {}, starting empty", e);
79 }
80 }
81 }
82
67 // Create Nostr relay with NIP-34 validation 83 // Create Nostr relay with NIP-34 validation
68 // Returns both the relay and database for direct queries in handlers 84 // Returns both the relay and database for direct queries in handlers
69 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { 85 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await {
@@ -88,6 +104,7 @@ async fn main() -> Result<()> {
88 relay_with_db.write_policy.clone(), 104 relay_with_db.write_policy.clone(),
89 relay_with_db.relay.clone(), 105 relay_with_db.relay.clone(),
90 &config, 106 &config,
107 PathBuf::from(config.effective_git_data_path()),
91 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), 108 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()),
92 ); 109 );
93 110
@@ -100,6 +117,21 @@ async fn main() -> Result<()> {
100 info!("Proactive sync enabled (will discover relays from stored announcements)"); 117 info!("Proactive sync enabled (will discover relays from stored announcements)");
101 } 118 }
102 119
120 // Re-queue all restored purgatory repos for sync
121 let restored_identifiers = purgatory.get_all_identifiers();
122 if !restored_identifiers.is_empty() {
123 info!(
124 "Re-queueing {} restored repositories for sync",
125 restored_identifiers.len()
126 );
127 for identifier in restored_identifiers {
128 purgatory.enqueue_sync_immediate(&identifier);
129 }
130 }
131
132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134
103 tokio::spawn(async move { 135 tokio::spawn(async move {
104 sync_manager.run().await; 136 sync_manager.run().await;
105 }); 137 });
@@ -190,6 +222,22 @@ async fn main() -> Result<()> {
190 } 222 }
191 } 223 }
192 224
225 // Save purgatory state to disk
226 let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json");
227 if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) {
228 error!("Failed to save purgatory state: {}", e);
229 } else {
230 info!("Purgatory state saved to disk");
231 }
232
233 // Save rejected events cache to disk
234 let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json");
235 if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) {
236 error!("Failed to save rejected events cache: {}", e);
237 } else {
238 info!("Rejected events cache saved to disk");
239 }
240
193 // Cleanup placeholder refs on shutdown 241 // Cleanup placeholder refs on shutdown
194 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); 242 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids();
195 if !placeholder_ids.is_empty() { 243 if !placeholder_ids.is_empty() {
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 20df19b..47798a6 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -12,6 +12,7 @@
12//! - **Separate stores**: State events and PR events use different indexing strategies 12//! - **Separate stores**: State events and PR events use different indexing strategies
13 13
14mod helpers; 14mod helpers;
15pub mod persistence;
15pub mod sync; 16pub mod sync;
16mod types; 17mod types;
17 18
@@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
20 21
21use dashmap::DashMap; 22use dashmap::DashMap;
22use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
23use std::collections::HashSet; 26use std::collections::HashSet;
24use std::path::PathBuf; 27use std::path::{Path, PathBuf};
25use std::sync::Arc; 28use std::sync::Arc;
26use std::time::{Duration, Instant}; 29use std::time::{Duration, Instant, SystemTime};
27 30
28pub use sync::SyncQueueEntry; 31pub use sync::SyncQueueEntry;
29 32
@@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180);
38/// Used for batching burst arrivals during negentropy sync. 41/// Used for batching burst arrivals during negentropy sync.
39const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); 42const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500);
40 43
44/// Serializable wrapper for `StatePurgatoryEntry` with time offsets.
45///
46/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
47/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49struct SerializableStatePurgatoryEntry {
50 /// The nostr state event (kind 30618) awaiting git data
51 event: Event,
52 /// The repository identifier from the event's 'd' tag
53 identifier: String,
54 /// Event author pubkey
55 author: PublicKey,
56 /// Duration offset from saved_at for created_at
57 created_at_offset_secs: u64,
58 /// Duration offset from saved_at for expires_at
59 expires_at_offset_secs: u64,
60}
61
62/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
63///
64/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
65/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67struct SerializablePrPurgatoryEntry {
68 /// The nostr PR event, if received (None = git data arrived first)
69 event: Option<Event>,
70 /// The expected commit SHA from 'c' tag (if event exists)
71 /// or the actual commit pushed (if git arrived first)
72 commit: String,
73 /// Duration offset from saved_at for created_at
74 created_at_offset_secs: u64,
75 /// Duration offset from saved_at for expires_at
76 expires_at_offset_secs: u64,
77}
78
79/// Serializable purgatory state for disk persistence.
80///
81/// Contains all purgatory data needed to restore state across restarts:
82/// - State events (indexed by identifier)
83/// - PR events (indexed by event ID)
84/// - Expired events (to prevent re-sync loops)
85/// - Version number for future compatibility
86/// - Saved timestamp for downtime calculation
87#[derive(Debug, Clone, Serialize, Deserialize)]
88struct PurgatoryState {
89 /// Version number for state format (currently 1)
90 version: u32,
91 /// When this state was saved to disk
92 saved_at: SystemTime,
93 /// State events indexed by repository identifier
94 state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>,
95 /// PR events indexed by event ID (hex string)
96 pr_events: HashMap<String, SerializablePrPurgatoryEntry>,
97 /// Expired event IDs with their expiry timestamps
98 expired_events: HashMap<String, SystemTime>,
99}
100
41/// Main purgatory structure holding events awaiting git data. 101/// Main purgatory structure holding events awaiting git data.
42/// 102///
43/// Provides thread-safe concurrent access to two separate stores: 103/// Provides thread-safe concurrent access to two separate stores:
@@ -667,6 +727,260 @@ impl Purgatory {
667 pub fn sync_queue_size(&self) -> usize { 727 pub fn sync_queue_size(&self) -> usize {
668 self.sync_queue.len() 728 self.sync_queue.len()
669 } 729 }
730
731 /// Get all repository identifiers currently in purgatory.
732 ///
733 /// Returns a list of all unique repository identifiers that have state events
734 /// in purgatory. This is useful for re-queueing repositories after restore.
735 ///
736 /// # Returns
737 /// Vector of repository identifiers (e.g., "owner/repo")
738 ///
739 /// # Example
740 /// ```no_run
741 /// use ngit_grasp::purgatory::Purgatory;
742 /// use std::path::PathBuf;
743 ///
744 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
745 /// let identifiers = purgatory.get_all_identifiers();
746 /// for id in identifiers {
747 /// println!("Repository in purgatory: {}", id);
748 /// }
749 /// ```
750 pub fn get_all_identifiers(&self) -> Vec<String> {
751 self.state_events
752 .iter()
753 .map(|entry| entry.key().clone())
754 .collect()
755 }
756
757 /// Save purgatory state to disk.
758 ///
759 /// Serializes the current purgatory state (state_events, pr_events, expired_events)
760 /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are
761 /// converted to duration offsets from the current `SystemTime` for persistence.
762 ///
763 /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are
764 /// restored from disk.
765 ///
766 /// # Arguments
767 /// * `path` - Path to save the state file
768 ///
769 /// # Returns
770 /// Ok(()) on success, Err on failure
771 ///
772 /// # Example
773 /// ```no_run
774 /// use ngit_grasp::purgatory::Purgatory;
775 /// use std::path::PathBuf;
776 ///
777 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
778 /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap();
779 /// ```
780 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
781 let saved_at = SystemTime::now();
782 let now_instant = Instant::now();
783
784 // Convert state_events to serializable format
785 let mut state_events = HashMap::new();
786 for entry in self.state_events.iter() {
787 let identifier = entry.key().clone();
788 let entries: Vec<SerializableStatePurgatoryEntry> = entry
789 .value()
790 .iter()
791 .map(|e| {
792 let created_offset =
793 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
794 let expires_offset =
795 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
796
797 SerializableStatePurgatoryEntry {
798 event: e.event.clone(),
799 identifier: e.identifier.clone(),
800 author: e.author,
801 created_at_offset_secs: created_offset.as_secs(),
802 expires_at_offset_secs: expires_offset.as_secs(),
803 }
804 })
805 .collect();
806 state_events.insert(identifier, entries);
807 }
808
809 // Convert pr_events to serializable format
810 let mut pr_events = HashMap::new();
811 for entry in self.pr_events.iter() {
812 let event_id = entry.key().clone();
813 let e = entry.value();
814
815 let created_offset =
816 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
817 let expires_offset =
818 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
819
820 let serializable = SerializablePrPurgatoryEntry {
821 event: e.event.clone(),
822 commit: e.commit.clone(),
823 created_at_offset_secs: created_offset.as_secs(),
824 expires_at_offset_secs: expires_offset.as_secs(),
825 };
826 pr_events.insert(event_id, serializable);
827 }
828
829 // Convert expired_events to serializable format
830 // We use SystemTime instead of Instant offsets for expired events since
831 // we don't need high precision for cleanup timing
832 let mut expired_events = HashMap::new();
833 for entry in self.expired_events.iter() {
834 let event_id = entry.key().to_hex();
835 // Convert Instant to SystemTime (approximate)
836 let expired_at_instant = *entry.value();
837 let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant);
838 let expired_at_system = saved_at - elapsed_since_expire;
839 expired_events.insert(event_id, expired_at_system);
840 }
841
842 // Create state structure
843 let state = PurgatoryState {
844 version: 1,
845 saved_at,
846 state_events,
847 pr_events,
848 expired_events,
849 };
850
851 // Serialize to JSON and write to file
852 let json = serde_json::to_string_pretty(&state)?;
853 std::fs::write(path, json)?;
854
855 tracing::info!(
856 path = %path.display(),
857 state_events = state.state_events.len(),
858 pr_events = state.pr_events.len(),
859 expired_events = state.expired_events.len(),
860 "Saved purgatory state to disk"
861 );
862
863 Ok(())
864 }
865
866 /// Restore purgatory state from disk.
867 ///
868 /// Loads a previously saved purgatory state from the specified path and populates
869 /// the current purgatory instance. Adjusts time-based fields to account for downtime
870 /// between save and restore.
871 ///
872 /// After successful restore, the state file is deleted to prevent accidental
873 /// double-restore.
874 ///
875 /// # Arguments
876 /// * `path` - Path to the saved state file
877 ///
878 /// # Returns
879 /// Ok(()) on success, Err if file doesn't exist or is corrupted
880 ///
881 /// # Example
882 /// ```no_run
883 /// use ngit_grasp::purgatory::Purgatory;
884 /// use std::path::PathBuf;
885 ///
886 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
887 /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) {
888 /// Ok(()) => println!("State restored successfully"),
889 /// Err(e) => eprintln!("Failed to restore state: {}", e),
890 /// }
891 /// ```
892 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
893 // Read and parse state file
894 let json = std::fs::read_to_string(path)?;
895 let state: PurgatoryState = serde_json::from_str(&json)?;
896
897 // Verify version
898 if state.version != 1 {
899 return Err(format!("Unsupported state version: {}", state.version).into());
900 }
901
902 let now_instant = Instant::now();
903
904 // Restore state_events
905 for (identifier, entries) in state.state_events {
906 let restored_entries: Vec<StatePurgatoryEntry> = entries
907 .into_iter()
908 .map(|e| {
909 let created_at = persistence::offset_to_instant(
910 Duration::from_secs(e.created_at_offset_secs),
911 state.saved_at,
912 now_instant,
913 );
914 let expires_at = persistence::offset_to_instant(
915 Duration::from_secs(e.expires_at_offset_secs),
916 state.saved_at,
917 now_instant,
918 );
919
920 StatePurgatoryEntry {
921 event: e.event,
922 identifier: e.identifier,
923 author: e.author,
924 created_at,
925 expires_at,
926 }
927 })
928 .collect();
929
930 self.state_events.insert(identifier, restored_entries);
931 }
932
933 // Restore pr_events
934 for (event_id, e) in state.pr_events {
935 let created_at = persistence::offset_to_instant(
936 Duration::from_secs(e.created_at_offset_secs),
937 state.saved_at,
938 now_instant,
939 );
940 let expires_at = persistence::offset_to_instant(
941 Duration::from_secs(e.expires_at_offset_secs),
942 state.saved_at,
943 now_instant,
944 );
945
946 let entry = PrPurgatoryEntry {
947 event: e.event,
948 commit: e.commit,
949 created_at,
950 expires_at,
951 };
952
953 self.pr_events.insert(event_id, entry);
954 }
955
956 // Restore expired_events
957 for (event_id_hex, expired_at_system) in state.expired_events {
958 if let Ok(event_id) = EventId::from_hex(&event_id_hex) {
959 // Convert SystemTime back to Instant (approximate)
960 let elapsed_since_expire = SystemTime::now()
961 .duration_since(expired_at_system)
962 .unwrap_or(Duration::ZERO);
963 let expired_at_instant = now_instant - elapsed_since_expire;
964
965 self.expired_events.insert(event_id, expired_at_instant);
966 }
967 }
968
969 tracing::info!(
970 path = %path.display(),
971 state_events = self.state_events.len(),
972 pr_events = self.pr_events.len(),
973 expired_events = self.expired_events.len(),
974 saved_at = ?state.saved_at,
975 "Restored purgatory state from disk"
976 );
977
978 // Delete state file after successful restore
979 std::fs::remove_file(path)?;
980 tracing::debug!(path = %path.display(), "Deleted state file after restore");
981
982 Ok(())
983 }
670} 984}
671 985
672#[cfg(test)] 986#[cfg(test)]
@@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() {
1249 // - Skip the expired check for user-submitted events 1563 // - Skip the expired check for user-submitted events
1250 // - Allow the event to be re-added to purgatory or accepted if git data now exists 1564 // - Allow the event to be re-added to purgatory or accepted if git data now exists
1251} 1565}
1566
1567// ============================================================================
1568// Persistence Serialization Tests
1569// ============================================================================
1570
1571#[tokio::test]
1572async fn test_save_and_restore_state_events() {
1573 use tempfile::tempdir;
1574
1575 let temp_dir = tempdir().unwrap();
1576 let state_file = temp_dir.path().join("purgatory_state.json");
1577
1578 let purgatory = Purgatory::new(PathBuf::new());
1579 let keys = Keys::generate();
1580
1581 // Add multiple state events for the same identifier
1582 let event1 = EventBuilder::text_note("state event 1")
1583 .sign_with_keys(&keys)
1584 .unwrap();
1585 let event2 = EventBuilder::text_note("state event 2")
1586 .sign_with_keys(&keys)
1587 .unwrap();
1588
1589 let event1_id = event1.id;
1590 let event2_id = event2.id;
1591
1592 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key());
1593 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key());
1594
1595 // Save to disk
1596 purgatory.save_to_disk(&state_file).unwrap();
1597
1598 // Verify file exists
1599 assert!(state_file.exists());
1600
1601 // Create new purgatory and restore
1602 let purgatory2 = Purgatory::new(PathBuf::new());
1603 purgatory2.restore_from_disk(&state_file).unwrap();
1604
1605 // Verify file was deleted after restore
1606 assert!(!state_file.exists());
1607
1608 // Verify state events were restored
1609 let (state_count, _) = purgatory2.count();
1610 assert_eq!(state_count, 2);
1611
1612 let restored_entries = purgatory2.find_state("test-repo");
1613 assert_eq!(restored_entries.len(), 2);
1614
1615 // Verify event IDs match
1616 let restored_ids: Vec<EventId> = restored_entries.iter().map(|e| e.event.id).collect();
1617 assert!(restored_ids.contains(&event1_id));
1618 assert!(restored_ids.contains(&event2_id));
1619
1620 // Verify identifiers and authors match
1621 for entry in &restored_entries {
1622 assert_eq!(entry.identifier, "test-repo");
1623 assert_eq!(entry.author, keys.public_key());
1624 }
1625}
1626
1627#[tokio::test]
1628async fn test_save_and_restore_pr_events() {
1629 use nostr_sdk::{Kind, Tag, TagKind};
1630 use tempfile::tempdir;
1631
1632 let temp_dir = tempdir().unwrap();
1633 let state_file = temp_dir.path().join("purgatory_state.json");
1634
1635 let purgatory = Purgatory::new(PathBuf::new());
1636 let keys = Keys::generate();
1637
1638 // Add PR event with actual event
1639 let tags = vec![Tag::custom(
1640 TagKind::Custom("a".into()),
1641 vec!["30617:abc123:test-repo".to_string()],
1642 )];
1643
1644 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
1645 .tags(tags)
1646 .sign_with_keys(&keys)
1647 .unwrap();
1648
1649 let pr_event_id = pr_event.id;
1650
1651 purgatory.add_pr(
1652 pr_event.clone(),
1653 "pr-event-id".to_string(),
1654 "commit-abc".to_string(),
1655 );
1656
1657 // Save to disk
1658 purgatory.save_to_disk(&state_file).unwrap();
1659
1660 // Create new purgatory and restore
1661 let purgatory2 = Purgatory::new(PathBuf::new());
1662 purgatory2.restore_from_disk(&state_file).unwrap();
1663
1664 // Verify PR event was restored
1665 let (_, pr_count) = purgatory2.count();
1666 assert_eq!(pr_count, 1);
1667
1668 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap();
1669 assert!(restored_entry.event.is_some());
1670 assert_eq!(restored_entry.event.unwrap().id, pr_event_id);
1671 assert_eq!(restored_entry.commit, "commit-abc");
1672}
1673
1674#[tokio::test]
1675async fn test_save_and_restore_pr_placeholders() {
1676 use tempfile::tempdir;
1677
1678 let temp_dir = tempdir().unwrap();
1679 let state_file = temp_dir.path().join("purgatory_state.json");
1680
1681 let purgatory = Purgatory::new(PathBuf::new());
1682
1683 // Add PR placeholder (git data arrived first)
1684 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string());
1685
1686 // Save to disk
1687 purgatory.save_to_disk(&state_file).unwrap();
1688
1689 // Create new purgatory and restore
1690 let purgatory2 = Purgatory::new(PathBuf::new());
1691 purgatory2.restore_from_disk(&state_file).unwrap();
1692
1693 // Verify placeholder was restored
1694 let (_, pr_count) = purgatory2.count();
1695 assert_eq!(pr_count, 1);
1696
1697 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap();
1698 assert!(restored_entry.event.is_none()); // Still a placeholder
1699 assert_eq!(restored_entry.commit, "commit-def");
1700
1701 // Verify it's findable as a placeholder
1702 assert_eq!(
1703 purgatory2.find_pr_placeholder("placeholder-id"),
1704 Some("commit-def".to_string())
1705 );
1706}
1707
1708#[tokio::test]
1709async fn test_save_and_restore_expired_events() {
1710 use tempfile::tempdir;
1711
1712 let temp_dir = tempdir().unwrap();
1713 let state_file = temp_dir.path().join("purgatory_state.json");
1714
1715 let purgatory = Purgatory::new(PathBuf::new());
1716 let keys = Keys::generate();
1717
1718 let event = EventBuilder::text_note("test")
1719 .sign_with_keys(&keys)
1720 .unwrap();
1721 let event_id = event.id;
1722
1723 // Add and expire event
1724 purgatory.add_state(event, "repo".to_string(), keys.public_key());
1725 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1726 for entry in entries.iter_mut() {
1727 entry.expires_at = Instant::now() - Duration::from_secs(1);
1728 }
1729 }
1730 purgatory.cleanup();
1731
1732 // Verify event is marked as expired
1733 assert!(purgatory.is_expired(&event_id));
1734 assert_eq!(purgatory.expired_count(), 1);
1735
1736 // Save to disk
1737 purgatory.save_to_disk(&state_file).unwrap();
1738
1739 // Create new purgatory and restore
1740 let purgatory2 = Purgatory::new(PathBuf::new());
1741 purgatory2.restore_from_disk(&state_file).unwrap();
1742
1743 // Verify expired event was restored
1744 assert!(purgatory2.is_expired(&event_id));
1745 assert_eq!(purgatory2.expired_count(), 1);
1746
1747 // Verify it's included in event_ids()
1748 let ids = purgatory2.event_ids();
1749 assert!(ids.contains(&event_id));
1750}
1751
1752#[tokio::test]
1753async fn test_save_and_restore_empty_purgatory() {
1754 use tempfile::tempdir;
1755
1756 let temp_dir = tempdir().unwrap();
1757 let state_file = temp_dir.path().join("purgatory_state.json");
1758
1759 let purgatory = Purgatory::new(PathBuf::new());
1760
1761 // Save empty purgatory
1762 purgatory.save_to_disk(&state_file).unwrap();
1763
1764 // Verify file exists
1765 assert!(state_file.exists());
1766
1767 // Create new purgatory and restore
1768 let purgatory2 = Purgatory::new(PathBuf::new());
1769 purgatory2.restore_from_disk(&state_file).unwrap();
1770
1771 // Verify purgatory is still empty
1772 let (state_count, pr_count) = purgatory2.count();
1773 assert_eq!(state_count, 0);
1774 assert_eq!(pr_count, 0);
1775 assert_eq!(purgatory2.expired_count(), 0);
1776}
1777
1778#[tokio::test]
1779async fn test_restore_missing_file() {
1780 use tempfile::tempdir;
1781
1782 let temp_dir = tempdir().unwrap();
1783 let state_file = temp_dir.path().join("nonexistent.json");
1784
1785 let purgatory = Purgatory::new(PathBuf::new());
1786
1787 // Attempting to restore from missing file should error
1788 let result = purgatory.restore_from_disk(&state_file);
1789 assert!(result.is_err());
1790
1791 // Purgatory should remain empty
1792 let (state_count, pr_count) = purgatory.count();
1793 assert_eq!(state_count, 0);
1794 assert_eq!(pr_count, 0);
1795}
1796
1797#[tokio::test]
1798async fn test_restore_corrupted_json() {
1799 use tempfile::tempdir;
1800
1801 let temp_dir = tempdir().unwrap();
1802 let state_file = temp_dir.path().join("corrupted.json");
1803
1804 // Write invalid JSON
1805 std::fs::write(&state_file, "{ this is not valid json }").unwrap();
1806
1807 let purgatory = Purgatory::new(PathBuf::new());
1808
1809 // Attempting to restore corrupted file should error
1810 let result = purgatory.restore_from_disk(&state_file);
1811 assert!(result.is_err());
1812
1813 // Purgatory should remain empty
1814 let (state_count, pr_count) = purgatory.count();
1815 assert_eq!(state_count, 0);
1816 assert_eq!(pr_count, 0);
1817}
1818
1819#[tokio::test]
1820async fn test_restore_unsupported_version() {
1821 use tempfile::tempdir;
1822
1823 let temp_dir = tempdir().unwrap();
1824 let state_file = temp_dir.path().join("wrong_version.json");
1825
1826 // Write state with unsupported version
1827 let state = r#"{
1828 "version": 999,
1829 "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0},
1830 "state_events": {},
1831 "pr_events": {},
1832 "expired_events": {}
1833 }"#;
1834 std::fs::write(&state_file, state).unwrap();
1835
1836 let purgatory = Purgatory::new(PathBuf::new());
1837
1838 // Attempting to restore unsupported version should error
1839 let result = purgatory.restore_from_disk(&state_file);
1840 assert!(result.is_err());
1841 assert!(result
1842 .unwrap_err()
1843 .to_string()
1844 .contains("Unsupported state version"));
1845}
1846
1847#[tokio::test]
1848async fn test_downtime_calculation() {
1849 use tempfile::tempdir;
1850 use tokio::time::sleep;
1851
1852 let temp_dir = tempdir().unwrap();
1853 let state_file = temp_dir.path().join("purgatory_state.json");
1854
1855 let purgatory = Purgatory::new(PathBuf::new());
1856 let keys = Keys::generate();
1857
1858 // Add state event
1859 let event = EventBuilder::text_note("test")
1860 .sign_with_keys(&keys)
1861 .unwrap();
1862
1863 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1864
1865 // Get original expiry time
1866 let original_entries = purgatory.find_state("repo");
1867 let original_entry = &original_entries[0];
1868 let original_expires_at = original_entry.expires_at;
1869 let original_remaining = original_expires_at.saturating_duration_since(Instant::now());
1870
1871 // Save to disk
1872 purgatory.save_to_disk(&state_file).unwrap();
1873
1874 // Simulate downtime (100ms)
1875 sleep(Duration::from_millis(100)).await;
1876
1877 // Create new purgatory and restore
1878 let purgatory2 = Purgatory::new(PathBuf::new());
1879 purgatory2.restore_from_disk(&state_file).unwrap();
1880
1881 // Get restored expiry time
1882 let restored_entries = purgatory2.find_state("repo");
1883 let restored_entry = &restored_entries[0];
1884 let restored_expires_at = restored_entry.expires_at;
1885 let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now());
1886
1887 // Remaining time should be approximately the same (accounting for downtime)
1888 // Allow 2000ms tolerance for test execution time and sleep duration
1889 let diff = if restored_remaining > original_remaining {
1890 restored_remaining.as_millis() - original_remaining.as_millis()
1891 } else {
1892 original_remaining.as_millis() - restored_remaining.as_millis()
1893 };
1894
1895 assert!(
1896 diff < 2000,
1897 "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms",
1898 original_remaining.as_millis(),
1899 restored_remaining.as_millis(),
1900 diff
1901 );
1902}
1903
1904#[tokio::test]
1905async fn test_expiry_times_preserved() {
1906 use tempfile::tempdir;
1907
1908 let temp_dir = tempdir().unwrap();
1909 let state_file = temp_dir.path().join("purgatory_state.json");
1910
1911 let purgatory = Purgatory::new(PathBuf::new());
1912 let keys = Keys::generate();
1913
1914 // Add state event
1915 let event = EventBuilder::text_note("test")
1916 .sign_with_keys(&keys)
1917 .unwrap();
1918
1919 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1920
1921 // Manually set expiry to a specific time in the future
1922 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
1923 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1924 for entry in entries.iter_mut() {
1925 entry.expires_at = custom_expiry;
1926 }
1927 }
1928
1929 // Save to disk
1930 purgatory.save_to_disk(&state_file).unwrap();
1931
1932 // Create new purgatory and restore
1933 let purgatory2 = Purgatory::new(PathBuf::new());
1934 purgatory2.restore_from_disk(&state_file).unwrap();
1935
1936 // Get restored expiry time
1937 let restored_entries = purgatory2.find_state("repo");
1938 let restored_entry = &restored_entries[0];
1939 let restored_remaining = restored_entry
1940 .expires_at
1941 .saturating_duration_since(Instant::now());
1942
1943 // Should be approximately 600 seconds (allow 3 second tolerance for test execution)
1944 assert!(
1945 restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603,
1946 "Expected ~600s remaining, got {}s",
1947 restored_remaining.as_secs()
1948 );
1949}
1950
1951#[tokio::test]
1952async fn test_multiple_state_events_same_identifier() {
1953 use tempfile::tempdir;
1954
1955 let temp_dir = tempdir().unwrap();
1956 let state_file = temp_dir.path().join("purgatory_state.json");
1957
1958 let purgatory = Purgatory::new(PathBuf::new());
1959 let keys1 = Keys::generate();
1960 let keys2 = Keys::generate();
1961 let keys3 = Keys::generate();
1962
1963 // Add multiple state events for the same identifier from different authors
1964 let event1 = EventBuilder::text_note("maintainer 1")
1965 .sign_with_keys(&keys1)
1966 .unwrap();
1967 let event2 = EventBuilder::text_note("maintainer 2")
1968 .sign_with_keys(&keys2)
1969 .unwrap();
1970 let event3 = EventBuilder::text_note("maintainer 3")
1971 .sign_with_keys(&keys3)
1972 .unwrap();
1973
1974 purgatory.add_state(
1975 event1.clone(),
1976 "shared-repo".to_string(),
1977 keys1.public_key(),
1978 );
1979 purgatory.add_state(
1980 event2.clone(),
1981 "shared-repo".to_string(),
1982 keys2.public_key(),
1983 );
1984 purgatory.add_state(
1985 event3.clone(),
1986 "shared-repo".to_string(),
1987 keys3.public_key(),
1988 );
1989
1990 // Save to disk
1991 purgatory.save_to_disk(&state_file).unwrap();
1992
1993 // Create new purgatory and restore
1994 let purgatory2 = Purgatory::new(PathBuf::new());
1995 purgatory2.restore_from_disk(&state_file).unwrap();
1996
1997 // Verify all three events were restored
1998 let restored_entries = purgatory2.find_state("shared-repo");
1999 assert_eq!(restored_entries.len(), 3);
2000
2001 // Verify all authors are present
2002 let authors: Vec<PublicKey> = restored_entries.iter().map(|e| e.author).collect();
2003 assert!(authors.contains(&keys1.public_key()));
2004 assert!(authors.contains(&keys2.public_key()));
2005 assert!(authors.contains(&keys3.public_key()));
2006}
2007
2008#[tokio::test]
2009async fn test_mixed_pr_events_and_placeholders() {
2010 use nostr_sdk::{Kind, Tag, TagKind};
2011 use tempfile::tempdir;
2012
2013 let temp_dir = tempdir().unwrap();
2014 let state_file = temp_dir.path().join("purgatory_state.json");
2015
2016 let purgatory = Purgatory::new(PathBuf::new());
2017 let keys = Keys::generate();
2018
2019 // Add PR event with actual event
2020 let tags = vec![Tag::custom(
2021 TagKind::Custom("a".into()),
2022 vec!["30617:abc123:test-repo".to_string()],
2023 )];
2024
2025 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
2026 .tags(tags)
2027 .sign_with_keys(&keys)
2028 .unwrap();
2029
2030 purgatory.add_pr(
2031 pr_event.clone(),
2032 "pr-with-event".to_string(),
2033 "commit-abc".to_string(),
2034 );
2035
2036 // Add PR placeholder
2037 purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string());
2038
2039 // Save to disk
2040 purgatory.save_to_disk(&state_file).unwrap();
2041
2042 // Create new purgatory and restore
2043 let purgatory2 = Purgatory::new(PathBuf::new());
2044 purgatory2.restore_from_disk(&state_file).unwrap();
2045
2046 // Verify both were restored correctly
2047 let (_, pr_count) = purgatory2.count();
2048 assert_eq!(pr_count, 2);
2049
2050 // Verify PR event
2051 let pr_entry = purgatory2.find_pr("pr-with-event").unwrap();
2052 assert!(pr_entry.event.is_some());
2053 assert_eq!(pr_entry.commit, "commit-abc");
2054
2055 // Verify placeholder
2056 let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap();
2057 assert!(placeholder_entry.event.is_none());
2058 assert_eq!(placeholder_entry.commit, "commit-def");
2059 assert_eq!(
2060 purgatory2.find_pr_placeholder("pr-placeholder"),
2061 Some("commit-def".to_string())
2062 );
2063}
2064
2065#[tokio::test]
2066async fn test_file_cleanup_after_successful_restore() {
2067 use tempfile::tempdir;
2068
2069 let temp_dir = tempdir().unwrap();
2070 let state_file = temp_dir.path().join("purgatory_state.json");
2071
2072 let purgatory = Purgatory::new(PathBuf::new());
2073 let keys = Keys::generate();
2074
2075 // Add some data
2076 let event = EventBuilder::text_note("test")
2077 .sign_with_keys(&keys)
2078 .unwrap();
2079 purgatory.add_state(event, "repo".to_string(), keys.public_key());
2080
2081 // Save to disk
2082 purgatory.save_to_disk(&state_file).unwrap();
2083 assert!(state_file.exists());
2084
2085 // Restore
2086 let purgatory2 = Purgatory::new(PathBuf::new());
2087 purgatory2.restore_from_disk(&state_file).unwrap();
2088
2089 // File should be deleted after successful restore
2090 assert!(!state_file.exists());
2091}
2092
2093#[tokio::test]
2094async fn test_comprehensive_roundtrip() {
2095 use nostr_sdk::{Kind, Tag, TagKind};
2096 use tempfile::tempdir;
2097
2098 let temp_dir = tempdir().unwrap();
2099 let state_file = temp_dir.path().join("purgatory_state.json");
2100
2101 let purgatory = Purgatory::new(PathBuf::new());
2102 let keys1 = Keys::generate();
2103 let keys2 = Keys::generate();
2104
2105 // Add multiple state events
2106 let state1 = EventBuilder::text_note("state 1")
2107 .sign_with_keys(&keys1)
2108 .unwrap();
2109 let state2 = EventBuilder::text_note("state 2")
2110 .sign_with_keys(&keys2)
2111 .unwrap();
2112
2113 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key());
2114 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key());
2115
2116 // Add PR event
2117 let tags = vec![Tag::custom(
2118 TagKind::Custom("a".into()),
2119 vec!["30617:abc123:repo1".to_string()],
2120 )];
2121 let pr_event = EventBuilder::new(Kind::from(1618), "PR")
2122 .tags(tags)
2123 .sign_with_keys(&keys1)
2124 .unwrap();
2125 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string());
2126
2127 // Add PR placeholder
2128 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
2129
2130 // Add and expire an event
2131 let expired_event = EventBuilder::text_note("expired")
2132 .sign_with_keys(&keys1)
2133 .unwrap();
2134 let expired_id = expired_event.id;
2135 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key());
2136 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2137 for entry in entries.iter_mut() {
2138 entry.expires_at = Instant::now() - Duration::from_secs(1);
2139 }
2140 }
2141 purgatory.cleanup();
2142
2143 // Verify initial state
2144 let (state_count, pr_count) = purgatory.count();
2145 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up)
2146 assert_eq!(pr_count, 2); // pr-1, pr-2
2147 assert_eq!(purgatory.expired_count(), 1); // expired_event
2148
2149 // Save to disk
2150 purgatory.save_to_disk(&state_file).unwrap();
2151
2152 // Create new purgatory and restore
2153 let purgatory2 = Purgatory::new(PathBuf::new());
2154 purgatory2.restore_from_disk(&state_file).unwrap();
2155
2156 // Verify all data was restored correctly
2157 let (state_count2, pr_count2) = purgatory2.count();
2158 assert_eq!(state_count2, 2);
2159 assert_eq!(pr_count2, 2);
2160 assert_eq!(purgatory2.expired_count(), 1);
2161
2162 // Verify state events
2163 assert_eq!(purgatory2.find_state("repo1").len(), 1);
2164 assert_eq!(purgatory2.find_state("repo2").len(), 1);
2165
2166 // Verify PR events
2167 assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some());
2168 assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none());
2169
2170 // Verify expired event
2171 assert!(purgatory2.is_expired(&expired_id));
2172}
diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs
new file mode 100644
index 0000000..7fca2cf
--- /dev/null
+++ b/src/purgatory/persistence.rs
@@ -0,0 +1,198 @@
1//! Persistence utilities for purgatory state.
2//!
3//! This module provides conversion functions between `Instant` (which cannot be
4//! serialized) and `Duration` offsets from a reference `SystemTime`. This allows
5//! purgatory state to be persisted to disk and restored across restarts.
6//!
7//! ## Time Handling
8//!
9//! - `Instant` is monotonic but cannot be serialized
10//! - `SystemTime` can be serialized but may go backwards (NTP, user changes)
11//! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime
12//! - Downtime is accounted for when restoring state (elapsed time is preserved)
13
14use std::time::{Duration, Instant, SystemTime};
15
16/// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`.
17///
18/// This allows storing an `Instant` as a serializable offset that can be
19/// restored later, accounting for system downtime.
20///
21/// # Arguments
22/// * `instant` - The `Instant` to convert
23/// * `reference_time` - The reference `SystemTime` (typically SystemTime::now())
24/// * `reference_instant` - The corresponding `Instant` (typically Instant::now())
25///
26/// # Returns
27/// Duration offset from the reference time
28///
29/// # Example
30/// ```
31/// use std::time::{Duration, Instant, SystemTime};
32/// use ngit_grasp::purgatory::persistence::instant_to_offset;
33///
34/// let now_system = SystemTime::now();
35/// let now_instant = Instant::now();
36/// let future = now_instant + Duration::from_secs(60);
37///
38/// let offset = instant_to_offset(future, now_system, now_instant);
39/// assert!(offset.as_secs() >= 60);
40/// ```
41pub fn instant_to_offset(
42 instant: Instant,
43 _reference_time: SystemTime,
44 reference_instant: Instant,
45) -> Duration {
46 if instant >= reference_instant {
47 // Future instant - return positive offset
48 instant.duration_since(reference_instant)
49 } else {
50 // Past instant - this shouldn't happen in normal operation,
51 // but we handle it by returning zero duration
52 Duration::ZERO
53 }
54}
55
56/// Convert a `Duration` offset back to an `Instant`, accounting for downtime.
57///
58/// This restores an `Instant` from a serialized offset, adjusting for the time
59/// that has elapsed since the state was saved.
60///
61/// # Arguments
62/// * `offset` - The duration offset from the saved reference time
63/// * `saved_at` - The `SystemTime` when the state was saved
64/// * `reference_instant` - The current `Instant` (typically Instant::now())
65///
66/// # Returns
67/// The restored `Instant`, adjusted for downtime
68///
69/// # Example
70/// ```
71/// use std::time::{Duration, Instant, SystemTime};
72/// use ngit_grasp::purgatory::persistence::offset_to_instant;
73///
74/// let saved_at = SystemTime::now();
75/// let offset = Duration::from_secs(60);
76/// let now_instant = Instant::now();
77///
78/// let restored = offset_to_instant(offset, saved_at, now_instant);
79/// // restored will be approximately now_instant + 60 seconds
80/// ```
81pub fn offset_to_instant(
82 offset: Duration,
83 saved_at: SystemTime,
84 reference_instant: Instant,
85) -> Instant {
86 // Calculate how much time has elapsed since the state was saved
87 let now_system = SystemTime::now();
88 let elapsed_since_save = now_system
89 .duration_since(saved_at)
90 .unwrap_or(Duration::ZERO);
91
92 // The original deadline was: saved_at + offset
93 // Time remaining = (saved_at + offset) - now_system
94 // = offset - elapsed_since_save
95
96 if offset > elapsed_since_save {
97 // Deadline is still in the future
98 let remaining = offset - elapsed_since_save;
99 reference_instant + remaining
100 } else {
101 // Deadline has already passed or is right now
102 reference_instant
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109 use std::thread;
110 use std::time::Duration;
111
112 #[test]
113 fn test_instant_to_offset_future() {
114 let now_system = SystemTime::now();
115 let now_instant = Instant::now();
116 let future = now_instant + Duration::from_secs(60);
117
118 let offset = instant_to_offset(future, now_system, now_instant);
119
120 // Should be approximately 60 seconds (within tolerance)
121 assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61);
122 }
123
124 #[test]
125 fn test_instant_to_offset_past() {
126 let now_system = SystemTime::now();
127 let past_instant = Instant::now();
128 // Simulate some time passing
129 thread::sleep(Duration::from_millis(10));
130 let now_instant = Instant::now();
131
132 let offset = instant_to_offset(past_instant, now_system, now_instant);
133
134 // Past instants return zero duration
135 assert_eq!(offset, Duration::ZERO);
136 }
137
138 #[test]
139 fn test_offset_to_instant_with_time_remaining() {
140 let saved_at = SystemTime::now();
141 let offset = Duration::from_secs(60);
142
143 // Simulate a very short downtime (< 10ms)
144 thread::sleep(Duration::from_millis(5));
145
146 let now_instant = Instant::now();
147 let restored = offset_to_instant(offset, saved_at, now_instant);
148
149 // Should be approximately 60 seconds in the future
150 let remaining = restored.duration_since(now_instant);
151 assert!(
152 remaining.as_secs() >= 59 && remaining.as_secs() <= 61,
153 "Expected ~60s, got {}s",
154 remaining.as_secs()
155 );
156 }
157
158 #[test]
159 fn test_offset_to_instant_deadline_passed() {
160 // Simulate state saved 70 seconds ago with 60 second offset
161 let saved_at = SystemTime::now() - Duration::from_secs(70);
162 let offset = Duration::from_secs(60);
163
164 let now_instant = Instant::now();
165 let restored = offset_to_instant(offset, saved_at, now_instant);
166
167 // Deadline has passed, should be now or in the past
168 let remaining = restored.saturating_duration_since(now_instant);
169 assert_eq!(remaining, Duration::ZERO);
170 }
171
172 #[test]
173 fn test_round_trip_conversion() {
174 let now_system = SystemTime::now();
175 let now_instant = Instant::now();
176 let future = now_instant + Duration::from_secs(120);
177
178 // Convert to offset
179 let offset = instant_to_offset(future, now_system, now_instant);
180
181 // Immediately convert back (minimal downtime)
182 let restored = offset_to_instant(offset, now_system, now_instant);
183
184 // Should be very close to the original future instant
185 let diff = if restored > future {
186 restored.duration_since(future)
187 } else {
188 future.duration_since(restored)
189 };
190
191 // Allow for small timing differences (< 100ms)
192 assert!(
193 diff < Duration::from_millis(100),
194 "Round trip should preserve instant within 100ms, got {}ms",
195 diff.as_millis()
196 );
197 }
198}
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 9c47616..919504b 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -5,8 +5,14 @@
5//! problem where either the nostr event or git push can arrive first. 5//! problem where either the nostr event or git push can arrive first.
6 6
7use nostr_sdk::prelude::*; 7use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize};
8use std::time::Instant; 9use std::time::Instant;
9 10
11/// Default value for Instant fields during deserialization
12fn instant_now() -> Instant {
13 Instant::now()
14}
15
10/// A reference name and its target object. 16/// A reference name and its target object.
11/// 17///
12/// Used to identify specific git refs (branches, tags) that a state event 18/// Used to identify specific git refs (branches, tags) that a state event
@@ -59,7 +65,10 @@ impl RefUpdate {
59/// State events declare the current state of a repository but may arrive 65/// State events declare the current state of a repository but may arrive
60/// before the corresponding git data has been pushed. This entry holds 66/// before the corresponding git data has been pushed. This entry holds
61/// the event and associated metadata until the git data arrives. 67/// the event and associated metadata until the git data arrives.
62#[derive(Debug, Clone)] 68///
69/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
70/// module to convert to/from serializable wrapper types.
71#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct StatePurgatoryEntry { 72pub struct StatePurgatoryEntry {
64 /// The nostr state event (kind 30618) awaiting git data 73 /// The nostr state event (kind 30618) awaiting git data
65 pub event: Event, 74 pub event: Event,
@@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry {
71 pub author: PublicKey, 80 pub author: PublicKey,
72 81
73 /// When this entry was added to purgatory 82 /// When this entry was added to purgatory
83 #[serde(skip, default = "instant_now")]
74 pub created_at: Instant, 84 pub created_at: Instant,
75 85
76 /// Expiry deadline (30 min from creation, may be extended) 86 /// Expiry deadline (30 min from creation, may be extended)
87 #[serde(skip, default = "instant_now")]
77 pub expires_at: Instant, 88 pub expires_at: Instant,
78} 89}
79 90
@@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry {
82/// PR events reference specific commits but may arrive before the git push 93/// PR events reference specific commits but may arrive before the git push
83/// containing those commits. Alternatively, a git push may arrive first, 94/// containing those commits. Alternatively, a git push may arrive first,
84/// creating a placeholder entry waiting for the corresponding PR event. 95/// creating a placeholder entry waiting for the corresponding PR event.
85#[derive(Debug, Clone)] 96///
97/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
98/// module to convert to/from serializable wrapper types.
99#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct PrPurgatoryEntry { 100pub struct PrPurgatoryEntry {
87 /// The nostr PR event, if received (None = git data arrived first) 101 /// The nostr PR event, if received (None = git data arrived first)
88 pub event: Option<Event>, 102 pub event: Option<Event>,
@@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry {
92 pub commit: String, 106 pub commit: String,
93 107
94 /// When this entry was added to purgatory 108 /// When this entry was added to purgatory
109 #[serde(skip, default = "instant_now")]
95 pub created_at: Instant, 110 pub created_at: Instant,
96 111
97 /// Expiry deadline (30 min from creation, may be extended) 112 /// Expiry deadline (30 min from creation, may be extended)
113 #[serde(skip, default = "instant_now")]
98 pub expires_at: Instant, 114 pub expires_at: Instant,
99} 115}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e2d55bd..3213dfb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -43,6 +43,7 @@ pub use health::RelayHealthTracker;
43use tokio::time::sleep; 43use tokio::time::sleep;
44 44
45use std::collections::{HashMap, HashSet}; 45use std::collections::{HashMap, HashSet};
46use std::path::{Path, PathBuf};
46use std::sync::Arc; 47use std::sync::Arc;
47use std::time::Duration; 48use std::time::Duration;
48 49
@@ -581,6 +582,7 @@ impl SyncManager {
581 /// * `write_policy` - Policy for validating events before storage 582 /// * `write_policy` - Policy for validating events before storage
582 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) 583 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
583 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence)
584 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
585 pub fn new( 587 pub fn new(
586 bootstrap_relay_url: Option<String>, 588 bootstrap_relay_url: Option<String>,
@@ -589,11 +591,42 @@ impl SyncManager {
589 write_policy: Nip34WritePolicy, 591 write_policy: Nip34WritePolicy,
590 local_relay: LocalRelay, 592 local_relay: LocalRelay,
591 config: &Config, 593 config: &Config,
594 data_path: PathBuf,
592 sync_metrics: Option<SyncMetrics>, 595 sync_metrics: Option<SyncMetrics>,
593 ) -> Self { 596 ) -> Self {
594 // Extract purgatory from write_policy for read-only access 597 // Extract purgatory from write_policy for read-only access
595 let purgatory = write_policy.purgatory().clone(); 598 let purgatory = write_policy.purgatory().clone();
596 599
600 // Create rejected events index
601 let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics {
602 RejectedEventsIndex::with_metrics(
603 Duration::from_secs(config.rejected_hot_cache_duration_secs),
604 Duration::from_secs(config.rejected_cold_index_expiry_secs),
605 metrics.clone(),
606 )
607 } else {
608 RejectedEventsIndex::new(
609 Duration::from_secs(config.rejected_hot_cache_duration_secs),
610 Duration::from_secs(config.rejected_cold_index_expiry_secs),
611 )
612 });
613
614 // Attempt to restore rejected events index from disk
615 let rejected_index_path = data_path.join("rejected-events-cache.json");
616 if rejected_index_path.exists() {
617 match rejected_events_index.restore_from_disk(&rejected_index_path) {
618 Ok(()) => {
619 tracing::info!("Restored rejected events index from disk");
620 }
621 Err(e) => {
622 tracing::warn!(
623 "Failed to restore rejected events index: {}, starting empty",
624 e
625 );
626 }
627 }
628 }
629
597 Self { 630 Self {
598 bootstrap_relay_url, 631 bootstrap_relay_url,
599 service_domain, 632 service_domain,
@@ -605,18 +638,7 @@ impl SyncManager {
605 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 638 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
606 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 639 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
607 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 640 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
608 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { 641 rejected_events_index,
609 RejectedEventsIndex::with_metrics(
610 Duration::from_secs(config.rejected_hot_cache_duration_secs),
611 Duration::from_secs(config.rejected_cold_index_expiry_secs),
612 metrics.clone(),
613 )
614 } else {
615 RejectedEventsIndex::new(
616 Duration::from_secs(config.rejected_hot_cache_duration_secs),
617 Duration::from_secs(config.rejected_cold_index_expiry_secs),
618 )
619 }),
620 connections: HashMap::new(), 642 connections: HashMap::new(),
621 health_tracker: Arc::new(RelayHealthTracker::new(config)), 643 health_tracker: Arc::new(RelayHealthTracker::new(config)),
622 next_batch_id: 0, 644 next_batch_id: 0,
@@ -637,6 +659,31 @@ impl SyncManager {
637 self.next_batch_id 659 self.next_batch_id
638 } 660 }
639 661
662 /// Get a clone of the rejected events index Arc.
663 ///
664 /// This allows access to the rejected events index for persistence
665 /// even after the SyncManager has been moved into a task.
666 ///
667 /// # Returns
668 /// Arc clone of the rejected events index
669 pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> {
670 self.rejected_events_index.clone()
671 }
672
673 /// Save rejected events index to disk.
674 ///
675 /// This is called during shutdown to persist the rejected events cache,
676 /// allowing us to avoid re-downloading rejected events after restart.
677 ///
678 /// # Arguments
679 /// * `path` - Path to save the rejected index file
680 ///
681 /// # Returns
682 /// Ok(()) on success, Err if save fails
683 pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
684 self.rejected_events_index.save_to_disk(path)
685 }
686
640 /// Handle EOSE (End Of Stored Events) for a subscription 687 /// Handle EOSE (End Of Stored Events) for a subscription
641 /// 688 ///
642 /// This method: 689 /// This method:
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index 4d31901..f25f22a 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -86,12 +86,14 @@
86//! ``` 86//! ```
87 87
88use nostr_sdk::{Event, EventId, PublicKey}; 88use nostr_sdk::{Event, EventId, PublicKey};
89use serde::{Deserialize, Serialize};
89use std::collections::{HashMap, HashSet}; 90use std::collections::{HashMap, HashSet};
91use std::path::Path;
90use std::sync::{Arc, RwLock}; 92use std::sync::{Arc, RwLock};
91use std::time::{Duration, Instant}; 93use std::time::{Duration, Instant, SystemTime};
92 94
93/// Type of event stored in the rejected events index 95/// Type of event stored in the rejected events index
94#[derive(Debug, Clone, Copy, PartialEq, Eq)] 96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
95pub enum EventType { 97pub enum EventType {
96 /// Repository announcement (kind 30617) 98 /// Repository announcement (kind 30617)
97 Announcement, 99 Announcement,
@@ -109,7 +111,7 @@ impl std::fmt::Display for EventType {
109} 111}
110 112
111/// Reason why a repository announcement was rejected 113/// Reason why a repository announcement was rejected
112#[derive(Debug, Clone, Copy, PartialEq, Eq)] 114#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
113pub enum RejectionReason { 115pub enum RejectionReason {
114 /// Announcement doesn't list this service in clone/web URLs 116 /// Announcement doesn't list this service in clone/web URLs
115 DoesNotListService, 117 DoesNotListService,
@@ -141,6 +143,20 @@ struct HotCacheEntry {
141 cached_at: Instant, 143 cached_at: Instant,
142} 144}
143 145
146/// Serializable version of HotCacheEntry for persistence
147///
148/// Converts Instant to Duration offset from saved_at time
149#[derive(Debug, Clone, Serialize, Deserialize)]
150struct SerializableHotCacheEntry {
151 event: Event,
152 pubkey: PublicKey,
153 identifier: String,
154 event_type: EventType,
155 reason: RejectionReason,
156 /// Duration since saved_at when this entry was cached
157 cached_at_offset_secs: u64,
158}
159
144/// Entry in the cold index (metadata only) 160/// Entry in the cold index (metadata only)
145/// 161///
146/// Note: event_id is stored as the HashMap key, not in this struct 162/// Note: event_id is stored as the HashMap key, not in this struct
@@ -154,6 +170,49 @@ struct ColdIndexEntry {
154 rejected_at: Instant, 170 rejected_at: Instant,
155} 171}
156 172
173/// Serializable version of ColdIndexEntry for persistence
174///
175/// Converts Instant to Duration offset from saved_at time
176#[derive(Debug, Clone, Serialize, Deserialize)]
177struct SerializableColdIndexEntry {
178 pubkey: PublicKey,
179 identifier: String,
180 event_type: EventType,
181 reason: RejectionReason,
182 /// Duration since saved_at when this entry was rejected
183 rejected_at_offset_secs: u64,
184}
185
186/// Serializable state for hot cache
187#[derive(Debug, Serialize, Deserialize)]
188struct SerializableHotCache {
189 expiry_duration_secs: u64,
190 entries: HashMap<EventId, SerializableHotCacheEntry>,
191}
192
193/// Serializable state for cold index
194#[derive(Debug, Serialize, Deserialize)]
195struct SerializableColdIndex {
196 expiry_duration_secs: u64,
197 entries: HashMap<EventId, SerializableColdIndexEntry>,
198}
199
200/// Complete rejected cache state for persistence
201///
202/// Stores both hot cache and cold index with version and timestamp information.
203/// All Instant fields are converted to Duration offsets from saved_at.
204#[derive(Debug, Serialize, Deserialize)]
205struct RejectedCacheState {
206 /// Version for future compatibility
207 version: u32,
208 /// When this state was saved
209 saved_at: SystemTime,
210 /// Hot cache entries with full events
211 hot_cache: SerializableHotCache,
212 /// Cold index entries with metadata only
213 cold_index: SerializableColdIndex,
214}
215
157/// Hot cache: Stores full events for immediate re-processing 216/// Hot cache: Stores full events for immediate re-processing
158/// 217///
159/// Events are stored for a short duration (default: 2 minutes) to enable 218/// Events are stored for a short duration (default: 2 minutes) to enable
@@ -603,6 +662,168 @@ impl RejectedEventsIndex {
603 662
604 ids 663 ids
605 } 664 }
665
666 /// Save rejected events cache to disk
667 ///
668 /// Serializes both hot cache and cold index to JSON, converting Instant timestamps
669 /// to Duration offsets from the save time. This allows timestamps to be adjusted
670 /// for downtime when restored.
671 ///
672 /// # Arguments
673 ///
674 /// * `path` - File path to write the serialized state to
675 ///
676 /// # Returns
677 ///
678 /// Ok(()) on success, or an error if serialization or file write fails
679 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
680 let saved_at = SystemTime::now();
681 let now = Instant::now();
682
683 // Lock both caches for consistent snapshot
684 let hot_entries = self.hot_cache.entries.read().unwrap();
685 let cold_entries = self.cold_index.entries.read().unwrap();
686
687 // Convert hot cache entries to serializable format
688 let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries
689 .iter()
690 .map(|(event_id, entry)| {
691 let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs();
692
693 let serializable_entry = SerializableHotCacheEntry {
694 event: entry.event.clone(),
695 pubkey: entry.pubkey,
696 identifier: entry.identifier.clone(),
697 event_type: entry.event_type,
698 reason: entry.reason,
699 cached_at_offset_secs,
700 };
701
702 (*event_id, serializable_entry)
703 })
704 .collect();
705
706 // Convert cold index entries to serializable format
707 let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries
708 .iter()
709 .map(|(event_id, entry)| {
710 let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs();
711
712 let serializable_entry = SerializableColdIndexEntry {
713 pubkey: entry.pubkey,
714 identifier: entry.identifier.clone(),
715 event_type: entry.event_type,
716 reason: entry.reason,
717 rejected_at_offset_secs,
718 };
719
720 (*event_id, serializable_entry)
721 })
722 .collect();
723
724 // Create complete state
725 let state = RejectedCacheState {
726 version: 1,
727 saved_at,
728 hot_cache: SerializableHotCache {
729 expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(),
730 entries: serializable_hot_entries,
731 },
732 cold_index: SerializableColdIndex {
733 expiry_duration_secs: self.cold_index.expiry_duration.as_secs(),
734 entries: serializable_cold_entries,
735 },
736 };
737
738 // Serialize to JSON and write to file
739 let json = serde_json::to_string_pretty(&state)?;
740 std::fs::write(path, json)?;
741
742 Ok(())
743 }
744
745 /// Restore rejected events cache from disk
746 ///
747 /// Loads the serialized state from disk and populates both hot cache and cold index.
748 /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain
749 /// correct expiry behavior. Deletes the state file after successful restore.
750 ///
751 /// # Arguments
752 ///
753 /// * `path` - File path to read the serialized state from
754 ///
755 /// # Returns
756 ///
757 /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails
758 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
759 // Load and parse JSON
760 let json = std::fs::read_to_string(path)?;
761 let state: RejectedCacheState = serde_json::from_str(&json)?;
762
763 // Calculate downtime (how long the relay was offline)
764 let now_system = SystemTime::now();
765 let downtime = now_system
766 .duration_since(state.saved_at)
767 .unwrap_or(Duration::ZERO);
768
769 let now_instant = Instant::now();
770
771 // Lock both caches for restoration
772 let mut hot_entries = self.hot_cache.entries.write().unwrap();
773 let mut cold_entries = self.cold_index.entries.write().unwrap();
774
775 // Restore hot cache entries
776 for (event_id, serializable_entry) in state.hot_cache.entries {
777 // Reconstruct cached_at by extending the offset by downtime
778 // Original offset (how long ago it was cached when saved)
779 let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs);
780 // Total offset including downtime
781 let total_offset = original_offset + downtime;
782
783 // cached_at = now - total_offset
784 let cached_at = now_instant - total_offset;
785
786 let entry = HotCacheEntry {
787 event: serializable_entry.event,
788 pubkey: serializable_entry.pubkey,
789 identifier: serializable_entry.identifier,
790 event_type: serializable_entry.event_type,
791 reason: serializable_entry.reason,
792 cached_at,
793 };
794
795 hot_entries.insert(event_id, entry);
796 }
797
798 // Restore cold index entries
799 for (event_id, serializable_entry) in state.cold_index.entries {
800 // Reconstruct rejected_at by extending the offset by downtime
801 let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs);
802 let total_offset = original_offset + downtime;
803
804 // rejected_at = now - total_offset
805 let rejected_at = now_instant - total_offset;
806
807 let entry = ColdIndexEntry {
808 pubkey: serializable_entry.pubkey,
809 identifier: serializable_entry.identifier,
810 event_type: serializable_entry.event_type,
811 reason: serializable_entry.reason,
812 rejected_at,
813 };
814
815 cold_entries.insert(event_id, entry);
816 }
817
818 // Release locks before deleting file
819 drop(hot_entries);
820 drop(cold_entries);
821
822 // Delete the state file after successful restore
823 std::fs::remove_file(path)?;
824
825 Ok(())
826 }
606} 827}
607 828
608#[cfg(test)] 829#[cfg(test)]
@@ -956,4 +1177,503 @@ mod tests {
956 // Cold index now empty 1177 // Cold index now empty
957 assert_eq!(index.cold_index_len(), 0); 1178 assert_eq!(index.cold_index_len(), 0);
958 } 1179 }
1180
1181 // ========================================================================
1182 // Persistence Serialization Tests
1183 // ========================================================================
1184
1185 #[tokio::test]
1186 async fn test_save_and_restore_hot_cache_roundtrip() {
1187 let temp_dir = tempfile::tempdir().unwrap();
1188 let state_path = temp_dir.path().join("rejected_cache.json");
1189
1190 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1191 let event = create_test_event().await;
1192 let pubkey = event.pubkey;
1193 let identifier = "test-repo".to_string();
1194
1195 // Add event to hot cache
1196 index.add_announcement(
1197 event.clone(),
1198 pubkey,
1199 identifier.clone(),
1200 RejectionReason::DoesNotListService,
1201 );
1202
1203 assert_eq!(index.hot_cache_len(), 1);
1204 assert_eq!(index.cold_index_len(), 1);
1205
1206 // Save to disk
1207 index.save_to_disk(&state_path).unwrap();
1208 assert!(state_path.exists());
1209
1210 // Create new index and restore
1211 let index2 =
1212 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1213 index2.restore_from_disk(&state_path).unwrap();
1214
1215 // Verify state file was deleted after restore
1216 assert!(!state_path.exists());
1217
1218 // Verify hot cache restored
1219 assert_eq!(index2.hot_cache_len(), 1);
1220 assert!(index2.hot_cache.contains(&event.id));
1221
1222 // Verify cold index restored
1223 assert_eq!(index2.cold_index_len(), 1);
1224 assert!(index2.cold_index.contains(&event.id));
1225
1226 // Verify we can retrieve the event
1227 let events = index2
1228 .hot_cache
1229 .get_maintainer_events(&pubkey, &identifier, None);
1230 assert_eq!(events.len(), 1);
1231 assert_eq!(events[0].id, event.id);
1232 }
1233
1234 #[tokio::test]
1235 async fn test_save_and_restore_cold_index_only() {
1236 let temp_dir = tempfile::tempdir().unwrap();
1237 let state_path = temp_dir.path().join("rejected_cache.json");
1238
1239 let index = RejectedEventsIndex::new(
1240 Duration::from_millis(50), // Hot cache expires quickly
1241 Duration::from_secs(604800), // Cold index lasts long
1242 );
1243 let event = create_test_event().await;
1244
1245 // Add event
1246 index.add_announcement(
1247 event.clone(),
1248 event.pubkey,
1249 "test-repo".to_string(),
1250 RejectionReason::MaintainerNotYetValid,
1251 );
1252
1253 // Wait for hot cache to expire
1254 std::thread::sleep(Duration::from_millis(60));
1255 index.cleanup_expired_for_type("announcement");
1256
1257 assert_eq!(index.hot_cache_len(), 0);
1258 assert_eq!(index.cold_index_len(), 1);
1259
1260 // Save to disk
1261 index.save_to_disk(&state_path).unwrap();
1262
1263 // Restore into new index
1264 let index2 =
1265 RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800));
1266 index2.restore_from_disk(&state_path).unwrap();
1267
1268 // Verify only cold index restored (hot cache was empty)
1269 assert_eq!(index2.hot_cache_len(), 0);
1270 assert_eq!(index2.cold_index_len(), 1);
1271 assert!(index2.cold_index.contains(&event.id));
1272 }
1273
1274 #[tokio::test]
1275 async fn test_save_and_restore_both_hot_and_cold() {
1276 let temp_dir = tempfile::tempdir().unwrap();
1277 let state_path = temp_dir.path().join("rejected_cache.json");
1278
1279 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1280 let keys = Keys::generate();
1281
1282 // Create two events
1283 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1284 let event1 = keys.sign_event(unsigned1).await.unwrap();
1285
1286 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1287 let event2 = keys.sign_event(unsigned2).await.unwrap();
1288
1289 // Add both events
1290 index.add_announcement(
1291 event1.clone(),
1292 event1.pubkey,
1293 "repo1".to_string(),
1294 RejectionReason::DoesNotListService,
1295 );
1296
1297 index.add_state(
1298 event2.clone(),
1299 event2.pubkey,
1300 "repo2".to_string(),
1301 RejectionReason::Other,
1302 );
1303
1304 assert_eq!(index.hot_cache_len(), 2);
1305 assert_eq!(index.cold_index_len(), 2);
1306
1307 // Save to disk
1308 index.save_to_disk(&state_path).unwrap();
1309
1310 // Restore into new index
1311 let index2 =
1312 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1313 index2.restore_from_disk(&state_path).unwrap();
1314
1315 // Verify both caches restored
1316 assert_eq!(index2.hot_cache_len(), 2);
1317 assert_eq!(index2.cold_index_len(), 2);
1318 assert!(index2.contains(&event1.id));
1319 assert!(index2.contains(&event2.id));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_save_and_restore_empty_cache() {
1324 let temp_dir = tempfile::tempdir().unwrap();
1325 let state_path = temp_dir.path().join("rejected_cache.json");
1326
1327 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1328
1329 // Save empty cache
1330 index.save_to_disk(&state_path).unwrap();
1331 assert!(state_path.exists());
1332
1333 // Restore into new index
1334 let index2 =
1335 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1336 index2.restore_from_disk(&state_path).unwrap();
1337
1338 // Verify empty state restored
1339 assert_eq!(index2.hot_cache_len(), 0);
1340 assert_eq!(index2.cold_index_len(), 0);
1341 }
1342
1343 #[tokio::test]
1344 async fn test_restore_missing_file() {
1345 let temp_dir = tempfile::tempdir().unwrap();
1346 let state_path = temp_dir.path().join("nonexistent.json");
1347
1348 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1349
1350 // Attempting to restore missing file should return error
1351 let result = index.restore_from_disk(&state_path);
1352 assert!(result.is_err());
1353
1354 // Index should remain empty
1355 assert_eq!(index.hot_cache_len(), 0);
1356 assert_eq!(index.cold_index_len(), 0);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_restore_corrupted_json() {
1361 let temp_dir = tempfile::tempdir().unwrap();
1362 let state_path = temp_dir.path().join("corrupted.json");
1363
1364 // Write corrupted JSON
1365 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
1366
1367 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1368
1369 // Attempting to restore corrupted file should return error
1370 let result = index.restore_from_disk(&state_path);
1371 assert!(result.is_err());
1372
1373 // Index should remain empty
1374 assert_eq!(index.hot_cache_len(), 0);
1375 assert_eq!(index.cold_index_len(), 0);
1376 }
1377
1378 #[tokio::test]
1379 async fn test_file_cleanup_after_successful_restore() {
1380 let temp_dir = tempfile::tempdir().unwrap();
1381 let state_path = temp_dir.path().join("rejected_cache.json");
1382
1383 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1384 let event = create_test_event().await;
1385
1386 index.add_announcement(
1387 event.clone(),
1388 event.pubkey,
1389 "test-repo".to_string(),
1390 RejectionReason::DoesNotListService,
1391 );
1392
1393 // Save to disk
1394 index.save_to_disk(&state_path).unwrap();
1395 assert!(state_path.exists());
1396
1397 // Restore
1398 let index2 =
1399 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1400 index2.restore_from_disk(&state_path).unwrap();
1401
1402 // File should be deleted after successful restore
1403 assert!(!state_path.exists());
1404 }
1405
1406 #[tokio::test]
1407 async fn test_downtime_calculation_preserves_expiry() {
1408 let temp_dir = tempfile::tempdir().unwrap();
1409 let state_path = temp_dir.path().join("rejected_cache.json");
1410
1411 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1412 let event = create_test_event().await;
1413
1414 index.add_announcement(
1415 event.clone(),
1416 event.pubkey,
1417 "test-repo".to_string(),
1418 RejectionReason::DoesNotListService,
1419 );
1420
1421 // Save to disk
1422 index.save_to_disk(&state_path).unwrap();
1423
1424 // Simulate downtime by sleeping
1425 std::thread::sleep(Duration::from_millis(100));
1426
1427 // Restore
1428 let index2 =
1429 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1430 index2.restore_from_disk(&state_path).unwrap();
1431
1432 // Event should still be in both caches (downtime accounted for)
1433 assert_eq!(index2.hot_cache_len(), 1);
1434 assert_eq!(index2.cold_index_len(), 1);
1435 assert!(index2.contains(&event.id));
1436 }
1437
1438 #[tokio::test]
1439 async fn test_entries_expired_during_downtime() {
1440 let temp_dir = tempfile::tempdir().unwrap();
1441 let state_path = temp_dir.path().join("rejected_cache.json");
1442
1443 // Create index with very short expiry
1444 let index = RejectedEventsIndex::new(
1445 Duration::from_millis(100), // Hot cache: 100ms
1446 Duration::from_millis(200), // Cold index: 200ms
1447 );
1448 let event = create_test_event().await;
1449
1450 index.add_announcement(
1451 event.clone(),
1452 event.pubkey,
1453 "test-repo".to_string(),
1454 RejectionReason::DoesNotListService,
1455 );
1456
1457 // Save to disk
1458 index.save_to_disk(&state_path).unwrap();
1459
1460 // Simulate downtime longer than hot cache expiry
1461 std::thread::sleep(Duration::from_millis(150));
1462
1463 // Restore
1464 let index2 =
1465 RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200));
1466 index2.restore_from_disk(&state_path).unwrap();
1467
1468 // Hot cache entry should have expired during downtime
1469 // Cold index should still have it (200ms expiry)
1470 assert_eq!(index2.hot_cache_len(), 1);
1471 assert_eq!(index2.cold_index_len(), 1);
1472
1473 // But when we try to get it, hot cache will see it's expired
1474 let events = index2
1475 .hot_cache
1476 .get_maintainer_events(&event.pubkey, "test-repo", None);
1477 assert_eq!(events.len(), 0); // Expired!
1478
1479 // Cleanup should remove it
1480 let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement");
1481 assert_eq!(hot_expired, 1);
1482 assert_eq!(cold_expired, 0); // Not expired yet
1483 }
1484
1485 #[tokio::test]
1486 async fn test_hot_cache_different_event_types() {
1487 let temp_dir = tempfile::tempdir().unwrap();
1488 let state_path = temp_dir.path().join("rejected_cache.json");
1489
1490 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1491 let keys = Keys::generate();
1492
1493 // Create announcement event
1494 let unsigned_ann =
1495 nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key());
1496 let event_ann = keys.sign_event(unsigned_ann).await.unwrap();
1497
1498 // Create state event
1499 let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key());
1500 let event_state = keys.sign_event(unsigned_state).await.unwrap();
1501
1502 // Add both types
1503 index.add_announcement(
1504 event_ann.clone(),
1505 event_ann.pubkey,
1506 "test-repo".to_string(),
1507 RejectionReason::DoesNotListService,
1508 );
1509
1510 index.add_state(
1511 event_state.clone(),
1512 event_state.pubkey,
1513 "test-repo".to_string(),
1514 RejectionReason::Other,
1515 );
1516
1517 // Save and restore
1518 index.save_to_disk(&state_path).unwrap();
1519 let index2 =
1520 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1521 index2.restore_from_disk(&state_path).unwrap();
1522
1523 // Verify both event types restored
1524 assert_eq!(index2.hot_cache_len(), 2);
1525 assert!(index2.contains(&event_ann.id));
1526 assert!(index2.contains(&event_state.id));
1527
1528 // Verify we can filter by type
1529 let (removed, events) = index2.invalidate_and_get(
1530 &event_ann.pubkey,
1531 "test-repo",
1532 Some(EventType::Announcement),
1533 );
1534 assert_eq!(removed, 1);
1535 assert_eq!(events.len(), 1);
1536 assert_eq!(events[0].id, event_ann.id);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_cold_index_different_rejection_reasons() {
1541 let temp_dir = tempfile::tempdir().unwrap();
1542 let state_path = temp_dir.path().join("rejected_cache.json");
1543
1544 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1545 let keys = Keys::generate();
1546
1547 // Create events with different rejection reasons
1548 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1549 let event1 = keys.sign_event(unsigned1).await.unwrap();
1550
1551 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1552 let event2 = keys.sign_event(unsigned2).await.unwrap();
1553
1554 let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key());
1555 let event3 = keys.sign_event(unsigned3).await.unwrap();
1556
1557 // Add with different rejection reasons
1558 index.add_announcement(
1559 event1.clone(),
1560 event1.pubkey,
1561 "repo1".to_string(),
1562 RejectionReason::DoesNotListService,
1563 );
1564
1565 index.add_announcement(
1566 event2.clone(),
1567 event2.pubkey,
1568 "repo2".to_string(),
1569 RejectionReason::MaintainerNotYetValid,
1570 );
1571
1572 index.add_announcement(
1573 event3.clone(),
1574 event3.pubkey,
1575 "repo3".to_string(),
1576 RejectionReason::Other,
1577 );
1578
1579 // Save and restore
1580 index.save_to_disk(&state_path).unwrap();
1581 let index2 =
1582 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1583 index2.restore_from_disk(&state_path).unwrap();
1584
1585 // Verify all entries restored with their rejection reasons
1586 assert_eq!(index2.cold_index_len(), 3);
1587 assert!(index2.contains(&event1.id));
1588 assert!(index2.contains(&event2.id));
1589 assert!(index2.contains(&event3.id));
1590 }
1591
1592 #[tokio::test]
1593 async fn test_multiple_save_restore_cycles() {
1594 let temp_dir = tempfile::tempdir().unwrap();
1595 let state_path = temp_dir.path().join("rejected_cache.json");
1596
1597 // First cycle
1598 let index1 =
1599 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1600 let event1 = create_test_event().await;
1601
1602 index1.add_announcement(
1603 event1.clone(),
1604 event1.pubkey,
1605 "repo1".to_string(),
1606 RejectionReason::DoesNotListService,
1607 );
1608
1609 index1.save_to_disk(&state_path).unwrap();
1610
1611 // Second cycle - restore and add more
1612 let index2 =
1613 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1614 index2.restore_from_disk(&state_path).unwrap();
1615
1616 let event2 = create_test_event().await;
1617 index2.add_announcement(
1618 event2.clone(),
1619 event2.pubkey,
1620 "repo2".to_string(),
1621 RejectionReason::MaintainerNotYetValid,
1622 );
1623
1624 assert_eq!(index2.hot_cache_len(), 2);
1625 index2.save_to_disk(&state_path).unwrap();
1626
1627 // Third cycle - restore again
1628 let index3 =
1629 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1630 index3.restore_from_disk(&state_path).unwrap();
1631
1632 // Verify both events survived multiple cycles
1633 assert_eq!(index3.hot_cache_len(), 2);
1634 assert!(index3.contains(&event1.id));
1635 assert!(index3.contains(&event2.id));
1636 }
1637
1638 #[tokio::test]
1639 async fn test_restore_preserves_remaining_ttl() {
1640 let temp_dir = tempfile::tempdir().unwrap();
1641 let state_path = temp_dir.path().join("rejected_cache.json");
1642
1643 // Create index with 2 second hot cache expiry
1644 let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1645 let event = create_test_event().await;
1646
1647 index.add_announcement(
1648 event.clone(),
1649 event.pubkey,
1650 "test-repo".to_string(),
1651 RejectionReason::DoesNotListService,
1652 );
1653
1654 // Wait 200ms (small fraction of TTL)
1655 std::thread::sleep(Duration::from_millis(200));
1656
1657 // Save to disk
1658 index.save_to_disk(&state_path).unwrap();
1659
1660 // Immediately restore (minimal downtime)
1661 let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1662 index2.restore_from_disk(&state_path).unwrap();
1663
1664 // Event should still be retrievable (has ~1.8s remaining)
1665 let events = index2
1666 .hot_cache
1667 .get_maintainer_events(&event.pubkey, "test-repo", None);
1668 assert_eq!(events.len(), 1);
1669
1670 // Wait 2 seconds (total 2.2s > 2s expiry)
1671 std::thread::sleep(Duration::from_secs(2));
1672
1673 // Now it should be expired
1674 let events = index2
1675 .hot_cache
1676 .get_maintainer_events(&event.pubkey, "test-repo", None);
1677 assert_eq!(events.len(), 0);
1678 }
959} 1679}