upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:46:30 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:46:30 +0000
commit4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch)
treed42869f89e4916bb8dc36fd26c9ac5f888e042ac
parent7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff)
parentb101afa00bc28e1b55286145cb81e32a5b3decc9 (diff)
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and rejected events cache. Events are now saved to disk on graceful shutdown and restored on startup, preventing data loss during relay restarts. Key features: - Purgatory state persisted to JSON (state events, PR events, expired events) - Rejected events cache persisted (hot cache + cold index) - Downtime adjustment preserves remaining TTL - Graceful degradation on missing/corrupted files - Automatic re-queueing of restored repositories - Comprehensive test coverage (45 tests)
-rw-r--r--src/main.rs50
-rw-r--r--src/purgatory/mod.rs925
-rw-r--r--src/purgatory/persistence.rs198
-rw-r--r--src/purgatory/types.rs20
-rw-r--r--src/sync/mod.rs71
-rw-r--r--src/sync/rejected_index.rs726
-rw-r--r--tests/purgatory_persistence.rs755
7 files changed, 2725 insertions, 20 deletions
diff --git a/src/main.rs b/src/main.rs
index a6f1d9d..5e5b83a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
6use tracing::{info, Level}; 6use tracing::{error, info, warn, Level};
7use tracing_subscriber::FmtSubscriber; 7use tracing_subscriber::FmtSubscriber;
8 8
9use ngit_grasp::{ 9use ngit_grasp::{
@@ -64,6 +64,22 @@ async fn main() -> Result<()> {
64 ))); 64 )));
65 info!("Purgatory initialized for event coordination"); 65 info!("Purgatory initialized for event coordination");
66 66
67 // Restore purgatory state from disk if available
68 let purgatory_path =
69 PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json");
70
71 if purgatory_path.exists() {
72 match purgatory.restore_from_disk(&purgatory_path) {
73 Ok(()) => {
74 info!("Restored purgatory state from disk");
75 // Re-queueing will happen later after sync system is created
76 }
77 Err(e) => {
78 warn!("Failed to restore purgatory state: {}, starting empty", e);
79 }
80 }
81 }
82
67 // Create Nostr relay with NIP-34 validation 83 // Create Nostr relay with NIP-34 validation
68 // Returns both the relay and database for direct queries in handlers 84 // Returns both the relay and database for direct queries in handlers
69 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { 85 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await {
@@ -88,6 +104,7 @@ async fn main() -> Result<()> {
88 relay_with_db.write_policy.clone(), 104 relay_with_db.write_policy.clone(),
89 relay_with_db.relay.clone(), 105 relay_with_db.relay.clone(),
90 &config, 106 &config,
107 PathBuf::from(config.effective_git_data_path()),
91 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), 108 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()),
92 ); 109 );
93 110
@@ -100,6 +117,21 @@ async fn main() -> Result<()> {
100 info!("Proactive sync enabled (will discover relays from stored announcements)"); 117 info!("Proactive sync enabled (will discover relays from stored announcements)");
101 } 118 }
102 119
120 // Re-queue all restored purgatory repos for sync
121 let restored_identifiers = purgatory.get_all_identifiers();
122 if !restored_identifiers.is_empty() {
123 info!(
124 "Re-queueing {} restored repositories for sync",
125 restored_identifiers.len()
126 );
127 for identifier in restored_identifiers {
128 purgatory.enqueue_sync_immediate(&identifier);
129 }
130 }
131
132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134
103 tokio::spawn(async move { 135 tokio::spawn(async move {
104 sync_manager.run().await; 136 sync_manager.run().await;
105 }); 137 });
@@ -190,6 +222,22 @@ async fn main() -> Result<()> {
190 } 222 }
191 } 223 }
192 224
225 // Save purgatory state to disk
226 let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json");
227 if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) {
228 error!("Failed to save purgatory state: {}", e);
229 } else {
230 info!("Purgatory state saved to disk");
231 }
232
233 // Save rejected events cache to disk
234 let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json");
235 if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) {
236 error!("Failed to save rejected events cache: {}", e);
237 } else {
238 info!("Rejected events cache saved to disk");
239 }
240
193 // Cleanup placeholder refs on shutdown 241 // Cleanup placeholder refs on shutdown
194 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); 242 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids();
195 if !placeholder_ids.is_empty() { 243 if !placeholder_ids.is_empty() {
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 20df19b..47798a6 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -12,6 +12,7 @@
12//! - **Separate stores**: State events and PR events use different indexing strategies 12//! - **Separate stores**: State events and PR events use different indexing strategies
13 13
14mod helpers; 14mod helpers;
15pub mod persistence;
15pub mod sync; 16pub mod sync;
16mod types; 17mod types;
17 18
@@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
20 21
21use dashmap::DashMap; 22use dashmap::DashMap;
22use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
23use std::collections::HashSet; 26use std::collections::HashSet;
24use std::path::PathBuf; 27use std::path::{Path, PathBuf};
25use std::sync::Arc; 28use std::sync::Arc;
26use std::time::{Duration, Instant}; 29use std::time::{Duration, Instant, SystemTime};
27 30
28pub use sync::SyncQueueEntry; 31pub use sync::SyncQueueEntry;
29 32
@@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180);
38/// Used for batching burst arrivals during negentropy sync. 41/// Used for batching burst arrivals during negentropy sync.
39const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); 42const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500);
40 43
44/// Serializable wrapper for `StatePurgatoryEntry` with time offsets.
45///
46/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
47/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49struct SerializableStatePurgatoryEntry {
50 /// The nostr state event (kind 30618) awaiting git data
51 event: Event,
52 /// The repository identifier from the event's 'd' tag
53 identifier: String,
54 /// Event author pubkey
55 author: PublicKey,
56 /// Duration offset from saved_at for created_at
57 created_at_offset_secs: u64,
58 /// Duration offset from saved_at for expires_at
59 expires_at_offset_secs: u64,
60}
61
62/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
63///
64/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
65/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67struct SerializablePrPurgatoryEntry {
68 /// The nostr PR event, if received (None = git data arrived first)
69 event: Option<Event>,
70 /// The expected commit SHA from 'c' tag (if event exists)
71 /// or the actual commit pushed (if git arrived first)
72 commit: String,
73 /// Duration offset from saved_at for created_at
74 created_at_offset_secs: u64,
75 /// Duration offset from saved_at for expires_at
76 expires_at_offset_secs: u64,
77}
78
79/// Serializable purgatory state for disk persistence.
80///
81/// Contains all purgatory data needed to restore state across restarts:
82/// - State events (indexed by identifier)
83/// - PR events (indexed by event ID)
84/// - Expired events (to prevent re-sync loops)
85/// - Version number for future compatibility
86/// - Saved timestamp for downtime calculation
87#[derive(Debug, Clone, Serialize, Deserialize)]
88struct PurgatoryState {
89 /// Version number for state format (currently 1)
90 version: u32,
91 /// When this state was saved to disk
92 saved_at: SystemTime,
93 /// State events indexed by repository identifier
94 state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>,
95 /// PR events indexed by event ID (hex string)
96 pr_events: HashMap<String, SerializablePrPurgatoryEntry>,
97 /// Expired event IDs with their expiry timestamps
98 expired_events: HashMap<String, SystemTime>,
99}
100
41/// Main purgatory structure holding events awaiting git data. 101/// Main purgatory structure holding events awaiting git data.
42/// 102///
43/// Provides thread-safe concurrent access to two separate stores: 103/// Provides thread-safe concurrent access to two separate stores:
@@ -667,6 +727,260 @@ impl Purgatory {
667 pub fn sync_queue_size(&self) -> usize { 727 pub fn sync_queue_size(&self) -> usize {
668 self.sync_queue.len() 728 self.sync_queue.len()
669 } 729 }
730
731 /// Get all repository identifiers currently in purgatory.
732 ///
733 /// Returns a list of all unique repository identifiers that have state events
734 /// in purgatory. This is useful for re-queueing repositories after restore.
735 ///
736 /// # Returns
737 /// Vector of repository identifiers (e.g., "owner/repo")
738 ///
739 /// # Example
740 /// ```no_run
741 /// use ngit_grasp::purgatory::Purgatory;
742 /// use std::path::PathBuf;
743 ///
744 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
745 /// let identifiers = purgatory.get_all_identifiers();
746 /// for id in identifiers {
747 /// println!("Repository in purgatory: {}", id);
748 /// }
749 /// ```
750 pub fn get_all_identifiers(&self) -> Vec<String> {
751 self.state_events
752 .iter()
753 .map(|entry| entry.key().clone())
754 .collect()
755 }
756
757 /// Save purgatory state to disk.
758 ///
759 /// Serializes the current purgatory state (state_events, pr_events, expired_events)
760 /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are
761 /// converted to duration offsets from the current `SystemTime` for persistence.
762 ///
763 /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are
764 /// restored from disk.
765 ///
766 /// # Arguments
767 /// * `path` - Path to save the state file
768 ///
769 /// # Returns
770 /// Ok(()) on success, Err on failure
771 ///
772 /// # Example
773 /// ```no_run
774 /// use ngit_grasp::purgatory::Purgatory;
775 /// use std::path::PathBuf;
776 ///
777 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
778 /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap();
779 /// ```
780 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
781 let saved_at = SystemTime::now();
782 let now_instant = Instant::now();
783
784 // Convert state_events to serializable format
785 let mut state_events = HashMap::new();
786 for entry in self.state_events.iter() {
787 let identifier = entry.key().clone();
788 let entries: Vec<SerializableStatePurgatoryEntry> = entry
789 .value()
790 .iter()
791 .map(|e| {
792 let created_offset =
793 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
794 let expires_offset =
795 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
796
797 SerializableStatePurgatoryEntry {
798 event: e.event.clone(),
799 identifier: e.identifier.clone(),
800 author: e.author,
801 created_at_offset_secs: created_offset.as_secs(),
802 expires_at_offset_secs: expires_offset.as_secs(),
803 }
804 })
805 .collect();
806 state_events.insert(identifier, entries);
807 }
808
809 // Convert pr_events to serializable format
810 let mut pr_events = HashMap::new();
811 for entry in self.pr_events.iter() {
812 let event_id = entry.key().clone();
813 let e = entry.value();
814
815 let created_offset =
816 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
817 let expires_offset =
818 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
819
820 let serializable = SerializablePrPurgatoryEntry {
821 event: e.event.clone(),
822 commit: e.commit.clone(),
823 created_at_offset_secs: created_offset.as_secs(),
824 expires_at_offset_secs: expires_offset.as_secs(),
825 };
826 pr_events.insert(event_id, serializable);
827 }
828
829 // Convert expired_events to serializable format
830 // We use SystemTime instead of Instant offsets for expired events since
831 // we don't need high precision for cleanup timing
832 let mut expired_events = HashMap::new();
833 for entry in self.expired_events.iter() {
834 let event_id = entry.key().to_hex();
835 // Convert Instant to SystemTime (approximate)
836 let expired_at_instant = *entry.value();
837 let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant);
838 let expired_at_system = saved_at - elapsed_since_expire;
839 expired_events.insert(event_id, expired_at_system);
840 }
841
842 // Create state structure
843 let state = PurgatoryState {
844 version: 1,
845 saved_at,
846 state_events,
847 pr_events,
848 expired_events,
849 };
850
851 // Serialize to JSON and write to file
852 let json = serde_json::to_string_pretty(&state)?;
853 std::fs::write(path, json)?;
854
855 tracing::info!(
856 path = %path.display(),
857 state_events = state.state_events.len(),
858 pr_events = state.pr_events.len(),
859 expired_events = state.expired_events.len(),
860 "Saved purgatory state to disk"
861 );
862
863 Ok(())
864 }
865
866 /// Restore purgatory state from disk.
867 ///
868 /// Loads a previously saved purgatory state from the specified path and populates
869 /// the current purgatory instance. Adjusts time-based fields to account for downtime
870 /// between save and restore.
871 ///
872 /// After successful restore, the state file is deleted to prevent accidental
873 /// double-restore.
874 ///
875 /// # Arguments
876 /// * `path` - Path to the saved state file
877 ///
878 /// # Returns
879 /// Ok(()) on success, Err if file doesn't exist or is corrupted
880 ///
881 /// # Example
882 /// ```no_run
883 /// use ngit_grasp::purgatory::Purgatory;
884 /// use std::path::PathBuf;
885 ///
886 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
887 /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) {
888 /// Ok(()) => println!("State restored successfully"),
889 /// Err(e) => eprintln!("Failed to restore state: {}", e),
890 /// }
891 /// ```
892 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
893 // Read and parse state file
894 let json = std::fs::read_to_string(path)?;
895 let state: PurgatoryState = serde_json::from_str(&json)?;
896
897 // Verify version
898 if state.version != 1 {
899 return Err(format!("Unsupported state version: {}", state.version).into());
900 }
901
902 let now_instant = Instant::now();
903
904 // Restore state_events
905 for (identifier, entries) in state.state_events {
906 let restored_entries: Vec<StatePurgatoryEntry> = entries
907 .into_iter()
908 .map(|e| {
909 let created_at = persistence::offset_to_instant(
910 Duration::from_secs(e.created_at_offset_secs),
911 state.saved_at,
912 now_instant,
913 );
914 let expires_at = persistence::offset_to_instant(
915 Duration::from_secs(e.expires_at_offset_secs),
916 state.saved_at,
917 now_instant,
918 );
919
920 StatePurgatoryEntry {
921 event: e.event,
922 identifier: e.identifier,
923 author: e.author,
924 created_at,
925 expires_at,
926 }
927 })
928 .collect();
929
930 self.state_events.insert(identifier, restored_entries);
931 }
932
933 // Restore pr_events
934 for (event_id, e) in state.pr_events {
935 let created_at = persistence::offset_to_instant(
936 Duration::from_secs(e.created_at_offset_secs),
937 state.saved_at,
938 now_instant,
939 );
940 let expires_at = persistence::offset_to_instant(
941 Duration::from_secs(e.expires_at_offset_secs),
942 state.saved_at,
943 now_instant,
944 );
945
946 let entry = PrPurgatoryEntry {
947 event: e.event,
948 commit: e.commit,
949 created_at,
950 expires_at,
951 };
952
953 self.pr_events.insert(event_id, entry);
954 }
955
956 // Restore expired_events
957 for (event_id_hex, expired_at_system) in state.expired_events {
958 if let Ok(event_id) = EventId::from_hex(&event_id_hex) {
959 // Convert SystemTime back to Instant (approximate)
960 let elapsed_since_expire = SystemTime::now()
961 .duration_since(expired_at_system)
962 .unwrap_or(Duration::ZERO);
963 let expired_at_instant = now_instant - elapsed_since_expire;
964
965 self.expired_events.insert(event_id, expired_at_instant);
966 }
967 }
968
969 tracing::info!(
970 path = %path.display(),
971 state_events = self.state_events.len(),
972 pr_events = self.pr_events.len(),
973 expired_events = self.expired_events.len(),
974 saved_at = ?state.saved_at,
975 "Restored purgatory state from disk"
976 );
977
978 // Delete state file after successful restore
979 std::fs::remove_file(path)?;
980 tracing::debug!(path = %path.display(), "Deleted state file after restore");
981
982 Ok(())
983 }
670} 984}
671 985
672#[cfg(test)] 986#[cfg(test)]
@@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() {
1249 // - Skip the expired check for user-submitted events 1563 // - Skip the expired check for user-submitted events
1250 // - Allow the event to be re-added to purgatory or accepted if git data now exists 1564 // - Allow the event to be re-added to purgatory or accepted if git data now exists
1251} 1565}
1566
1567// ============================================================================
1568// Persistence Serialization Tests
1569// ============================================================================
1570
1571#[tokio::test]
1572async fn test_save_and_restore_state_events() {
1573 use tempfile::tempdir;
1574
1575 let temp_dir = tempdir().unwrap();
1576 let state_file = temp_dir.path().join("purgatory_state.json");
1577
1578 let purgatory = Purgatory::new(PathBuf::new());
1579 let keys = Keys::generate();
1580
1581 // Add multiple state events for the same identifier
1582 let event1 = EventBuilder::text_note("state event 1")
1583 .sign_with_keys(&keys)
1584 .unwrap();
1585 let event2 = EventBuilder::text_note("state event 2")
1586 .sign_with_keys(&keys)
1587 .unwrap();
1588
1589 let event1_id = event1.id;
1590 let event2_id = event2.id;
1591
1592 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key());
1593 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key());
1594
1595 // Save to disk
1596 purgatory.save_to_disk(&state_file).unwrap();
1597
1598 // Verify file exists
1599 assert!(state_file.exists());
1600
1601 // Create new purgatory and restore
1602 let purgatory2 = Purgatory::new(PathBuf::new());
1603 purgatory2.restore_from_disk(&state_file).unwrap();
1604
1605 // Verify file was deleted after restore
1606 assert!(!state_file.exists());
1607
1608 // Verify state events were restored
1609 let (state_count, _) = purgatory2.count();
1610 assert_eq!(state_count, 2);
1611
1612 let restored_entries = purgatory2.find_state("test-repo");
1613 assert_eq!(restored_entries.len(), 2);
1614
1615 // Verify event IDs match
1616 let restored_ids: Vec<EventId> = restored_entries.iter().map(|e| e.event.id).collect();
1617 assert!(restored_ids.contains(&event1_id));
1618 assert!(restored_ids.contains(&event2_id));
1619
1620 // Verify identifiers and authors match
1621 for entry in &restored_entries {
1622 assert_eq!(entry.identifier, "test-repo");
1623 assert_eq!(entry.author, keys.public_key());
1624 }
1625}
1626
1627#[tokio::test]
1628async fn test_save_and_restore_pr_events() {
1629 use nostr_sdk::{Kind, Tag, TagKind};
1630 use tempfile::tempdir;
1631
1632 let temp_dir = tempdir().unwrap();
1633 let state_file = temp_dir.path().join("purgatory_state.json");
1634
1635 let purgatory = Purgatory::new(PathBuf::new());
1636 let keys = Keys::generate();
1637
1638 // Add PR event with actual event
1639 let tags = vec![Tag::custom(
1640 TagKind::Custom("a".into()),
1641 vec!["30617:abc123:test-repo".to_string()],
1642 )];
1643
1644 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
1645 .tags(tags)
1646 .sign_with_keys(&keys)
1647 .unwrap();
1648
1649 let pr_event_id = pr_event.id;
1650
1651 purgatory.add_pr(
1652 pr_event.clone(),
1653 "pr-event-id".to_string(),
1654 "commit-abc".to_string(),
1655 );
1656
1657 // Save to disk
1658 purgatory.save_to_disk(&state_file).unwrap();
1659
1660 // Create new purgatory and restore
1661 let purgatory2 = Purgatory::new(PathBuf::new());
1662 purgatory2.restore_from_disk(&state_file).unwrap();
1663
1664 // Verify PR event was restored
1665 let (_, pr_count) = purgatory2.count();
1666 assert_eq!(pr_count, 1);
1667
1668 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap();
1669 assert!(restored_entry.event.is_some());
1670 assert_eq!(restored_entry.event.unwrap().id, pr_event_id);
1671 assert_eq!(restored_entry.commit, "commit-abc");
1672}
1673
1674#[tokio::test]
1675async fn test_save_and_restore_pr_placeholders() {
1676 use tempfile::tempdir;
1677
1678 let temp_dir = tempdir().unwrap();
1679 let state_file = temp_dir.path().join("purgatory_state.json");
1680
1681 let purgatory = Purgatory::new(PathBuf::new());
1682
1683 // Add PR placeholder (git data arrived first)
1684 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string());
1685
1686 // Save to disk
1687 purgatory.save_to_disk(&state_file).unwrap();
1688
1689 // Create new purgatory and restore
1690 let purgatory2 = Purgatory::new(PathBuf::new());
1691 purgatory2.restore_from_disk(&state_file).unwrap();
1692
1693 // Verify placeholder was restored
1694 let (_, pr_count) = purgatory2.count();
1695 assert_eq!(pr_count, 1);
1696
1697 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap();
1698 assert!(restored_entry.event.is_none()); // Still a placeholder
1699 assert_eq!(restored_entry.commit, "commit-def");
1700
1701 // Verify it's findable as a placeholder
1702 assert_eq!(
1703 purgatory2.find_pr_placeholder("placeholder-id"),
1704 Some("commit-def".to_string())
1705 );
1706}
1707
1708#[tokio::test]
1709async fn test_save_and_restore_expired_events() {
1710 use tempfile::tempdir;
1711
1712 let temp_dir = tempdir().unwrap();
1713 let state_file = temp_dir.path().join("purgatory_state.json");
1714
1715 let purgatory = Purgatory::new(PathBuf::new());
1716 let keys = Keys::generate();
1717
1718 let event = EventBuilder::text_note("test")
1719 .sign_with_keys(&keys)
1720 .unwrap();
1721 let event_id = event.id;
1722
1723 // Add and expire event
1724 purgatory.add_state(event, "repo".to_string(), keys.public_key());
1725 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1726 for entry in entries.iter_mut() {
1727 entry.expires_at = Instant::now() - Duration::from_secs(1);
1728 }
1729 }
1730 purgatory.cleanup();
1731
1732 // Verify event is marked as expired
1733 assert!(purgatory.is_expired(&event_id));
1734 assert_eq!(purgatory.expired_count(), 1);
1735
1736 // Save to disk
1737 purgatory.save_to_disk(&state_file).unwrap();
1738
1739 // Create new purgatory and restore
1740 let purgatory2 = Purgatory::new(PathBuf::new());
1741 purgatory2.restore_from_disk(&state_file).unwrap();
1742
1743 // Verify expired event was restored
1744 assert!(purgatory2.is_expired(&event_id));
1745 assert_eq!(purgatory2.expired_count(), 1);
1746
1747 // Verify it's included in event_ids()
1748 let ids = purgatory2.event_ids();
1749 assert!(ids.contains(&event_id));
1750}
1751
1752#[tokio::test]
1753async fn test_save_and_restore_empty_purgatory() {
1754 use tempfile::tempdir;
1755
1756 let temp_dir = tempdir().unwrap();
1757 let state_file = temp_dir.path().join("purgatory_state.json");
1758
1759 let purgatory = Purgatory::new(PathBuf::new());
1760
1761 // Save empty purgatory
1762 purgatory.save_to_disk(&state_file).unwrap();
1763
1764 // Verify file exists
1765 assert!(state_file.exists());
1766
1767 // Create new purgatory and restore
1768 let purgatory2 = Purgatory::new(PathBuf::new());
1769 purgatory2.restore_from_disk(&state_file).unwrap();
1770
1771 // Verify purgatory is still empty
1772 let (state_count, pr_count) = purgatory2.count();
1773 assert_eq!(state_count, 0);
1774 assert_eq!(pr_count, 0);
1775 assert_eq!(purgatory2.expired_count(), 0);
1776}
1777
1778#[tokio::test]
1779async fn test_restore_missing_file() {
1780 use tempfile::tempdir;
1781
1782 let temp_dir = tempdir().unwrap();
1783 let state_file = temp_dir.path().join("nonexistent.json");
1784
1785 let purgatory = Purgatory::new(PathBuf::new());
1786
1787 // Attempting to restore from missing file should error
1788 let result = purgatory.restore_from_disk(&state_file);
1789 assert!(result.is_err());
1790
1791 // Purgatory should remain empty
1792 let (state_count, pr_count) = purgatory.count();
1793 assert_eq!(state_count, 0);
1794 assert_eq!(pr_count, 0);
1795}
1796
1797#[tokio::test]
1798async fn test_restore_corrupted_json() {
1799 use tempfile::tempdir;
1800
1801 let temp_dir = tempdir().unwrap();
1802 let state_file = temp_dir.path().join("corrupted.json");
1803
1804 // Write invalid JSON
1805 std::fs::write(&state_file, "{ this is not valid json }").unwrap();
1806
1807 let purgatory = Purgatory::new(PathBuf::new());
1808
1809 // Attempting to restore corrupted file should error
1810 let result = purgatory.restore_from_disk(&state_file);
1811 assert!(result.is_err());
1812
1813 // Purgatory should remain empty
1814 let (state_count, pr_count) = purgatory.count();
1815 assert_eq!(state_count, 0);
1816 assert_eq!(pr_count, 0);
1817}
1818
1819#[tokio::test]
1820async fn test_restore_unsupported_version() {
1821 use tempfile::tempdir;
1822
1823 let temp_dir = tempdir().unwrap();
1824 let state_file = temp_dir.path().join("wrong_version.json");
1825
1826 // Write state with unsupported version
1827 let state = r#"{
1828 "version": 999,
1829 "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0},
1830 "state_events": {},
1831 "pr_events": {},
1832 "expired_events": {}
1833 }"#;
1834 std::fs::write(&state_file, state).unwrap();
1835
1836 let purgatory = Purgatory::new(PathBuf::new());
1837
1838 // Attempting to restore unsupported version should error
1839 let result = purgatory.restore_from_disk(&state_file);
1840 assert!(result.is_err());
1841 assert!(result
1842 .unwrap_err()
1843 .to_string()
1844 .contains("Unsupported state version"));
1845}
1846
1847#[tokio::test]
1848async fn test_downtime_calculation() {
1849 use tempfile::tempdir;
1850 use tokio::time::sleep;
1851
1852 let temp_dir = tempdir().unwrap();
1853 let state_file = temp_dir.path().join("purgatory_state.json");
1854
1855 let purgatory = Purgatory::new(PathBuf::new());
1856 let keys = Keys::generate();
1857
1858 // Add state event
1859 let event = EventBuilder::text_note("test")
1860 .sign_with_keys(&keys)
1861 .unwrap();
1862
1863 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1864
1865 // Get original expiry time
1866 let original_entries = purgatory.find_state("repo");
1867 let original_entry = &original_entries[0];
1868 let original_expires_at = original_entry.expires_at;
1869 let original_remaining = original_expires_at.saturating_duration_since(Instant::now());
1870
1871 // Save to disk
1872 purgatory.save_to_disk(&state_file).unwrap();
1873
1874 // Simulate downtime (100ms)
1875 sleep(Duration::from_millis(100)).await;
1876
1877 // Create new purgatory and restore
1878 let purgatory2 = Purgatory::new(PathBuf::new());
1879 purgatory2.restore_from_disk(&state_file).unwrap();
1880
1881 // Get restored expiry time
1882 let restored_entries = purgatory2.find_state("repo");
1883 let restored_entry = &restored_entries[0];
1884 let restored_expires_at = restored_entry.expires_at;
1885 let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now());
1886
1887 // Remaining time should be approximately the same (accounting for downtime)
1888 // Allow 2000ms tolerance for test execution time and sleep duration
1889 let diff = if restored_remaining > original_remaining {
1890 restored_remaining.as_millis() - original_remaining.as_millis()
1891 } else {
1892 original_remaining.as_millis() - restored_remaining.as_millis()
1893 };
1894
1895 assert!(
1896 diff < 2000,
1897 "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms",
1898 original_remaining.as_millis(),
1899 restored_remaining.as_millis(),
1900 diff
1901 );
1902}
1903
1904#[tokio::test]
1905async fn test_expiry_times_preserved() {
1906 use tempfile::tempdir;
1907
1908 let temp_dir = tempdir().unwrap();
1909 let state_file = temp_dir.path().join("purgatory_state.json");
1910
1911 let purgatory = Purgatory::new(PathBuf::new());
1912 let keys = Keys::generate();
1913
1914 // Add state event
1915 let event = EventBuilder::text_note("test")
1916 .sign_with_keys(&keys)
1917 .unwrap();
1918
1919 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1920
1921 // Manually set expiry to a specific time in the future
1922 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
1923 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1924 for entry in entries.iter_mut() {
1925 entry.expires_at = custom_expiry;
1926 }
1927 }
1928
1929 // Save to disk
1930 purgatory.save_to_disk(&state_file).unwrap();
1931
1932 // Create new purgatory and restore
1933 let purgatory2 = Purgatory::new(PathBuf::new());
1934 purgatory2.restore_from_disk(&state_file).unwrap();
1935
1936 // Get restored expiry time
1937 let restored_entries = purgatory2.find_state("repo");
1938 let restored_entry = &restored_entries[0];
1939 let restored_remaining = restored_entry
1940 .expires_at
1941 .saturating_duration_since(Instant::now());
1942
1943 // Should be approximately 600 seconds (allow 3 second tolerance for test execution)
1944 assert!(
1945 restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603,
1946 "Expected ~600s remaining, got {}s",
1947 restored_remaining.as_secs()
1948 );
1949}
1950
1951#[tokio::test]
1952async fn test_multiple_state_events_same_identifier() {
1953 use tempfile::tempdir;
1954
1955 let temp_dir = tempdir().unwrap();
1956 let state_file = temp_dir.path().join("purgatory_state.json");
1957
1958 let purgatory = Purgatory::new(PathBuf::new());
1959 let keys1 = Keys::generate();
1960 let keys2 = Keys::generate();
1961 let keys3 = Keys::generate();
1962
1963 // Add multiple state events for the same identifier from different authors
1964 let event1 = EventBuilder::text_note("maintainer 1")
1965 .sign_with_keys(&keys1)
1966 .unwrap();
1967 let event2 = EventBuilder::text_note("maintainer 2")
1968 .sign_with_keys(&keys2)
1969 .unwrap();
1970 let event3 = EventBuilder::text_note("maintainer 3")
1971 .sign_with_keys(&keys3)
1972 .unwrap();
1973
1974 purgatory.add_state(
1975 event1.clone(),
1976 "shared-repo".to_string(),
1977 keys1.public_key(),
1978 );
1979 purgatory.add_state(
1980 event2.clone(),
1981 "shared-repo".to_string(),
1982 keys2.public_key(),
1983 );
1984 purgatory.add_state(
1985 event3.clone(),
1986 "shared-repo".to_string(),
1987 keys3.public_key(),
1988 );
1989
1990 // Save to disk
1991 purgatory.save_to_disk(&state_file).unwrap();
1992
1993 // Create new purgatory and restore
1994 let purgatory2 = Purgatory::new(PathBuf::new());
1995 purgatory2.restore_from_disk(&state_file).unwrap();
1996
1997 // Verify all three events were restored
1998 let restored_entries = purgatory2.find_state("shared-repo");
1999 assert_eq!(restored_entries.len(), 3);
2000
2001 // Verify all authors are present
2002 let authors: Vec<PublicKey> = restored_entries.iter().map(|e| e.author).collect();
2003 assert!(authors.contains(&keys1.public_key()));
2004 assert!(authors.contains(&keys2.public_key()));
2005 assert!(authors.contains(&keys3.public_key()));
2006}
2007
2008#[tokio::test]
2009async fn test_mixed_pr_events_and_placeholders() {
2010 use nostr_sdk::{Kind, Tag, TagKind};
2011 use tempfile::tempdir;
2012
2013 let temp_dir = tempdir().unwrap();
2014 let state_file = temp_dir.path().join("purgatory_state.json");
2015
2016 let purgatory = Purgatory::new(PathBuf::new());
2017 let keys = Keys::generate();
2018
2019 // Add PR event with actual event
2020 let tags = vec![Tag::custom(
2021 TagKind::Custom("a".into()),
2022 vec!["30617:abc123:test-repo".to_string()],
2023 )];
2024
2025 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
2026 .tags(tags)
2027 .sign_with_keys(&keys)
2028 .unwrap();
2029
2030 purgatory.add_pr(
2031 pr_event.clone(),
2032 "pr-with-event".to_string(),
2033 "commit-abc".to_string(),
2034 );
2035
2036 // Add PR placeholder
2037 purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string());
2038
2039 // Save to disk
2040 purgatory.save_to_disk(&state_file).unwrap();
2041
2042 // Create new purgatory and restore
2043 let purgatory2 = Purgatory::new(PathBuf::new());
2044 purgatory2.restore_from_disk(&state_file).unwrap();
2045
2046 // Verify both were restored correctly
2047 let (_, pr_count) = purgatory2.count();
2048 assert_eq!(pr_count, 2);
2049
2050 // Verify PR event
2051 let pr_entry = purgatory2.find_pr("pr-with-event").unwrap();
2052 assert!(pr_entry.event.is_some());
2053 assert_eq!(pr_entry.commit, "commit-abc");
2054
2055 // Verify placeholder
2056 let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap();
2057 assert!(placeholder_entry.event.is_none());
2058 assert_eq!(placeholder_entry.commit, "commit-def");
2059 assert_eq!(
2060 purgatory2.find_pr_placeholder("pr-placeholder"),
2061 Some("commit-def".to_string())
2062 );
2063}
2064
2065#[tokio::test]
2066async fn test_file_cleanup_after_successful_restore() {
2067 use tempfile::tempdir;
2068
2069 let temp_dir = tempdir().unwrap();
2070 let state_file = temp_dir.path().join("purgatory_state.json");
2071
2072 let purgatory = Purgatory::new(PathBuf::new());
2073 let keys = Keys::generate();
2074
2075 // Add some data
2076 let event = EventBuilder::text_note("test")
2077 .sign_with_keys(&keys)
2078 .unwrap();
2079 purgatory.add_state(event, "repo".to_string(), keys.public_key());
2080
2081 // Save to disk
2082 purgatory.save_to_disk(&state_file).unwrap();
2083 assert!(state_file.exists());
2084
2085 // Restore
2086 let purgatory2 = Purgatory::new(PathBuf::new());
2087 purgatory2.restore_from_disk(&state_file).unwrap();
2088
2089 // File should be deleted after successful restore
2090 assert!(!state_file.exists());
2091}
2092
2093#[tokio::test]
2094async fn test_comprehensive_roundtrip() {
2095 use nostr_sdk::{Kind, Tag, TagKind};
2096 use tempfile::tempdir;
2097
2098 let temp_dir = tempdir().unwrap();
2099 let state_file = temp_dir.path().join("purgatory_state.json");
2100
2101 let purgatory = Purgatory::new(PathBuf::new());
2102 let keys1 = Keys::generate();
2103 let keys2 = Keys::generate();
2104
2105 // Add multiple state events
2106 let state1 = EventBuilder::text_note("state 1")
2107 .sign_with_keys(&keys1)
2108 .unwrap();
2109 let state2 = EventBuilder::text_note("state 2")
2110 .sign_with_keys(&keys2)
2111 .unwrap();
2112
2113 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key());
2114 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key());
2115
2116 // Add PR event
2117 let tags = vec![Tag::custom(
2118 TagKind::Custom("a".into()),
2119 vec!["30617:abc123:repo1".to_string()],
2120 )];
2121 let pr_event = EventBuilder::new(Kind::from(1618), "PR")
2122 .tags(tags)
2123 .sign_with_keys(&keys1)
2124 .unwrap();
2125 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string());
2126
2127 // Add PR placeholder
2128 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
2129
2130 // Add and expire an event
2131 let expired_event = EventBuilder::text_note("expired")
2132 .sign_with_keys(&keys1)
2133 .unwrap();
2134 let expired_id = expired_event.id;
2135 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key());
2136 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2137 for entry in entries.iter_mut() {
2138 entry.expires_at = Instant::now() - Duration::from_secs(1);
2139 }
2140 }
2141 purgatory.cleanup();
2142
2143 // Verify initial state
2144 let (state_count, pr_count) = purgatory.count();
2145 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up)
2146 assert_eq!(pr_count, 2); // pr-1, pr-2
2147 assert_eq!(purgatory.expired_count(), 1); // expired_event
2148
2149 // Save to disk
2150 purgatory.save_to_disk(&state_file).unwrap();
2151
2152 // Create new purgatory and restore
2153 let purgatory2 = Purgatory::new(PathBuf::new());
2154 purgatory2.restore_from_disk(&state_file).unwrap();
2155
2156 // Verify all data was restored correctly
2157 let (state_count2, pr_count2) = purgatory2.count();
2158 assert_eq!(state_count2, 2);
2159 assert_eq!(pr_count2, 2);
2160 assert_eq!(purgatory2.expired_count(), 1);
2161
2162 // Verify state events
2163 assert_eq!(purgatory2.find_state("repo1").len(), 1);
2164 assert_eq!(purgatory2.find_state("repo2").len(), 1);
2165
2166 // Verify PR events
2167 assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some());
2168 assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none());
2169
2170 // Verify expired event
2171 assert!(purgatory2.is_expired(&expired_id));
2172}
diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs
new file mode 100644
index 0000000..7fca2cf
--- /dev/null
+++ b/src/purgatory/persistence.rs
@@ -0,0 +1,198 @@
1//! Persistence utilities for purgatory state.
2//!
3//! This module provides conversion functions between `Instant` (which cannot be
4//! serialized) and `Duration` offsets from a reference `SystemTime`. This allows
5//! purgatory state to be persisted to disk and restored across restarts.
6//!
7//! ## Time Handling
8//!
9//! - `Instant` is monotonic but cannot be serialized
10//! - `SystemTime` can be serialized but may go backwards (NTP, user changes)
11//! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime
12//! - Downtime is accounted for when restoring state (elapsed time is preserved)
13
14use std::time::{Duration, Instant, SystemTime};
15
16/// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`.
17///
18/// This allows storing an `Instant` as a serializable offset that can be
19/// restored later, accounting for system downtime.
20///
21/// # Arguments
22/// * `instant` - The `Instant` to convert
23/// * `reference_time` - The reference `SystemTime` (typically SystemTime::now())
24/// * `reference_instant` - The corresponding `Instant` (typically Instant::now())
25///
26/// # Returns
27/// Duration offset from the reference time
28///
29/// # Example
30/// ```
31/// use std::time::{Duration, Instant, SystemTime};
32/// use ngit_grasp::purgatory::persistence::instant_to_offset;
33///
34/// let now_system = SystemTime::now();
35/// let now_instant = Instant::now();
36/// let future = now_instant + Duration::from_secs(60);
37///
38/// let offset = instant_to_offset(future, now_system, now_instant);
39/// assert!(offset.as_secs() >= 60);
40/// ```
41pub fn instant_to_offset(
42 instant: Instant,
43 _reference_time: SystemTime,
44 reference_instant: Instant,
45) -> Duration {
46 if instant >= reference_instant {
47 // Future instant - return positive offset
48 instant.duration_since(reference_instant)
49 } else {
50 // Past instant - this shouldn't happen in normal operation,
51 // but we handle it by returning zero duration
52 Duration::ZERO
53 }
54}
55
56/// Convert a `Duration` offset back to an `Instant`, accounting for downtime.
57///
58/// This restores an `Instant` from a serialized offset, adjusting for the time
59/// that has elapsed since the state was saved.
60///
61/// # Arguments
62/// * `offset` - The duration offset from the saved reference time
63/// * `saved_at` - The `SystemTime` when the state was saved
64/// * `reference_instant` - The current `Instant` (typically Instant::now())
65///
66/// # Returns
67/// The restored `Instant`, adjusted for downtime
68///
69/// # Example
70/// ```
71/// use std::time::{Duration, Instant, SystemTime};
72/// use ngit_grasp::purgatory::persistence::offset_to_instant;
73///
74/// let saved_at = SystemTime::now();
75/// let offset = Duration::from_secs(60);
76/// let now_instant = Instant::now();
77///
78/// let restored = offset_to_instant(offset, saved_at, now_instant);
79/// // restored will be approximately now_instant + 60 seconds
80/// ```
81pub fn offset_to_instant(
82 offset: Duration,
83 saved_at: SystemTime,
84 reference_instant: Instant,
85) -> Instant {
86 // Calculate how much time has elapsed since the state was saved
87 let now_system = SystemTime::now();
88 let elapsed_since_save = now_system
89 .duration_since(saved_at)
90 .unwrap_or(Duration::ZERO);
91
92 // The original deadline was: saved_at + offset
93 // Time remaining = (saved_at + offset) - now_system
94 // = offset - elapsed_since_save
95
96 if offset > elapsed_since_save {
97 // Deadline is still in the future
98 let remaining = offset - elapsed_since_save;
99 reference_instant + remaining
100 } else {
101 // Deadline has already passed or is right now
102 reference_instant
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109 use std::thread;
110 use std::time::Duration;
111
112 #[test]
113 fn test_instant_to_offset_future() {
114 let now_system = SystemTime::now();
115 let now_instant = Instant::now();
116 let future = now_instant + Duration::from_secs(60);
117
118 let offset = instant_to_offset(future, now_system, now_instant);
119
120 // Should be approximately 60 seconds (within tolerance)
121 assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61);
122 }
123
124 #[test]
125 fn test_instant_to_offset_past() {
126 let now_system = SystemTime::now();
127 let past_instant = Instant::now();
128 // Simulate some time passing
129 thread::sleep(Duration::from_millis(10));
130 let now_instant = Instant::now();
131
132 let offset = instant_to_offset(past_instant, now_system, now_instant);
133
134 // Past instants return zero duration
135 assert_eq!(offset, Duration::ZERO);
136 }
137
138 #[test]
139 fn test_offset_to_instant_with_time_remaining() {
140 let saved_at = SystemTime::now();
141 let offset = Duration::from_secs(60);
142
143 // Simulate a very short downtime (< 10ms)
144 thread::sleep(Duration::from_millis(5));
145
146 let now_instant = Instant::now();
147 let restored = offset_to_instant(offset, saved_at, now_instant);
148
149 // Should be approximately 60 seconds in the future
150 let remaining = restored.duration_since(now_instant);
151 assert!(
152 remaining.as_secs() >= 59 && remaining.as_secs() <= 61,
153 "Expected ~60s, got {}s",
154 remaining.as_secs()
155 );
156 }
157
158 #[test]
159 fn test_offset_to_instant_deadline_passed() {
160 // Simulate state saved 70 seconds ago with 60 second offset
161 let saved_at = SystemTime::now() - Duration::from_secs(70);
162 let offset = Duration::from_secs(60);
163
164 let now_instant = Instant::now();
165 let restored = offset_to_instant(offset, saved_at, now_instant);
166
167 // Deadline has passed, should be now or in the past
168 let remaining = restored.saturating_duration_since(now_instant);
169 assert_eq!(remaining, Duration::ZERO);
170 }
171
172 #[test]
173 fn test_round_trip_conversion() {
174 let now_system = SystemTime::now();
175 let now_instant = Instant::now();
176 let future = now_instant + Duration::from_secs(120);
177
178 // Convert to offset
179 let offset = instant_to_offset(future, now_system, now_instant);
180
181 // Immediately convert back (minimal downtime)
182 let restored = offset_to_instant(offset, now_system, now_instant);
183
184 // Should be very close to the original future instant
185 let diff = if restored > future {
186 restored.duration_since(future)
187 } else {
188 future.duration_since(restored)
189 };
190
191 // Allow for small timing differences (< 100ms)
192 assert!(
193 diff < Duration::from_millis(100),
194 "Round trip should preserve instant within 100ms, got {}ms",
195 diff.as_millis()
196 );
197 }
198}
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 9c47616..919504b 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -5,8 +5,14 @@
5//! problem where either the nostr event or git push can arrive first. 5//! problem where either the nostr event or git push can arrive first.
6 6
7use nostr_sdk::prelude::*; 7use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize};
8use std::time::Instant; 9use std::time::Instant;
9 10
11/// Default value for Instant fields during deserialization
12fn instant_now() -> Instant {
13 Instant::now()
14}
15
10/// A reference name and its target object. 16/// A reference name and its target object.
11/// 17///
12/// Used to identify specific git refs (branches, tags) that a state event 18/// Used to identify specific git refs (branches, tags) that a state event
@@ -59,7 +65,10 @@ impl RefUpdate {
59/// State events declare the current state of a repository but may arrive 65/// State events declare the current state of a repository but may arrive
60/// before the corresponding git data has been pushed. This entry holds 66/// before the corresponding git data has been pushed. This entry holds
61/// the event and associated metadata until the git data arrives. 67/// the event and associated metadata until the git data arrives.
62#[derive(Debug, Clone)] 68///
69/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
70/// module to convert to/from serializable wrapper types.
71#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct StatePurgatoryEntry { 72pub struct StatePurgatoryEntry {
64 /// The nostr state event (kind 30618) awaiting git data 73 /// The nostr state event (kind 30618) awaiting git data
65 pub event: Event, 74 pub event: Event,
@@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry {
71 pub author: PublicKey, 80 pub author: PublicKey,
72 81
73 /// When this entry was added to purgatory 82 /// When this entry was added to purgatory
83 #[serde(skip, default = "instant_now")]
74 pub created_at: Instant, 84 pub created_at: Instant,
75 85
76 /// Expiry deadline (30 min from creation, may be extended) 86 /// Expiry deadline (30 min from creation, may be extended)
87 #[serde(skip, default = "instant_now")]
77 pub expires_at: Instant, 88 pub expires_at: Instant,
78} 89}
79 90
@@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry {
82/// PR events reference specific commits but may arrive before the git push 93/// PR events reference specific commits but may arrive before the git push
83/// containing those commits. Alternatively, a git push may arrive first, 94/// containing those commits. Alternatively, a git push may arrive first,
84/// creating a placeholder entry waiting for the corresponding PR event. 95/// creating a placeholder entry waiting for the corresponding PR event.
85#[derive(Debug, Clone)] 96///
97/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
98/// module to convert to/from serializable wrapper types.
99#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct PrPurgatoryEntry { 100pub struct PrPurgatoryEntry {
87 /// The nostr PR event, if received (None = git data arrived first) 101 /// The nostr PR event, if received (None = git data arrived first)
88 pub event: Option<Event>, 102 pub event: Option<Event>,
@@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry {
92 pub commit: String, 106 pub commit: String,
93 107
94 /// When this entry was added to purgatory 108 /// When this entry was added to purgatory
109 #[serde(skip, default = "instant_now")]
95 pub created_at: Instant, 110 pub created_at: Instant,
96 111
97 /// Expiry deadline (30 min from creation, may be extended) 112 /// Expiry deadline (30 min from creation, may be extended)
113 #[serde(skip, default = "instant_now")]
98 pub expires_at: Instant, 114 pub expires_at: Instant,
99} 115}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e2d55bd..3213dfb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -43,6 +43,7 @@ pub use health::RelayHealthTracker;
43use tokio::time::sleep; 43use tokio::time::sleep;
44 44
45use std::collections::{HashMap, HashSet}; 45use std::collections::{HashMap, HashSet};
46use std::path::{Path, PathBuf};
46use std::sync::Arc; 47use std::sync::Arc;
47use std::time::Duration; 48use std::time::Duration;
48 49
@@ -581,6 +582,7 @@ impl SyncManager {
581 /// * `write_policy` - Policy for validating events before storage 582 /// * `write_policy` - Policy for validating events before storage
582 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) 583 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
583 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence)
584 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
585 pub fn new( 587 pub fn new(
586 bootstrap_relay_url: Option<String>, 588 bootstrap_relay_url: Option<String>,
@@ -589,11 +591,42 @@ impl SyncManager {
589 write_policy: Nip34WritePolicy, 591 write_policy: Nip34WritePolicy,
590 local_relay: LocalRelay, 592 local_relay: LocalRelay,
591 config: &Config, 593 config: &Config,
594 data_path: PathBuf,
592 sync_metrics: Option<SyncMetrics>, 595 sync_metrics: Option<SyncMetrics>,
593 ) -> Self { 596 ) -> Self {
594 // Extract purgatory from write_policy for read-only access 597 // Extract purgatory from write_policy for read-only access
595 let purgatory = write_policy.purgatory().clone(); 598 let purgatory = write_policy.purgatory().clone();
596 599
600 // Create rejected events index
601 let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics {
602 RejectedEventsIndex::with_metrics(
603 Duration::from_secs(config.rejected_hot_cache_duration_secs),
604 Duration::from_secs(config.rejected_cold_index_expiry_secs),
605 metrics.clone(),
606 )
607 } else {
608 RejectedEventsIndex::new(
609 Duration::from_secs(config.rejected_hot_cache_duration_secs),
610 Duration::from_secs(config.rejected_cold_index_expiry_secs),
611 )
612 });
613
614 // Attempt to restore rejected events index from disk
615 let rejected_index_path = data_path.join("rejected-events-cache.json");
616 if rejected_index_path.exists() {
617 match rejected_events_index.restore_from_disk(&rejected_index_path) {
618 Ok(()) => {
619 tracing::info!("Restored rejected events index from disk");
620 }
621 Err(e) => {
622 tracing::warn!(
623 "Failed to restore rejected events index: {}, starting empty",
624 e
625 );
626 }
627 }
628 }
629
597 Self { 630 Self {
598 bootstrap_relay_url, 631 bootstrap_relay_url,
599 service_domain, 632 service_domain,
@@ -605,18 +638,7 @@ impl SyncManager {
605 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 638 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
606 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 639 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
607 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 640 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
608 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { 641 rejected_events_index,
609 RejectedEventsIndex::with_metrics(
610 Duration::from_secs(config.rejected_hot_cache_duration_secs),
611 Duration::from_secs(config.rejected_cold_index_expiry_secs),
612 metrics.clone(),
613 )
614 } else {
615 RejectedEventsIndex::new(
616 Duration::from_secs(config.rejected_hot_cache_duration_secs),
617 Duration::from_secs(config.rejected_cold_index_expiry_secs),
618 )
619 }),
620 connections: HashMap::new(), 642 connections: HashMap::new(),
621 health_tracker: Arc::new(RelayHealthTracker::new(config)), 643 health_tracker: Arc::new(RelayHealthTracker::new(config)),
622 next_batch_id: 0, 644 next_batch_id: 0,
@@ -637,6 +659,31 @@ impl SyncManager {
637 self.next_batch_id 659 self.next_batch_id
638 } 660 }
639 661
662 /// Get a clone of the rejected events index Arc.
663 ///
664 /// This allows access to the rejected events index for persistence
665 /// even after the SyncManager has been moved into a task.
666 ///
667 /// # Returns
668 /// Arc clone of the rejected events index
669 pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> {
670 self.rejected_events_index.clone()
671 }
672
673 /// Save rejected events index to disk.
674 ///
675 /// This is called during shutdown to persist the rejected events cache,
676 /// allowing us to avoid re-downloading rejected events after restart.
677 ///
678 /// # Arguments
679 /// * `path` - Path to save the rejected index file
680 ///
681 /// # Returns
682 /// Ok(()) on success, Err if save fails
683 pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
684 self.rejected_events_index.save_to_disk(path)
685 }
686
640 /// Handle EOSE (End Of Stored Events) for a subscription 687 /// Handle EOSE (End Of Stored Events) for a subscription
641 /// 688 ///
642 /// This method: 689 /// This method:
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index 4d31901..f25f22a 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -86,12 +86,14 @@
86//! ``` 86//! ```
87 87
88use nostr_sdk::{Event, EventId, PublicKey}; 88use nostr_sdk::{Event, EventId, PublicKey};
89use serde::{Deserialize, Serialize};
89use std::collections::{HashMap, HashSet}; 90use std::collections::{HashMap, HashSet};
91use std::path::Path;
90use std::sync::{Arc, RwLock}; 92use std::sync::{Arc, RwLock};
91use std::time::{Duration, Instant}; 93use std::time::{Duration, Instant, SystemTime};
92 94
93/// Type of event stored in the rejected events index 95/// Type of event stored in the rejected events index
94#[derive(Debug, Clone, Copy, PartialEq, Eq)] 96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
95pub enum EventType { 97pub enum EventType {
96 /// Repository announcement (kind 30617) 98 /// Repository announcement (kind 30617)
97 Announcement, 99 Announcement,
@@ -109,7 +111,7 @@ impl std::fmt::Display for EventType {
109} 111}
110 112
111/// Reason why a repository announcement was rejected 113/// Reason why a repository announcement was rejected
112#[derive(Debug, Clone, Copy, PartialEq, Eq)] 114#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
113pub enum RejectionReason { 115pub enum RejectionReason {
114 /// Announcement doesn't list this service in clone/web URLs 116 /// Announcement doesn't list this service in clone/web URLs
115 DoesNotListService, 117 DoesNotListService,
@@ -141,6 +143,20 @@ struct HotCacheEntry {
141 cached_at: Instant, 143 cached_at: Instant,
142} 144}
143 145
146/// Serializable version of HotCacheEntry for persistence
147///
148/// Converts Instant to Duration offset from saved_at time
149#[derive(Debug, Clone, Serialize, Deserialize)]
150struct SerializableHotCacheEntry {
151 event: Event,
152 pubkey: PublicKey,
153 identifier: String,
154 event_type: EventType,
155 reason: RejectionReason,
156 /// Duration since saved_at when this entry was cached
157 cached_at_offset_secs: u64,
158}
159
144/// Entry in the cold index (metadata only) 160/// Entry in the cold index (metadata only)
145/// 161///
146/// Note: event_id is stored as the HashMap key, not in this struct 162/// Note: event_id is stored as the HashMap key, not in this struct
@@ -154,6 +170,49 @@ struct ColdIndexEntry {
154 rejected_at: Instant, 170 rejected_at: Instant,
155} 171}
156 172
173/// Serializable version of ColdIndexEntry for persistence
174///
175/// Converts Instant to Duration offset from saved_at time
176#[derive(Debug, Clone, Serialize, Deserialize)]
177struct SerializableColdIndexEntry {
178 pubkey: PublicKey,
179 identifier: String,
180 event_type: EventType,
181 reason: RejectionReason,
182 /// Duration since saved_at when this entry was rejected
183 rejected_at_offset_secs: u64,
184}
185
186/// Serializable state for hot cache
187#[derive(Debug, Serialize, Deserialize)]
188struct SerializableHotCache {
189 expiry_duration_secs: u64,
190 entries: HashMap<EventId, SerializableHotCacheEntry>,
191}
192
193/// Serializable state for cold index
194#[derive(Debug, Serialize, Deserialize)]
195struct SerializableColdIndex {
196 expiry_duration_secs: u64,
197 entries: HashMap<EventId, SerializableColdIndexEntry>,
198}
199
200/// Complete rejected cache state for persistence
201///
202/// Stores both hot cache and cold index with version and timestamp information.
203/// All Instant fields are converted to Duration offsets from saved_at.
204#[derive(Debug, Serialize, Deserialize)]
205struct RejectedCacheState {
206 /// Version for future compatibility
207 version: u32,
208 /// When this state was saved
209 saved_at: SystemTime,
210 /// Hot cache entries with full events
211 hot_cache: SerializableHotCache,
212 /// Cold index entries with metadata only
213 cold_index: SerializableColdIndex,
214}
215
157/// Hot cache: Stores full events for immediate re-processing 216/// Hot cache: Stores full events for immediate re-processing
158/// 217///
159/// Events are stored for a short duration (default: 2 minutes) to enable 218/// Events are stored for a short duration (default: 2 minutes) to enable
@@ -603,6 +662,168 @@ impl RejectedEventsIndex {
603 662
604 ids 663 ids
605 } 664 }
665
666 /// Save rejected events cache to disk
667 ///
668 /// Serializes both hot cache and cold index to JSON, converting Instant timestamps
669 /// to Duration offsets from the save time. This allows timestamps to be adjusted
670 /// for downtime when restored.
671 ///
672 /// # Arguments
673 ///
674 /// * `path` - File path to write the serialized state to
675 ///
676 /// # Returns
677 ///
678 /// Ok(()) on success, or an error if serialization or file write fails
679 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
680 let saved_at = SystemTime::now();
681 let now = Instant::now();
682
683 // Lock both caches for consistent snapshot
684 let hot_entries = self.hot_cache.entries.read().unwrap();
685 let cold_entries = self.cold_index.entries.read().unwrap();
686
687 // Convert hot cache entries to serializable format
688 let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries
689 .iter()
690 .map(|(event_id, entry)| {
691 let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs();
692
693 let serializable_entry = SerializableHotCacheEntry {
694 event: entry.event.clone(),
695 pubkey: entry.pubkey,
696 identifier: entry.identifier.clone(),
697 event_type: entry.event_type,
698 reason: entry.reason,
699 cached_at_offset_secs,
700 };
701
702 (*event_id, serializable_entry)
703 })
704 .collect();
705
706 // Convert cold index entries to serializable format
707 let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries
708 .iter()
709 .map(|(event_id, entry)| {
710 let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs();
711
712 let serializable_entry = SerializableColdIndexEntry {
713 pubkey: entry.pubkey,
714 identifier: entry.identifier.clone(),
715 event_type: entry.event_type,
716 reason: entry.reason,
717 rejected_at_offset_secs,
718 };
719
720 (*event_id, serializable_entry)
721 })
722 .collect();
723
724 // Create complete state
725 let state = RejectedCacheState {
726 version: 1,
727 saved_at,
728 hot_cache: SerializableHotCache {
729 expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(),
730 entries: serializable_hot_entries,
731 },
732 cold_index: SerializableColdIndex {
733 expiry_duration_secs: self.cold_index.expiry_duration.as_secs(),
734 entries: serializable_cold_entries,
735 },
736 };
737
738 // Serialize to JSON and write to file
739 let json = serde_json::to_string_pretty(&state)?;
740 std::fs::write(path, json)?;
741
742 Ok(())
743 }
744
745 /// Restore rejected events cache from disk
746 ///
747 /// Loads the serialized state from disk and populates both hot cache and cold index.
748 /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain
749 /// correct expiry behavior. Deletes the state file after successful restore.
750 ///
751 /// # Arguments
752 ///
753 /// * `path` - File path to read the serialized state from
754 ///
755 /// # Returns
756 ///
757 /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails
758 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
759 // Load and parse JSON
760 let json = std::fs::read_to_string(path)?;
761 let state: RejectedCacheState = serde_json::from_str(&json)?;
762
763 // Calculate downtime (how long the relay was offline)
764 let now_system = SystemTime::now();
765 let downtime = now_system
766 .duration_since(state.saved_at)
767 .unwrap_or(Duration::ZERO);
768
769 let now_instant = Instant::now();
770
771 // Lock both caches for restoration
772 let mut hot_entries = self.hot_cache.entries.write().unwrap();
773 let mut cold_entries = self.cold_index.entries.write().unwrap();
774
775 // Restore hot cache entries
776 for (event_id, serializable_entry) in state.hot_cache.entries {
777 // Reconstruct cached_at by extending the offset by downtime
778 // Original offset (how long ago it was cached when saved)
779 let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs);
780 // Total offset including downtime
781 let total_offset = original_offset + downtime;
782
783 // cached_at = now - total_offset
784 let cached_at = now_instant - total_offset;
785
786 let entry = HotCacheEntry {
787 event: serializable_entry.event,
788 pubkey: serializable_entry.pubkey,
789 identifier: serializable_entry.identifier,
790 event_type: serializable_entry.event_type,
791 reason: serializable_entry.reason,
792 cached_at,
793 };
794
795 hot_entries.insert(event_id, entry);
796 }
797
798 // Restore cold index entries
799 for (event_id, serializable_entry) in state.cold_index.entries {
800 // Reconstruct rejected_at by extending the offset by downtime
801 let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs);
802 let total_offset = original_offset + downtime;
803
804 // rejected_at = now - total_offset
805 let rejected_at = now_instant - total_offset;
806
807 let entry = ColdIndexEntry {
808 pubkey: serializable_entry.pubkey,
809 identifier: serializable_entry.identifier,
810 event_type: serializable_entry.event_type,
811 reason: serializable_entry.reason,
812 rejected_at,
813 };
814
815 cold_entries.insert(event_id, entry);
816 }
817
818 // Release locks before deleting file
819 drop(hot_entries);
820 drop(cold_entries);
821
822 // Delete the state file after successful restore
823 std::fs::remove_file(path)?;
824
825 Ok(())
826 }
606} 827}
607 828
608#[cfg(test)] 829#[cfg(test)]
@@ -956,4 +1177,503 @@ mod tests {
956 // Cold index now empty 1177 // Cold index now empty
957 assert_eq!(index.cold_index_len(), 0); 1178 assert_eq!(index.cold_index_len(), 0);
958 } 1179 }
1180
1181 // ========================================================================
1182 // Persistence Serialization Tests
1183 // ========================================================================
1184
1185 #[tokio::test]
1186 async fn test_save_and_restore_hot_cache_roundtrip() {
1187 let temp_dir = tempfile::tempdir().unwrap();
1188 let state_path = temp_dir.path().join("rejected_cache.json");
1189
1190 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1191 let event = create_test_event().await;
1192 let pubkey = event.pubkey;
1193 let identifier = "test-repo".to_string();
1194
1195 // Add event to hot cache
1196 index.add_announcement(
1197 event.clone(),
1198 pubkey,
1199 identifier.clone(),
1200 RejectionReason::DoesNotListService,
1201 );
1202
1203 assert_eq!(index.hot_cache_len(), 1);
1204 assert_eq!(index.cold_index_len(), 1);
1205
1206 // Save to disk
1207 index.save_to_disk(&state_path).unwrap();
1208 assert!(state_path.exists());
1209
1210 // Create new index and restore
1211 let index2 =
1212 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1213 index2.restore_from_disk(&state_path).unwrap();
1214
1215 // Verify state file was deleted after restore
1216 assert!(!state_path.exists());
1217
1218 // Verify hot cache restored
1219 assert_eq!(index2.hot_cache_len(), 1);
1220 assert!(index2.hot_cache.contains(&event.id));
1221
1222 // Verify cold index restored
1223 assert_eq!(index2.cold_index_len(), 1);
1224 assert!(index2.cold_index.contains(&event.id));
1225
1226 // Verify we can retrieve the event
1227 let events = index2
1228 .hot_cache
1229 .get_maintainer_events(&pubkey, &identifier, None);
1230 assert_eq!(events.len(), 1);
1231 assert_eq!(events[0].id, event.id);
1232 }
1233
1234 #[tokio::test]
1235 async fn test_save_and_restore_cold_index_only() {
1236 let temp_dir = tempfile::tempdir().unwrap();
1237 let state_path = temp_dir.path().join("rejected_cache.json");
1238
1239 let index = RejectedEventsIndex::new(
1240 Duration::from_millis(50), // Hot cache expires quickly
1241 Duration::from_secs(604800), // Cold index lasts long
1242 );
1243 let event = create_test_event().await;
1244
1245 // Add event
1246 index.add_announcement(
1247 event.clone(),
1248 event.pubkey,
1249 "test-repo".to_string(),
1250 RejectionReason::MaintainerNotYetValid,
1251 );
1252
1253 // Wait for hot cache to expire
1254 std::thread::sleep(Duration::from_millis(60));
1255 index.cleanup_expired_for_type("announcement");
1256
1257 assert_eq!(index.hot_cache_len(), 0);
1258 assert_eq!(index.cold_index_len(), 1);
1259
1260 // Save to disk
1261 index.save_to_disk(&state_path).unwrap();
1262
1263 // Restore into new index
1264 let index2 =
1265 RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800));
1266 index2.restore_from_disk(&state_path).unwrap();
1267
1268 // Verify only cold index restored (hot cache was empty)
1269 assert_eq!(index2.hot_cache_len(), 0);
1270 assert_eq!(index2.cold_index_len(), 1);
1271 assert!(index2.cold_index.contains(&event.id));
1272 }
1273
1274 #[tokio::test]
1275 async fn test_save_and_restore_both_hot_and_cold() {
1276 let temp_dir = tempfile::tempdir().unwrap();
1277 let state_path = temp_dir.path().join("rejected_cache.json");
1278
1279 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1280 let keys = Keys::generate();
1281
1282 // Create two events
1283 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1284 let event1 = keys.sign_event(unsigned1).await.unwrap();
1285
1286 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1287 let event2 = keys.sign_event(unsigned2).await.unwrap();
1288
1289 // Add both events
1290 index.add_announcement(
1291 event1.clone(),
1292 event1.pubkey,
1293 "repo1".to_string(),
1294 RejectionReason::DoesNotListService,
1295 );
1296
1297 index.add_state(
1298 event2.clone(),
1299 event2.pubkey,
1300 "repo2".to_string(),
1301 RejectionReason::Other,
1302 );
1303
1304 assert_eq!(index.hot_cache_len(), 2);
1305 assert_eq!(index.cold_index_len(), 2);
1306
1307 // Save to disk
1308 index.save_to_disk(&state_path).unwrap();
1309
1310 // Restore into new index
1311 let index2 =
1312 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1313 index2.restore_from_disk(&state_path).unwrap();
1314
1315 // Verify both caches restored
1316 assert_eq!(index2.hot_cache_len(), 2);
1317 assert_eq!(index2.cold_index_len(), 2);
1318 assert!(index2.contains(&event1.id));
1319 assert!(index2.contains(&event2.id));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_save_and_restore_empty_cache() {
1324 let temp_dir = tempfile::tempdir().unwrap();
1325 let state_path = temp_dir.path().join("rejected_cache.json");
1326
1327 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1328
1329 // Save empty cache
1330 index.save_to_disk(&state_path).unwrap();
1331 assert!(state_path.exists());
1332
1333 // Restore into new index
1334 let index2 =
1335 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1336 index2.restore_from_disk(&state_path).unwrap();
1337
1338 // Verify empty state restored
1339 assert_eq!(index2.hot_cache_len(), 0);
1340 assert_eq!(index2.cold_index_len(), 0);
1341 }
1342
1343 #[tokio::test]
1344 async fn test_restore_missing_file() {
1345 let temp_dir = tempfile::tempdir().unwrap();
1346 let state_path = temp_dir.path().join("nonexistent.json");
1347
1348 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1349
1350 // Attempting to restore missing file should return error
1351 let result = index.restore_from_disk(&state_path);
1352 assert!(result.is_err());
1353
1354 // Index should remain empty
1355 assert_eq!(index.hot_cache_len(), 0);
1356 assert_eq!(index.cold_index_len(), 0);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_restore_corrupted_json() {
1361 let temp_dir = tempfile::tempdir().unwrap();
1362 let state_path = temp_dir.path().join("corrupted.json");
1363
1364 // Write corrupted JSON
1365 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
1366
1367 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1368
1369 // Attempting to restore corrupted file should return error
1370 let result = index.restore_from_disk(&state_path);
1371 assert!(result.is_err());
1372
1373 // Index should remain empty
1374 assert_eq!(index.hot_cache_len(), 0);
1375 assert_eq!(index.cold_index_len(), 0);
1376 }
1377
1378 #[tokio::test]
1379 async fn test_file_cleanup_after_successful_restore() {
1380 let temp_dir = tempfile::tempdir().unwrap();
1381 let state_path = temp_dir.path().join("rejected_cache.json");
1382
1383 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1384 let event = create_test_event().await;
1385
1386 index.add_announcement(
1387 event.clone(),
1388 event.pubkey,
1389 "test-repo".to_string(),
1390 RejectionReason::DoesNotListService,
1391 );
1392
1393 // Save to disk
1394 index.save_to_disk(&state_path).unwrap();
1395 assert!(state_path.exists());
1396
1397 // Restore
1398 let index2 =
1399 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1400 index2.restore_from_disk(&state_path).unwrap();
1401
1402 // File should be deleted after successful restore
1403 assert!(!state_path.exists());
1404 }
1405
1406 #[tokio::test]
1407 async fn test_downtime_calculation_preserves_expiry() {
1408 let temp_dir = tempfile::tempdir().unwrap();
1409 let state_path = temp_dir.path().join("rejected_cache.json");
1410
1411 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1412 let event = create_test_event().await;
1413
1414 index.add_announcement(
1415 event.clone(),
1416 event.pubkey,
1417 "test-repo".to_string(),
1418 RejectionReason::DoesNotListService,
1419 );
1420
1421 // Save to disk
1422 index.save_to_disk(&state_path).unwrap();
1423
1424 // Simulate downtime by sleeping
1425 std::thread::sleep(Duration::from_millis(100));
1426
1427 // Restore
1428 let index2 =
1429 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1430 index2.restore_from_disk(&state_path).unwrap();
1431
1432 // Event should still be in both caches (downtime accounted for)
1433 assert_eq!(index2.hot_cache_len(), 1);
1434 assert_eq!(index2.cold_index_len(), 1);
1435 assert!(index2.contains(&event.id));
1436 }
1437
1438 #[tokio::test]
1439 async fn test_entries_expired_during_downtime() {
1440 let temp_dir = tempfile::tempdir().unwrap();
1441 let state_path = temp_dir.path().join("rejected_cache.json");
1442
1443 // Create index with very short expiry
1444 let index = RejectedEventsIndex::new(
1445 Duration::from_millis(100), // Hot cache: 100ms
1446 Duration::from_millis(200), // Cold index: 200ms
1447 );
1448 let event = create_test_event().await;
1449
1450 index.add_announcement(
1451 event.clone(),
1452 event.pubkey,
1453 "test-repo".to_string(),
1454 RejectionReason::DoesNotListService,
1455 );
1456
1457 // Save to disk
1458 index.save_to_disk(&state_path).unwrap();
1459
1460 // Simulate downtime longer than hot cache expiry
1461 std::thread::sleep(Duration::from_millis(150));
1462
1463 // Restore
1464 let index2 =
1465 RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200));
1466 index2.restore_from_disk(&state_path).unwrap();
1467
1468 // Hot cache entry should have expired during downtime
1469 // Cold index should still have it (200ms expiry)
1470 assert_eq!(index2.hot_cache_len(), 1);
1471 assert_eq!(index2.cold_index_len(), 1);
1472
1473 // But when we try to get it, hot cache will see it's expired
1474 let events = index2
1475 .hot_cache
1476 .get_maintainer_events(&event.pubkey, "test-repo", None);
1477 assert_eq!(events.len(), 0); // Expired!
1478
1479 // Cleanup should remove it
1480 let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement");
1481 assert_eq!(hot_expired, 1);
1482 assert_eq!(cold_expired, 0); // Not expired yet
1483 }
1484
1485 #[tokio::test]
1486 async fn test_hot_cache_different_event_types() {
1487 let temp_dir = tempfile::tempdir().unwrap();
1488 let state_path = temp_dir.path().join("rejected_cache.json");
1489
1490 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1491 let keys = Keys::generate();
1492
1493 // Create announcement event
1494 let unsigned_ann =
1495 nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key());
1496 let event_ann = keys.sign_event(unsigned_ann).await.unwrap();
1497
1498 // Create state event
1499 let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key());
1500 let event_state = keys.sign_event(unsigned_state).await.unwrap();
1501
1502 // Add both types
1503 index.add_announcement(
1504 event_ann.clone(),
1505 event_ann.pubkey,
1506 "test-repo".to_string(),
1507 RejectionReason::DoesNotListService,
1508 );
1509
1510 index.add_state(
1511 event_state.clone(),
1512 event_state.pubkey,
1513 "test-repo".to_string(),
1514 RejectionReason::Other,
1515 );
1516
1517 // Save and restore
1518 index.save_to_disk(&state_path).unwrap();
1519 let index2 =
1520 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1521 index2.restore_from_disk(&state_path).unwrap();
1522
1523 // Verify both event types restored
1524 assert_eq!(index2.hot_cache_len(), 2);
1525 assert!(index2.contains(&event_ann.id));
1526 assert!(index2.contains(&event_state.id));
1527
1528 // Verify we can filter by type
1529 let (removed, events) = index2.invalidate_and_get(
1530 &event_ann.pubkey,
1531 "test-repo",
1532 Some(EventType::Announcement),
1533 );
1534 assert_eq!(removed, 1);
1535 assert_eq!(events.len(), 1);
1536 assert_eq!(events[0].id, event_ann.id);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_cold_index_different_rejection_reasons() {
1541 let temp_dir = tempfile::tempdir().unwrap();
1542 let state_path = temp_dir.path().join("rejected_cache.json");
1543
1544 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1545 let keys = Keys::generate();
1546
1547 // Create events with different rejection reasons
1548 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1549 let event1 = keys.sign_event(unsigned1).await.unwrap();
1550
1551 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1552 let event2 = keys.sign_event(unsigned2).await.unwrap();
1553
1554 let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key());
1555 let event3 = keys.sign_event(unsigned3).await.unwrap();
1556
1557 // Add with different rejection reasons
1558 index.add_announcement(
1559 event1.clone(),
1560 event1.pubkey,
1561 "repo1".to_string(),
1562 RejectionReason::DoesNotListService,
1563 );
1564
1565 index.add_announcement(
1566 event2.clone(),
1567 event2.pubkey,
1568 "repo2".to_string(),
1569 RejectionReason::MaintainerNotYetValid,
1570 );
1571
1572 index.add_announcement(
1573 event3.clone(),
1574 event3.pubkey,
1575 "repo3".to_string(),
1576 RejectionReason::Other,
1577 );
1578
1579 // Save and restore
1580 index.save_to_disk(&state_path).unwrap();
1581 let index2 =
1582 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1583 index2.restore_from_disk(&state_path).unwrap();
1584
1585 // Verify all entries restored with their rejection reasons
1586 assert_eq!(index2.cold_index_len(), 3);
1587 assert!(index2.contains(&event1.id));
1588 assert!(index2.contains(&event2.id));
1589 assert!(index2.contains(&event3.id));
1590 }
1591
1592 #[tokio::test]
1593 async fn test_multiple_save_restore_cycles() {
1594 let temp_dir = tempfile::tempdir().unwrap();
1595 let state_path = temp_dir.path().join("rejected_cache.json");
1596
1597 // First cycle
1598 let index1 =
1599 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1600 let event1 = create_test_event().await;
1601
1602 index1.add_announcement(
1603 event1.clone(),
1604 event1.pubkey,
1605 "repo1".to_string(),
1606 RejectionReason::DoesNotListService,
1607 );
1608
1609 index1.save_to_disk(&state_path).unwrap();
1610
1611 // Second cycle - restore and add more
1612 let index2 =
1613 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1614 index2.restore_from_disk(&state_path).unwrap();
1615
1616 let event2 = create_test_event().await;
1617 index2.add_announcement(
1618 event2.clone(),
1619 event2.pubkey,
1620 "repo2".to_string(),
1621 RejectionReason::MaintainerNotYetValid,
1622 );
1623
1624 assert_eq!(index2.hot_cache_len(), 2);
1625 index2.save_to_disk(&state_path).unwrap();
1626
1627 // Third cycle - restore again
1628 let index3 =
1629 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1630 index3.restore_from_disk(&state_path).unwrap();
1631
1632 // Verify both events survived multiple cycles
1633 assert_eq!(index3.hot_cache_len(), 2);
1634 assert!(index3.contains(&event1.id));
1635 assert!(index3.contains(&event2.id));
1636 }
1637
1638 #[tokio::test]
1639 async fn test_restore_preserves_remaining_ttl() {
1640 let temp_dir = tempfile::tempdir().unwrap();
1641 let state_path = temp_dir.path().join("rejected_cache.json");
1642
1643 // Create index with 2 second hot cache expiry
1644 let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1645 let event = create_test_event().await;
1646
1647 index.add_announcement(
1648 event.clone(),
1649 event.pubkey,
1650 "test-repo".to_string(),
1651 RejectionReason::DoesNotListService,
1652 );
1653
1654 // Wait 200ms (small fraction of TTL)
1655 std::thread::sleep(Duration::from_millis(200));
1656
1657 // Save to disk
1658 index.save_to_disk(&state_path).unwrap();
1659
1660 // Immediately restore (minimal downtime)
1661 let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1662 index2.restore_from_disk(&state_path).unwrap();
1663
1664 // Event should still be retrievable (has ~1.8s remaining)
1665 let events = index2
1666 .hot_cache
1667 .get_maintainer_events(&event.pubkey, "test-repo", None);
1668 assert_eq!(events.len(), 1);
1669
1670 // Wait 2 seconds (total 2.2s > 2s expiry)
1671 std::thread::sleep(Duration::from_secs(2));
1672
1673 // Now it should be expired
1674 let events = index2
1675 .hot_cache
1676 .get_maintainer_events(&event.pubkey, "test-repo", None);
1677 assert_eq!(events.len(), 0);
1678 }
959} 1679}
diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs
new file mode 100644
index 0000000..acefb41
--- /dev/null
+++ b/tests/purgatory_persistence.rs
@@ -0,0 +1,755 @@
1//! Purgatory Persistence Integration Tests
2//!
3//! Tests that verify the full purgatory persistence save/restore cycle:
4//! - Purgatory save/restore with state events, PR events, and expired events
5//! - Rejected cache save/restore with hot cache and cold index entries
6//! - Integration with shutdown/startup hooks
7//! - Graceful degradation with missing or corrupted files
8//! - Time adjustment for downtime
9//!
10//! # Test Strategy
11//!
12//! These tests verify end-to-end persistence functionality:
13//! 1. Create purgatory/rejected cache instances with various entries
14//! 2. Save state to disk
15//! 3. Create new instances and restore from disk
16//! 4. Verify all data is restored correctly
17//! 5. Verify system continues to work after restore
18//!
19//! # Running Tests
20//!
21//! ```bash
22//! # Run all purgatory persistence tests
23//! cargo test --test purgatory_persistence
24//!
25//! # Run specific test
26//! cargo test --test purgatory_persistence test_full_purgatory_save_restore_cycle
27//!
28//! # With output for debugging
29//! cargo test --test purgatory_persistence -- --nocapture
30//! ```
31
32mod common;
33
34use ngit_grasp::purgatory::Purgatory;
35use ngit_grasp::sync::rejected_index::{EventType, RejectedEventsIndex, RejectionReason};
36use nostr_sdk::prelude::*;
37use std::time::Duration;
38
39/// Helper to create a test event
40async fn create_test_event(keys: &Keys, content: &str) -> Event {
41 EventBuilder::text_note(content)
42 .sign_with_keys(keys)
43 .unwrap()
44}
45
46/// Helper to create a state event with specific refs
47fn create_state_event_with_refs(
48 keys: &Keys,
49 identifier: &str,
50 refs: &[(&str, &str)],
51) -> Result<Event, Box<dyn std::error::Error>> {
52 let mut tags = vec![Tag::identifier(identifier)];
53
54 // Add ref tags
55 for (ref_name, commit_hash) in refs {
56 tags.push(Tag::custom(
57 TagKind::custom("ref"),
58 vec![ref_name.to_string(), commit_hash.to_string()],
59 ));
60 }
61
62 let event = EventBuilder::new(Kind::from(30618), "")
63 .tags(tags)
64 .sign_with_keys(keys)?;
65
66 Ok(event)
67}
68
69/// Test 1: Full save/restore cycle with state events, PR events, and expired events
70#[tokio::test]
71async fn test_full_purgatory_save_restore_cycle() {
72 let temp_dir = tempfile::tempdir().unwrap();
73 let git_data_path = temp_dir.path().join("git");
74 let state_path = temp_dir.path().join("purgatory.json");
75
76 // Create purgatory instance
77 let purgatory = Purgatory::new(&git_data_path);
78
79 // Create test keys and events
80 let keys1 = Keys::generate();
81 let keys2 = Keys::generate();
82 let keys3 = Keys::generate();
83
84 let state_event1 =
85 create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")]).unwrap();
86 let state_event2 =
87 create_state_event_with_refs(&keys2, "repo2", &[("main", "def456")]).unwrap();
88
89 let pr_event1 = create_test_event(&keys3, "PR 1").await;
90 let pr_event2 = create_test_event(&keys3, "PR 2").await;
91
92 // Add state events to purgatory
93 purgatory.add_state(
94 state_event1.clone(),
95 "repo1".to_string(),
96 keys1.public_key(),
97 );
98 purgatory.add_state(
99 state_event2.clone(),
100 "repo2".to_string(),
101 keys2.public_key(),
102 );
103
104 // Add PR events to purgatory
105 purgatory.add_pr(
106 pr_event1.clone(),
107 pr_event1.id.to_hex(),
108 "commit-abc".to_string(),
109 );
110 purgatory.add_pr(
111 pr_event2.clone(),
112 pr_event2.id.to_hex(),
113 "commit-def".to_string(),
114 );
115
116 // Add a PR placeholder (git-data-first scenario)
117 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-xyz".to_string());
118
119 // Note: We can't directly test expired events without accessing private fields,
120 // so we'll focus on testing state and PR events persistence
121
122 // Verify initial counts
123 let (state_count, pr_count) = purgatory.count();
124 assert_eq!(state_count, 2, "Should have 2 state events");
125 assert_eq!(
126 pr_count, 3,
127 "Should have 3 PR events (2 events + 1 placeholder)"
128 );
129
130 // Save to disk
131 purgatory.save_to_disk(&state_path).unwrap();
132 assert!(state_path.exists(), "State file should exist after save");
133
134 // Create new purgatory instance and restore
135 let purgatory2 = Purgatory::new(&git_data_path);
136 purgatory2.restore_from_disk(&state_path).unwrap();
137
138 // Verify state file was deleted after restore
139 assert!(
140 !state_path.exists(),
141 "State file should be deleted after restore"
142 );
143
144 // Verify all data was restored
145 let (state_count2, pr_count2) = purgatory2.count();
146 assert_eq!(state_count2, 2, "Should have 2 state events after restore");
147 assert_eq!(
148 pr_count2, 3,
149 "Should have 3 PR events after restore (2 events + 1 placeholder)"
150 );
151
152 // Verify specific state events
153 let repo1_states = purgatory2.find_state("repo1");
154 assert_eq!(repo1_states.len(), 1);
155 assert_eq!(repo1_states[0].event.id, state_event1.id);
156
157 let repo2_states = purgatory2.find_state("repo2");
158 assert_eq!(repo2_states.len(), 1);
159 assert_eq!(repo2_states[0].event.id, state_event2.id);
160
161 // Verify PR events
162 let pr1 = purgatory2.find_pr(&pr_event1.id.to_hex());
163 assert!(pr1.is_some());
164 assert_eq!(pr1.unwrap().commit, "commit-abc");
165
166 let pr2 = purgatory2.find_pr(&pr_event2.id.to_hex());
167 assert!(pr2.is_some());
168 assert_eq!(pr2.unwrap().commit, "commit-def");
169
170 // Verify placeholder
171 let placeholder = purgatory2.find_pr_placeholder("placeholder-id");
172 assert_eq!(placeholder, Some("commit-xyz".to_string()));
173
174 // Verify re-queueing works - get all identifiers
175 let identifiers = purgatory2.get_all_identifiers();
176 assert_eq!(identifiers.len(), 2);
177 assert!(identifiers.contains(&"repo1".to_string()));
178 assert!(identifiers.contains(&"repo2".to_string()));
179}
180
181/// Test 2: Rejected cache integration - save/restore hot cache and cold index
182#[tokio::test]
183async fn test_rejected_cache_save_restore_cycle() {
184 let temp_dir = tempfile::tempdir().unwrap();
185 let state_path = temp_dir.path().join("rejected_cache.json");
186
187 // Create rejected events index
188 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
189
190 // Create test events
191 let keys1 = Keys::generate();
192 let keys2 = Keys::generate();
193
194 let event1 = create_test_event(&keys1, "announcement 1").await;
195 let event2 = create_test_event(&keys2, "announcement 2").await;
196 let event3 = create_test_event(&keys1, "state 1").await;
197
198 // Add announcements to rejected cache
199 index.add_announcement(
200 event1.clone(),
201 event1.pubkey,
202 "repo1".to_string(),
203 RejectionReason::DoesNotListService,
204 );
205
206 index.add_announcement(
207 event2.clone(),
208 event2.pubkey,
209 "repo2".to_string(),
210 RejectionReason::MaintainerNotYetValid,
211 );
212
213 // Add state event to rejected cache
214 index.add_state(
215 event3.clone(),
216 event3.pubkey,
217 "repo1".to_string(),
218 RejectionReason::Other,
219 );
220
221 // Verify initial counts
222 assert_eq!(index.hot_cache_len(), 3);
223 assert_eq!(index.cold_index_len(), 3);
224
225 // Save to disk
226 index.save_to_disk(&state_path).unwrap();
227 assert!(state_path.exists());
228
229 // Create new index and restore
230 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
231 index2.restore_from_disk(&state_path).unwrap();
232
233 // Verify state file was deleted
234 assert!(!state_path.exists());
235
236 // Verify all entries restored
237 assert_eq!(index2.hot_cache_len(), 3);
238 assert_eq!(index2.cold_index_len(), 3);
239
240 // Verify specific entries
241 assert!(index2.contains(&event1.id));
242 assert!(index2.contains(&event2.id));
243 assert!(index2.contains(&event3.id));
244
245 // Verify we can invalidate and get events
246 let (removed, hot_events) =
247 index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement));
248 assert_eq!(removed, 1);
249 assert_eq!(hot_events.len(), 1);
250 assert_eq!(hot_events[0].id, event1.id);
251}
252
253/// Test 3: Simulated downtime - verify expiry times are adjusted correctly
254#[tokio::test]
255async fn test_purgatory_downtime_adjustment() {
256 let temp_dir = tempfile::tempdir().unwrap();
257 let git_data_path = temp_dir.path().join("git");
258 let state_path = temp_dir.path().join("purgatory.json");
259
260 let purgatory = Purgatory::new(&git_data_path);
261 let keys = Keys::generate();
262
263 let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
264 .unwrap();
265
266 purgatory.add_state(state_event.clone(), "repo1".to_string(), keys.public_key());
267
268 // Save to disk
269 purgatory.save_to_disk(&state_path).unwrap();
270
271 // Simulate downtime
272 tokio::time::sleep(Duration::from_millis(100)).await;
273
274 // Restore
275 let purgatory2 = Purgatory::new(&git_data_path);
276 purgatory2.restore_from_disk(&state_path).unwrap();
277
278 // Verify event is still there (downtime was accounted for)
279 let (state_count, _) = purgatory2.count();
280 assert_eq!(state_count, 1);
281
282 let repo1_states = purgatory2.find_state("repo1");
283 assert_eq!(repo1_states.len(), 1);
284 assert_eq!(repo1_states[0].event.id, state_event.id);
285
286 // Verify the event hasn't expired yet (expiry time was adjusted)
287 // The event should have ~30 minutes minus the downtime
288 let entry = &repo1_states[0];
289 let remaining = entry
290 .expires_at
291 .saturating_duration_since(std::time::Instant::now());
292 assert!(
293 remaining > Duration::from_secs(1700),
294 "Event should have most of its 30min expiry remaining"
295 );
296}
297
298/// Test 4: Rejected cache downtime adjustment
299#[tokio::test]
300async fn test_rejected_cache_downtime_adjustment() {
301 let temp_dir = tempfile::tempdir().unwrap();
302 let state_path = temp_dir.path().join("rejected_cache.json");
303
304 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
305 let keys = Keys::generate();
306
307 let event = create_test_event(&keys, "test").await;
308
309 index.add_announcement(
310 event.clone(),
311 event.pubkey,
312 "repo1".to_string(),
313 RejectionReason::DoesNotListService,
314 );
315
316 // Save to disk
317 index.save_to_disk(&state_path).unwrap();
318
319 // Simulate downtime
320 tokio::time::sleep(Duration::from_millis(100)).await;
321
322 // Restore
323 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
324 index2.restore_from_disk(&state_path).unwrap();
325
326 // Verify event is still in both caches (downtime was accounted for)
327 assert_eq!(index2.hot_cache_len(), 1);
328 assert_eq!(index2.cold_index_len(), 1);
329 assert!(index2.contains(&event.id));
330}
331
332/// Test 5: File cleanup - verify state files are deleted after successful restore
333#[tokio::test]
334async fn test_purgatory_file_cleanup_after_restore() {
335 let temp_dir = tempfile::tempdir().unwrap();
336 let git_data_path = temp_dir.path().join("git");
337 let state_path = temp_dir.path().join("purgatory.json");
338
339 let purgatory = Purgatory::new(&git_data_path);
340 let keys = Keys::generate();
341
342 let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
343 .unwrap();
344
345 purgatory.add_state(state_event, "repo1".to_string(), keys.public_key());
346
347 // Save to disk
348 purgatory.save_to_disk(&state_path).unwrap();
349 assert!(state_path.exists(), "State file should exist after save");
350
351 // Restore
352 let purgatory2 = Purgatory::new(&git_data_path);
353 purgatory2.restore_from_disk(&state_path).unwrap();
354
355 // Verify file was deleted
356 assert!(
357 !state_path.exists(),
358 "State file should be deleted after successful restore"
359 );
360}
361
362/// Test 6: Rejected cache file cleanup
363#[tokio::test]
364async fn test_rejected_cache_file_cleanup_after_restore() {
365 let temp_dir = tempfile::tempdir().unwrap();
366 let state_path = temp_dir.path().join("rejected_cache.json");
367
368 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
369 let keys = Keys::generate();
370
371 let event = create_test_event(&keys, "test").await;
372
373 index.add_announcement(
374 event,
375 keys.public_key(),
376 "repo1".to_string(),
377 RejectionReason::DoesNotListService,
378 );
379
380 // Save to disk
381 index.save_to_disk(&state_path).unwrap();
382 assert!(state_path.exists());
383
384 // Restore
385 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
386 index2.restore_from_disk(&state_path).unwrap();
387
388 // Verify file was deleted
389 assert!(!state_path.exists());
390}
391
392/// Test 7: Graceful degradation - missing purgatory file
393#[tokio::test]
394async fn test_purgatory_restore_missing_file() {
395 let temp_dir = tempfile::tempdir().unwrap();
396 let git_data_path = temp_dir.path().join("git");
397 let state_path = temp_dir.path().join("nonexistent.json");
398
399 let purgatory = Purgatory::new(&git_data_path);
400
401 // Attempting to restore missing file should return error
402 let result = purgatory.restore_from_disk(&state_path);
403 assert!(result.is_err(), "Should error on missing file");
404
405 // Purgatory should still be usable (empty state)
406 let (state_count, pr_count) = purgatory.count();
407 assert_eq!(state_count, 0);
408 assert_eq!(pr_count, 0);
409
410 // Should be able to add events normally
411 let keys = Keys::generate();
412 let event = create_test_event(&keys, "test").await;
413 purgatory.add_state(event, "repo1".to_string(), keys.public_key());
414
415 let (state_count, _) = purgatory.count();
416 assert_eq!(state_count, 1);
417}
418
419/// Test 8: Graceful degradation - missing rejected cache file
420#[tokio::test]
421async fn test_rejected_cache_restore_missing_file() {
422 let temp_dir = tempfile::tempdir().unwrap();
423 let state_path = temp_dir.path().join("nonexistent.json");
424
425 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
426
427 // Attempting to restore missing file should return error
428 let result = index.restore_from_disk(&state_path);
429 assert!(result.is_err());
430
431 // Index should still be usable (empty state)
432 assert_eq!(index.hot_cache_len(), 0);
433 assert_eq!(index.cold_index_len(), 0);
434
435 // Should be able to add events normally
436 let keys = Keys::generate();
437 let event = create_test_event(&keys, "test").await;
438 index.add_announcement(
439 event,
440 keys.public_key(),
441 "repo1".to_string(),
442 RejectionReason::DoesNotListService,
443 );
444
445 assert_eq!(index.hot_cache_len(), 1);
446 assert_eq!(index.cold_index_len(), 1);
447}
448
449/// Test 9: Graceful degradation - corrupted purgatory file
450#[tokio::test]
451async fn test_purgatory_restore_corrupted_file() {
452 let temp_dir = tempfile::tempdir().unwrap();
453 let git_data_path = temp_dir.path().join("git");
454 let state_path = temp_dir.path().join("corrupted.json");
455
456 // Write corrupted JSON
457 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
458
459 let purgatory = Purgatory::new(&git_data_path);
460
461 // Attempting to restore corrupted file should return error
462 let result = purgatory.restore_from_disk(&state_path);
463 assert!(result.is_err(), "Should error on corrupted file");
464
465 // Purgatory should still be usable
466 let (state_count, pr_count) = purgatory.count();
467 assert_eq!(state_count, 0);
468 assert_eq!(pr_count, 0);
469}
470
471/// Test 10: Graceful degradation - corrupted rejected cache file
472#[tokio::test]
473async fn test_rejected_cache_restore_corrupted_file() {
474 let temp_dir = tempfile::tempdir().unwrap();
475 let state_path = temp_dir.path().join("corrupted.json");
476
477 // Write corrupted JSON
478 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
479
480 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
481
482 // Attempting to restore corrupted file should return error
483 let result = index.restore_from_disk(&state_path);
484 assert!(result.is_err());
485
486 // Index should still be usable
487 assert_eq!(index.hot_cache_len(), 0);
488 assert_eq!(index.cold_index_len(), 0);
489}
490
491/// Test 11: Empty purgatory save/restore
492#[tokio::test]
493async fn test_empty_purgatory_save_restore() {
494 let temp_dir = tempfile::tempdir().unwrap();
495 let git_data_path = temp_dir.path().join("git");
496 let state_path = temp_dir.path().join("purgatory.json");
497
498 let purgatory = Purgatory::new(&git_data_path);
499
500 // Save empty purgatory
501 purgatory.save_to_disk(&state_path).unwrap();
502 assert!(state_path.exists());
503
504 // Restore
505 let purgatory2 = Purgatory::new(&git_data_path);
506 purgatory2.restore_from_disk(&state_path).unwrap();
507
508 // Verify empty state
509 let (state_count, pr_count) = purgatory2.count();
510 assert_eq!(state_count, 0);
511 assert_eq!(pr_count, 0);
512 assert_eq!(purgatory2.expired_count(), 0);
513}
514
515/// Test 12: Empty rejected cache save/restore
516#[tokio::test]
517async fn test_empty_rejected_cache_save_restore() {
518 let temp_dir = tempfile::tempdir().unwrap();
519 let state_path = temp_dir.path().join("rejected_cache.json");
520
521 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
522
523 // Save empty cache
524 index.save_to_disk(&state_path).unwrap();
525 assert!(state_path.exists());
526
527 // Restore
528 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
529 index2.restore_from_disk(&state_path).unwrap();
530
531 // Verify empty state
532 assert_eq!(index2.hot_cache_len(), 0);
533 assert_eq!(index2.cold_index_len(), 0);
534}
535
536/// Test 13: Multiple state events for same identifier
537#[tokio::test]
538async fn test_purgatory_multiple_state_events_same_identifier() {
539 let temp_dir = tempfile::tempdir().unwrap();
540 let git_data_path = temp_dir.path().join("git");
541 let state_path = temp_dir.path().join("purgatory.json");
542
543 let purgatory = Purgatory::new(&git_data_path);
544
545 // Create multiple state events for same identifier (different maintainers)
546 let keys1 = Keys::generate();
547 let keys2 = Keys::generate();
548
549 let event1 = create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")])
550 .unwrap();
551 let event2 = create_state_event_with_refs(&keys2, "repo1", &[("main", "def456")])
552 .unwrap();
553
554 purgatory.add_state(event1.clone(), "repo1".to_string(), keys1.public_key());
555 purgatory.add_state(event2.clone(), "repo1".to_string(), keys2.public_key());
556
557 // Save and restore
558 purgatory.save_to_disk(&state_path).unwrap();
559
560 let purgatory2 = Purgatory::new(&git_data_path);
561 purgatory2.restore_from_disk(&state_path).unwrap();
562
563 // Verify both events restored
564 let repo1_states = purgatory2.find_state("repo1");
565 assert_eq!(repo1_states.len(), 2);
566
567 let event_ids: Vec<_> = repo1_states.iter().map(|e| e.event.id).collect();
568 assert!(event_ids.contains(&event1.id));
569 assert!(event_ids.contains(&event2.id));
570}
571
572/// Test 14: Verify system continues to work after restore
573#[tokio::test]
574async fn test_purgatory_continues_working_after_restore() {
575 let temp_dir = tempfile::tempdir().unwrap();
576 let git_data_path = temp_dir.path().join("git");
577 let state_path = temp_dir.path().join("purgatory.json");
578
579 let purgatory = Purgatory::new(&git_data_path);
580 let keys = Keys::generate();
581
582 let event1 = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
583 .unwrap();
584
585 purgatory.add_state(event1.clone(), "repo1".to_string(), keys.public_key());
586
587 // Save and restore
588 purgatory.save_to_disk(&state_path).unwrap();
589
590 let purgatory2 = Purgatory::new(&git_data_path);
591 purgatory2.restore_from_disk(&state_path).unwrap();
592
593 // Add new events after restore
594 let event2 = create_state_event_with_refs(&keys, "repo2", &[("main", "xyz789")])
595 .unwrap();
596
597 purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key());
598
599 // Verify both old and new events work
600 let (state_count, _) = purgatory2.count();
601 assert_eq!(state_count, 2);
602
603 let repo1_states = purgatory2.find_state("repo1");
604 assert_eq!(repo1_states.len(), 1);
605 assert_eq!(repo1_states[0].event.id, event1.id);
606
607 let repo2_states = purgatory2.find_state("repo2");
608 assert_eq!(repo2_states.len(), 1);
609 assert_eq!(repo2_states[0].event.id, event2.id);
610
611 // Verify cleanup still works
612 let (state_removed, pr_removed) = purgatory2.cleanup();
613 // Nothing should be expired yet
614 assert_eq!(state_removed, 0);
615 assert_eq!(pr_removed, 0);
616}
617
618/// Test 15: Verify rejected cache continues working after restore
619#[tokio::test]
620async fn test_rejected_cache_continues_working_after_restore() {
621 let temp_dir = tempfile::tempdir().unwrap();
622 let state_path = temp_dir.path().join("rejected_cache.json");
623
624 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
625 let keys = Keys::generate();
626
627 let event1 = create_test_event(&keys, "event1").await;
628
629 index.add_announcement(
630 event1.clone(),
631 event1.pubkey,
632 "repo1".to_string(),
633 RejectionReason::DoesNotListService,
634 );
635
636 // Save and restore
637 index.save_to_disk(&state_path).unwrap();
638
639 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
640 index2.restore_from_disk(&state_path).unwrap();
641
642 // Add new events after restore
643 let event2 = create_test_event(&keys, "event2").await;
644
645 index2.add_announcement(
646 event2.clone(),
647 event2.pubkey,
648 "repo2".to_string(),
649 RejectionReason::MaintainerNotYetValid,
650 );
651
652 // Verify both old and new events work
653 assert_eq!(index2.hot_cache_len(), 2);
654 assert_eq!(index2.cold_index_len(), 2);
655 assert!(index2.contains(&event1.id));
656 assert!(index2.contains(&event2.id));
657
658 // Verify invalidation still works
659 let (removed, hot_events) =
660 index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement));
661 assert_eq!(removed, 1);
662 assert_eq!(hot_events.len(), 1);
663 assert_eq!(hot_events[0].id, event1.id);
664}
665
666/// Test 16: Entries that expired during downtime are properly handled
667#[tokio::test]
668async fn test_purgatory_entries_expired_during_downtime() {
669 let temp_dir = tempfile::tempdir().unwrap();
670 let git_data_path = temp_dir.path().join("git");
671 let state_path = temp_dir.path().join("purgatory.json");
672
673 let purgatory = Purgatory::new(&git_data_path);
674 let keys = Keys::generate();
675
676 let event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
677 .unwrap();
678
679 purgatory.add_state(event.clone(), "repo1".to_string(), keys.public_key());
680
681 // Save to disk
682 purgatory.save_to_disk(&state_path).unwrap();
683
684 // Simulate very long downtime (longer than the 30min default expiry)
685 // Note: We can't manually set expiry without accessing private fields,
686 // so this test verifies that the system handles already-expired entries gracefully
687 // In a real scenario, if downtime > 30 minutes, entries would be expired on restore
688
689 // For this test, we'll just verify the restore works and cleanup can be called
690 let purgatory2 = Purgatory::new(&git_data_path);
691 purgatory2.restore_from_disk(&state_path).unwrap();
692
693 // Event should be restored
694 let (state_count, _) = purgatory2.count();
695 assert_eq!(state_count, 1);
696
697 // Cleanup should work (even if nothing is expired yet)
698 let (state_removed, _) = purgatory2.cleanup();
699 // Nothing expired yet since we didn't wait 30 minutes
700 assert_eq!(state_removed, 0);
701
702 let (state_count, _) = purgatory2.count();
703 assert_eq!(state_count, 1);
704}
705
706/// Test 17: Rejected cache entries that expired during downtime
707#[tokio::test]
708async fn test_rejected_cache_entries_expired_during_downtime() {
709 let temp_dir = tempfile::tempdir().unwrap();
710 let state_path = temp_dir.path().join("rejected_cache.json");
711
712 // Create index with very short expiry
713 let index = RejectedEventsIndex::new(
714 Duration::from_millis(50), // Hot cache: 50ms
715 Duration::from_millis(100), // Cold index: 100ms
716 );
717 let keys = Keys::generate();
718
719 let event = create_test_event(&keys, "test").await;
720
721 index.add_announcement(
722 event.clone(),
723 event.pubkey,
724 "repo1".to_string(),
725 RejectionReason::DoesNotListService,
726 );
727
728 // Save to disk
729 index.save_to_disk(&state_path).unwrap();
730
731 // Simulate downtime longer than hot cache expiry
732 tokio::time::sleep(Duration::from_millis(75)).await;
733
734 // Restore
735 let index2 = RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_millis(100));
736 index2.restore_from_disk(&state_path).unwrap();
737
738 // Both should be restored initially
739 assert_eq!(index2.hot_cache_len(), 1);
740 assert_eq!(index2.cold_index_len(), 1);
741
742 // Note: We can't directly access hot_cache.get_maintainer_events (private method)
743 // But we can verify the entry is there via contains() and test cleanup
744
745 // Verify entry is still tracked
746 assert!(index2.contains(&event.id));
747
748 // Cleanup should remove expired hot cache entry
749 let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement");
750 assert_eq!(hot_expired, 1);
751 assert_eq!(cold_expired, 0); // Cold index still valid
752
753 assert_eq!(index2.hot_cache_len(), 0);
754 assert_eq!(index2.cold_index_len(), 1);
755}