upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs50
-rw-r--r--src/purgatory/mod.rs925
-rw-r--r--src/purgatory/persistence.rs198
-rw-r--r--src/purgatory/types.rs20
-rw-r--r--src/sync/mod.rs71
-rw-r--r--src/sync/rejected_index.rs726
-rw-r--r--tests/purgatory_persistence.rs755
7 files changed, 2725 insertions, 20 deletions
diff --git a/src/main.rs b/src/main.rs
index a6f1d9d..5e5b83a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
6use tracing::{info, Level}; 6use tracing::{error, info, warn, Level};
7use tracing_subscriber::FmtSubscriber; 7use tracing_subscriber::FmtSubscriber;
8 8
9use ngit_grasp::{ 9use ngit_grasp::{
@@ -64,6 +64,22 @@ async fn main() -> Result<()> {
64 ))); 64 )));
65 info!("Purgatory initialized for event coordination"); 65 info!("Purgatory initialized for event coordination");
66 66
67 // Restore purgatory state from disk if available
68 let purgatory_path =
69 PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json");
70
71 if purgatory_path.exists() {
72 match purgatory.restore_from_disk(&purgatory_path) {
73 Ok(()) => {
74 info!("Restored purgatory state from disk");
75 // Re-queueing will happen later after sync system is created
76 }
77 Err(e) => {
78 warn!("Failed to restore purgatory state: {}, starting empty", e);
79 }
80 }
81 }
82
67 // Create Nostr relay with NIP-34 validation 83 // Create Nostr relay with NIP-34 validation
68 // Returns both the relay and database for direct queries in handlers 84 // Returns both the relay and database for direct queries in handlers
69 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { 85 if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await {
@@ -88,6 +104,7 @@ async fn main() -> Result<()> {
88 relay_with_db.write_policy.clone(), 104 relay_with_db.write_policy.clone(),
89 relay_with_db.relay.clone(), 105 relay_with_db.relay.clone(),
90 &config, 106 &config,
107 PathBuf::from(config.effective_git_data_path()),
91 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), 108 metrics.as_ref().and_then(|m| m.sync_metrics().cloned()),
92 ); 109 );
93 110
@@ -100,6 +117,21 @@ async fn main() -> Result<()> {
100 info!("Proactive sync enabled (will discover relays from stored announcements)"); 117 info!("Proactive sync enabled (will discover relays from stored announcements)");
101 } 118 }
102 119
120 // Re-queue all restored purgatory repos for sync
121 let restored_identifiers = purgatory.get_all_identifiers();
122 if !restored_identifiers.is_empty() {
123 info!(
124 "Re-queueing {} restored repositories for sync",
125 restored_identifiers.len()
126 );
127 for identifier in restored_identifiers {
128 purgatory.enqueue_sync_immediate(&identifier);
129 }
130 }
131
132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134
103 tokio::spawn(async move { 135 tokio::spawn(async move {
104 sync_manager.run().await; 136 sync_manager.run().await;
105 }); 137 });
@@ -190,6 +222,22 @@ async fn main() -> Result<()> {
190 } 222 }
191 } 223 }
192 224
225 // Save purgatory state to disk
226 let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json");
227 if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) {
228 error!("Failed to save purgatory state: {}", e);
229 } else {
230 info!("Purgatory state saved to disk");
231 }
232
233 // Save rejected events cache to disk
234 let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json");
235 if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) {
236 error!("Failed to save rejected events cache: {}", e);
237 } else {
238 info!("Rejected events cache saved to disk");
239 }
240
193 // Cleanup placeholder refs on shutdown 241 // Cleanup placeholder refs on shutdown
194 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); 242 let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids();
195 if !placeholder_ids.is_empty() { 243 if !placeholder_ids.is_empty() {
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 20df19b..47798a6 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -12,6 +12,7 @@
12//! - **Separate stores**: State events and PR events use different indexing strategies 12//! - **Separate stores**: State events and PR events use different indexing strategies
13 13
14mod helpers; 14mod helpers;
15pub mod persistence;
15pub mod sync; 16pub mod sync;
16mod types; 17mod types;
17 18
@@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
20 21
21use dashmap::DashMap; 22use dashmap::DashMap;
22use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
23use std::collections::HashSet; 26use std::collections::HashSet;
24use std::path::PathBuf; 27use std::path::{Path, PathBuf};
25use std::sync::Arc; 28use std::sync::Arc;
26use std::time::{Duration, Instant}; 29use std::time::{Duration, Instant, SystemTime};
27 30
28pub use sync::SyncQueueEntry; 31pub use sync::SyncQueueEntry;
29 32
@@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180);
38/// Used for batching burst arrivals during negentropy sync. 41/// Used for batching burst arrivals during negentropy sync.
39const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); 42const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500);
40 43
44/// Serializable wrapper for `StatePurgatoryEntry` with time offsets.
45///
46/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
47/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49struct SerializableStatePurgatoryEntry {
50 /// The nostr state event (kind 30618) awaiting git data
51 event: Event,
52 /// The repository identifier from the event's 'd' tag
53 identifier: String,
54 /// Event author pubkey
55 author: PublicKey,
56 /// Duration offset from saved_at for created_at
57 created_at_offset_secs: u64,
58 /// Duration offset from saved_at for expires_at
59 expires_at_offset_secs: u64,
60}
61
62/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
63///
64/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
65/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67struct SerializablePrPurgatoryEntry {
68 /// The nostr PR event, if received (None = git data arrived first)
69 event: Option<Event>,
70 /// The expected commit SHA from 'c' tag (if event exists)
71 /// or the actual commit pushed (if git arrived first)
72 commit: String,
73 /// Duration offset from saved_at for created_at
74 created_at_offset_secs: u64,
75 /// Duration offset from saved_at for expires_at
76 expires_at_offset_secs: u64,
77}
78
79/// Serializable purgatory state for disk persistence.
80///
81/// Contains all purgatory data needed to restore state across restarts:
82/// - State events (indexed by identifier)
83/// - PR events (indexed by event ID)
84/// - Expired events (to prevent re-sync loops)
85/// - Version number for future compatibility
86/// - Saved timestamp for downtime calculation
87#[derive(Debug, Clone, Serialize, Deserialize)]
88struct PurgatoryState {
89 /// Version number for state format (currently 1)
90 version: u32,
91 /// When this state was saved to disk
92 saved_at: SystemTime,
93 /// State events indexed by repository identifier
94 state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>,
95 /// PR events indexed by event ID (hex string)
96 pr_events: HashMap<String, SerializablePrPurgatoryEntry>,
97 /// Expired event IDs with their expiry timestamps
98 expired_events: HashMap<String, SystemTime>,
99}
100
41/// Main purgatory structure holding events awaiting git data. 101/// Main purgatory structure holding events awaiting git data.
42/// 102///
43/// Provides thread-safe concurrent access to two separate stores: 103/// Provides thread-safe concurrent access to two separate stores:
@@ -667,6 +727,260 @@ impl Purgatory {
667 pub fn sync_queue_size(&self) -> usize { 727 pub fn sync_queue_size(&self) -> usize {
668 self.sync_queue.len() 728 self.sync_queue.len()
669 } 729 }
730
731 /// Get all repository identifiers currently in purgatory.
732 ///
733 /// Returns a list of all unique repository identifiers that have state events
734 /// in purgatory. This is useful for re-queueing repositories after restore.
735 ///
736 /// # Returns
737 /// Vector of repository identifiers (e.g., "owner/repo")
738 ///
739 /// # Example
740 /// ```no_run
741 /// use ngit_grasp::purgatory::Purgatory;
742 /// use std::path::PathBuf;
743 ///
744 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
745 /// let identifiers = purgatory.get_all_identifiers();
746 /// for id in identifiers {
747 /// println!("Repository in purgatory: {}", id);
748 /// }
749 /// ```
750 pub fn get_all_identifiers(&self) -> Vec<String> {
751 self.state_events
752 .iter()
753 .map(|entry| entry.key().clone())
754 .collect()
755 }
756
757 /// Save purgatory state to disk.
758 ///
759 /// Serializes the current purgatory state (state_events, pr_events, expired_events)
760 /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are
761 /// converted to duration offsets from the current `SystemTime` for persistence.
762 ///
763 /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are
764 /// restored from disk.
765 ///
766 /// # Arguments
767 /// * `path` - Path to save the state file
768 ///
769 /// # Returns
770 /// Ok(()) on success, Err on failure
771 ///
772 /// # Example
773 /// ```no_run
774 /// use ngit_grasp::purgatory::Purgatory;
775 /// use std::path::PathBuf;
776 ///
777 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
778 /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap();
779 /// ```
780 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
781 let saved_at = SystemTime::now();
782 let now_instant = Instant::now();
783
784 // Convert state_events to serializable format
785 let mut state_events = HashMap::new();
786 for entry in self.state_events.iter() {
787 let identifier = entry.key().clone();
788 let entries: Vec<SerializableStatePurgatoryEntry> = entry
789 .value()
790 .iter()
791 .map(|e| {
792 let created_offset =
793 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
794 let expires_offset =
795 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
796
797 SerializableStatePurgatoryEntry {
798 event: e.event.clone(),
799 identifier: e.identifier.clone(),
800 author: e.author,
801 created_at_offset_secs: created_offset.as_secs(),
802 expires_at_offset_secs: expires_offset.as_secs(),
803 }
804 })
805 .collect();
806 state_events.insert(identifier, entries);
807 }
808
809 // Convert pr_events to serializable format
810 let mut pr_events = HashMap::new();
811 for entry in self.pr_events.iter() {
812 let event_id = entry.key().clone();
813 let e = entry.value();
814
815 let created_offset =
816 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
817 let expires_offset =
818 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
819
820 let serializable = SerializablePrPurgatoryEntry {
821 event: e.event.clone(),
822 commit: e.commit.clone(),
823 created_at_offset_secs: created_offset.as_secs(),
824 expires_at_offset_secs: expires_offset.as_secs(),
825 };
826 pr_events.insert(event_id, serializable);
827 }
828
829 // Convert expired_events to serializable format
830 // We use SystemTime instead of Instant offsets for expired events since
831 // we don't need high precision for cleanup timing
832 let mut expired_events = HashMap::new();
833 for entry in self.expired_events.iter() {
834 let event_id = entry.key().to_hex();
835 // Convert Instant to SystemTime (approximate)
836 let expired_at_instant = *entry.value();
837 let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant);
838 let expired_at_system = saved_at - elapsed_since_expire;
839 expired_events.insert(event_id, expired_at_system);
840 }
841
842 // Create state structure
843 let state = PurgatoryState {
844 version: 1,
845 saved_at,
846 state_events,
847 pr_events,
848 expired_events,
849 };
850
851 // Serialize to JSON and write to file
852 let json = serde_json::to_string_pretty(&state)?;
853 std::fs::write(path, json)?;
854
855 tracing::info!(
856 path = %path.display(),
857 state_events = state.state_events.len(),
858 pr_events = state.pr_events.len(),
859 expired_events = state.expired_events.len(),
860 "Saved purgatory state to disk"
861 );
862
863 Ok(())
864 }
865
866 /// Restore purgatory state from disk.
867 ///
868 /// Loads a previously saved purgatory state from the specified path and populates
869 /// the current purgatory instance. Adjusts time-based fields to account for downtime
870 /// between save and restore.
871 ///
872 /// After successful restore, the state file is deleted to prevent accidental
873 /// double-restore.
874 ///
875 /// # Arguments
876 /// * `path` - Path to the saved state file
877 ///
878 /// # Returns
879 /// Ok(()) on success, Err if file doesn't exist or is corrupted
880 ///
881 /// # Example
882 /// ```no_run
883 /// use ngit_grasp::purgatory::Purgatory;
884 /// use std::path::PathBuf;
885 ///
886 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
887 /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) {
888 /// Ok(()) => println!("State restored successfully"),
889 /// Err(e) => eprintln!("Failed to restore state: {}", e),
890 /// }
891 /// ```
892 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
893 // Read and parse state file
894 let json = std::fs::read_to_string(path)?;
895 let state: PurgatoryState = serde_json::from_str(&json)?;
896
897 // Verify version
898 if state.version != 1 {
899 return Err(format!("Unsupported state version: {}", state.version).into());
900 }
901
902 let now_instant = Instant::now();
903
904 // Restore state_events
905 for (identifier, entries) in state.state_events {
906 let restored_entries: Vec<StatePurgatoryEntry> = entries
907 .into_iter()
908 .map(|e| {
909 let created_at = persistence::offset_to_instant(
910 Duration::from_secs(e.created_at_offset_secs),
911 state.saved_at,
912 now_instant,
913 );
914 let expires_at = persistence::offset_to_instant(
915 Duration::from_secs(e.expires_at_offset_secs),
916 state.saved_at,
917 now_instant,
918 );
919
920 StatePurgatoryEntry {
921 event: e.event,
922 identifier: e.identifier,
923 author: e.author,
924 created_at,
925 expires_at,
926 }
927 })
928 .collect();
929
930 self.state_events.insert(identifier, restored_entries);
931 }
932
933 // Restore pr_events
934 for (event_id, e) in state.pr_events {
935 let created_at = persistence::offset_to_instant(
936 Duration::from_secs(e.created_at_offset_secs),
937 state.saved_at,
938 now_instant,
939 );
940 let expires_at = persistence::offset_to_instant(
941 Duration::from_secs(e.expires_at_offset_secs),
942 state.saved_at,
943 now_instant,
944 );
945
946 let entry = PrPurgatoryEntry {
947 event: e.event,
948 commit: e.commit,
949 created_at,
950 expires_at,
951 };
952
953 self.pr_events.insert(event_id, entry);
954 }
955
956 // Restore expired_events
957 for (event_id_hex, expired_at_system) in state.expired_events {
958 if let Ok(event_id) = EventId::from_hex(&event_id_hex) {
959 // Convert SystemTime back to Instant (approximate)
960 let elapsed_since_expire = SystemTime::now()
961 .duration_since(expired_at_system)
962 .unwrap_or(Duration::ZERO);
963 let expired_at_instant = now_instant - elapsed_since_expire;
964
965 self.expired_events.insert(event_id, expired_at_instant);
966 }
967 }
968
969 tracing::info!(
970 path = %path.display(),
971 state_events = self.state_events.len(),
972 pr_events = self.pr_events.len(),
973 expired_events = self.expired_events.len(),
974 saved_at = ?state.saved_at,
975 "Restored purgatory state from disk"
976 );
977
978 // Delete state file after successful restore
979 std::fs::remove_file(path)?;
980 tracing::debug!(path = %path.display(), "Deleted state file after restore");
981
982 Ok(())
983 }
670} 984}
671 985
672#[cfg(test)] 986#[cfg(test)]
@@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() {
1249 // - Skip the expired check for user-submitted events 1563 // - Skip the expired check for user-submitted events
1250 // - Allow the event to be re-added to purgatory or accepted if git data now exists 1564 // - Allow the event to be re-added to purgatory or accepted if git data now exists
1251} 1565}
1566
1567// ============================================================================
1568// Persistence Serialization Tests
1569// ============================================================================
1570
1571#[tokio::test]
1572async fn test_save_and_restore_state_events() {
1573 use tempfile::tempdir;
1574
1575 let temp_dir = tempdir().unwrap();
1576 let state_file = temp_dir.path().join("purgatory_state.json");
1577
1578 let purgatory = Purgatory::new(PathBuf::new());
1579 let keys = Keys::generate();
1580
1581 // Add multiple state events for the same identifier
1582 let event1 = EventBuilder::text_note("state event 1")
1583 .sign_with_keys(&keys)
1584 .unwrap();
1585 let event2 = EventBuilder::text_note("state event 2")
1586 .sign_with_keys(&keys)
1587 .unwrap();
1588
1589 let event1_id = event1.id;
1590 let event2_id = event2.id;
1591
1592 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key());
1593 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key());
1594
1595 // Save to disk
1596 purgatory.save_to_disk(&state_file).unwrap();
1597
1598 // Verify file exists
1599 assert!(state_file.exists());
1600
1601 // Create new purgatory and restore
1602 let purgatory2 = Purgatory::new(PathBuf::new());
1603 purgatory2.restore_from_disk(&state_file).unwrap();
1604
1605 // Verify file was deleted after restore
1606 assert!(!state_file.exists());
1607
1608 // Verify state events were restored
1609 let (state_count, _) = purgatory2.count();
1610 assert_eq!(state_count, 2);
1611
1612 let restored_entries = purgatory2.find_state("test-repo");
1613 assert_eq!(restored_entries.len(), 2);
1614
1615 // Verify event IDs match
1616 let restored_ids: Vec<EventId> = restored_entries.iter().map(|e| e.event.id).collect();
1617 assert!(restored_ids.contains(&event1_id));
1618 assert!(restored_ids.contains(&event2_id));
1619
1620 // Verify identifiers and authors match
1621 for entry in &restored_entries {
1622 assert_eq!(entry.identifier, "test-repo");
1623 assert_eq!(entry.author, keys.public_key());
1624 }
1625}
1626
1627#[tokio::test]
1628async fn test_save_and_restore_pr_events() {
1629 use nostr_sdk::{Kind, Tag, TagKind};
1630 use tempfile::tempdir;
1631
1632 let temp_dir = tempdir().unwrap();
1633 let state_file = temp_dir.path().join("purgatory_state.json");
1634
1635 let purgatory = Purgatory::new(PathBuf::new());
1636 let keys = Keys::generate();
1637
1638 // Add PR event with actual event
1639 let tags = vec![Tag::custom(
1640 TagKind::Custom("a".into()),
1641 vec!["30617:abc123:test-repo".to_string()],
1642 )];
1643
1644 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
1645 .tags(tags)
1646 .sign_with_keys(&keys)
1647 .unwrap();
1648
1649 let pr_event_id = pr_event.id;
1650
1651 purgatory.add_pr(
1652 pr_event.clone(),
1653 "pr-event-id".to_string(),
1654 "commit-abc".to_string(),
1655 );
1656
1657 // Save to disk
1658 purgatory.save_to_disk(&state_file).unwrap();
1659
1660 // Create new purgatory and restore
1661 let purgatory2 = Purgatory::new(PathBuf::new());
1662 purgatory2.restore_from_disk(&state_file).unwrap();
1663
1664 // Verify PR event was restored
1665 let (_, pr_count) = purgatory2.count();
1666 assert_eq!(pr_count, 1);
1667
1668 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap();
1669 assert!(restored_entry.event.is_some());
1670 assert_eq!(restored_entry.event.unwrap().id, pr_event_id);
1671 assert_eq!(restored_entry.commit, "commit-abc");
1672}
1673
1674#[tokio::test]
1675async fn test_save_and_restore_pr_placeholders() {
1676 use tempfile::tempdir;
1677
1678 let temp_dir = tempdir().unwrap();
1679 let state_file = temp_dir.path().join("purgatory_state.json");
1680
1681 let purgatory = Purgatory::new(PathBuf::new());
1682
1683 // Add PR placeholder (git data arrived first)
1684 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string());
1685
1686 // Save to disk
1687 purgatory.save_to_disk(&state_file).unwrap();
1688
1689 // Create new purgatory and restore
1690 let purgatory2 = Purgatory::new(PathBuf::new());
1691 purgatory2.restore_from_disk(&state_file).unwrap();
1692
1693 // Verify placeholder was restored
1694 let (_, pr_count) = purgatory2.count();
1695 assert_eq!(pr_count, 1);
1696
1697 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap();
1698 assert!(restored_entry.event.is_none()); // Still a placeholder
1699 assert_eq!(restored_entry.commit, "commit-def");
1700
1701 // Verify it's findable as a placeholder
1702 assert_eq!(
1703 purgatory2.find_pr_placeholder("placeholder-id"),
1704 Some("commit-def".to_string())
1705 );
1706}
1707
1708#[tokio::test]
1709async fn test_save_and_restore_expired_events() {
1710 use tempfile::tempdir;
1711
1712 let temp_dir = tempdir().unwrap();
1713 let state_file = temp_dir.path().join("purgatory_state.json");
1714
1715 let purgatory = Purgatory::new(PathBuf::new());
1716 let keys = Keys::generate();
1717
1718 let event = EventBuilder::text_note("test")
1719 .sign_with_keys(&keys)
1720 .unwrap();
1721 let event_id = event.id;
1722
1723 // Add and expire event
1724 purgatory.add_state(event, "repo".to_string(), keys.public_key());
1725 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1726 for entry in entries.iter_mut() {
1727 entry.expires_at = Instant::now() - Duration::from_secs(1);
1728 }
1729 }
1730 purgatory.cleanup();
1731
1732 // Verify event is marked as expired
1733 assert!(purgatory.is_expired(&event_id));
1734 assert_eq!(purgatory.expired_count(), 1);
1735
1736 // Save to disk
1737 purgatory.save_to_disk(&state_file).unwrap();
1738
1739 // Create new purgatory and restore
1740 let purgatory2 = Purgatory::new(PathBuf::new());
1741 purgatory2.restore_from_disk(&state_file).unwrap();
1742
1743 // Verify expired event was restored
1744 assert!(purgatory2.is_expired(&event_id));
1745 assert_eq!(purgatory2.expired_count(), 1);
1746
1747 // Verify it's included in event_ids()
1748 let ids = purgatory2.event_ids();
1749 assert!(ids.contains(&event_id));
1750}
1751
1752#[tokio::test]
1753async fn test_save_and_restore_empty_purgatory() {
1754 use tempfile::tempdir;
1755
1756 let temp_dir = tempdir().unwrap();
1757 let state_file = temp_dir.path().join("purgatory_state.json");
1758
1759 let purgatory = Purgatory::new(PathBuf::new());
1760
1761 // Save empty purgatory
1762 purgatory.save_to_disk(&state_file).unwrap();
1763
1764 // Verify file exists
1765 assert!(state_file.exists());
1766
1767 // Create new purgatory and restore
1768 let purgatory2 = Purgatory::new(PathBuf::new());
1769 purgatory2.restore_from_disk(&state_file).unwrap();
1770
1771 // Verify purgatory is still empty
1772 let (state_count, pr_count) = purgatory2.count();
1773 assert_eq!(state_count, 0);
1774 assert_eq!(pr_count, 0);
1775 assert_eq!(purgatory2.expired_count(), 0);
1776}
1777
1778#[tokio::test]
1779async fn test_restore_missing_file() {
1780 use tempfile::tempdir;
1781
1782 let temp_dir = tempdir().unwrap();
1783 let state_file = temp_dir.path().join("nonexistent.json");
1784
1785 let purgatory = Purgatory::new(PathBuf::new());
1786
1787 // Attempting to restore from missing file should error
1788 let result = purgatory.restore_from_disk(&state_file);
1789 assert!(result.is_err());
1790
1791 // Purgatory should remain empty
1792 let (state_count, pr_count) = purgatory.count();
1793 assert_eq!(state_count, 0);
1794 assert_eq!(pr_count, 0);
1795}
1796
1797#[tokio::test]
1798async fn test_restore_corrupted_json() {
1799 use tempfile::tempdir;
1800
1801 let temp_dir = tempdir().unwrap();
1802 let state_file = temp_dir.path().join("corrupted.json");
1803
1804 // Write invalid JSON
1805 std::fs::write(&state_file, "{ this is not valid json }").unwrap();
1806
1807 let purgatory = Purgatory::new(PathBuf::new());
1808
1809 // Attempting to restore corrupted file should error
1810 let result = purgatory.restore_from_disk(&state_file);
1811 assert!(result.is_err());
1812
1813 // Purgatory should remain empty
1814 let (state_count, pr_count) = purgatory.count();
1815 assert_eq!(state_count, 0);
1816 assert_eq!(pr_count, 0);
1817}
1818
1819#[tokio::test]
1820async fn test_restore_unsupported_version() {
1821 use tempfile::tempdir;
1822
1823 let temp_dir = tempdir().unwrap();
1824 let state_file = temp_dir.path().join("wrong_version.json");
1825
1826 // Write state with unsupported version
1827 let state = r#"{
1828 "version": 999,
1829 "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0},
1830 "state_events": {},
1831 "pr_events": {},
1832 "expired_events": {}
1833 }"#;
1834 std::fs::write(&state_file, state).unwrap();
1835
1836 let purgatory = Purgatory::new(PathBuf::new());
1837
1838 // Attempting to restore unsupported version should error
1839 let result = purgatory.restore_from_disk(&state_file);
1840 assert!(result.is_err());
1841 assert!(result
1842 .unwrap_err()
1843 .to_string()
1844 .contains("Unsupported state version"));
1845}
1846
1847#[tokio::test]
1848async fn test_downtime_calculation() {
1849 use tempfile::tempdir;
1850 use tokio::time::sleep;
1851
1852 let temp_dir = tempdir().unwrap();
1853 let state_file = temp_dir.path().join("purgatory_state.json");
1854
1855 let purgatory = Purgatory::new(PathBuf::new());
1856 let keys = Keys::generate();
1857
1858 // Add state event
1859 let event = EventBuilder::text_note("test")
1860 .sign_with_keys(&keys)
1861 .unwrap();
1862
1863 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1864
1865 // Get original expiry time
1866 let original_entries = purgatory.find_state("repo");
1867 let original_entry = &original_entries[0];
1868 let original_expires_at = original_entry.expires_at;
1869 let original_remaining = original_expires_at.saturating_duration_since(Instant::now());
1870
1871 // Save to disk
1872 purgatory.save_to_disk(&state_file).unwrap();
1873
1874 // Simulate downtime (100ms)
1875 sleep(Duration::from_millis(100)).await;
1876
1877 // Create new purgatory and restore
1878 let purgatory2 = Purgatory::new(PathBuf::new());
1879 purgatory2.restore_from_disk(&state_file).unwrap();
1880
1881 // Get restored expiry time
1882 let restored_entries = purgatory2.find_state("repo");
1883 let restored_entry = &restored_entries[0];
1884 let restored_expires_at = restored_entry.expires_at;
1885 let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now());
1886
1887 // Remaining time should be approximately the same (accounting for downtime)
1888 // Allow 2000ms tolerance for test execution time and sleep duration
1889 let diff = if restored_remaining > original_remaining {
1890 restored_remaining.as_millis() - original_remaining.as_millis()
1891 } else {
1892 original_remaining.as_millis() - restored_remaining.as_millis()
1893 };
1894
1895 assert!(
1896 diff < 2000,
1897 "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms",
1898 original_remaining.as_millis(),
1899 restored_remaining.as_millis(),
1900 diff
1901 );
1902}
1903
1904#[tokio::test]
1905async fn test_expiry_times_preserved() {
1906 use tempfile::tempdir;
1907
1908 let temp_dir = tempdir().unwrap();
1909 let state_file = temp_dir.path().join("purgatory_state.json");
1910
1911 let purgatory = Purgatory::new(PathBuf::new());
1912 let keys = Keys::generate();
1913
1914 // Add state event
1915 let event = EventBuilder::text_note("test")
1916 .sign_with_keys(&keys)
1917 .unwrap();
1918
1919 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1920
1921 // Manually set expiry to a specific time in the future
1922 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
1923 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1924 for entry in entries.iter_mut() {
1925 entry.expires_at = custom_expiry;
1926 }
1927 }
1928
1929 // Save to disk
1930 purgatory.save_to_disk(&state_file).unwrap();
1931
1932 // Create new purgatory and restore
1933 let purgatory2 = Purgatory::new(PathBuf::new());
1934 purgatory2.restore_from_disk(&state_file).unwrap();
1935
1936 // Get restored expiry time
1937 let restored_entries = purgatory2.find_state("repo");
1938 let restored_entry = &restored_entries[0];
1939 let restored_remaining = restored_entry
1940 .expires_at
1941 .saturating_duration_since(Instant::now());
1942
1943 // Should be approximately 600 seconds (allow 3 second tolerance for test execution)
1944 assert!(
1945 restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603,
1946 "Expected ~600s remaining, got {}s",
1947 restored_remaining.as_secs()
1948 );
1949}
1950
1951#[tokio::test]
1952async fn test_multiple_state_events_same_identifier() {
1953 use tempfile::tempdir;
1954
1955 let temp_dir = tempdir().unwrap();
1956 let state_file = temp_dir.path().join("purgatory_state.json");
1957
1958 let purgatory = Purgatory::new(PathBuf::new());
1959 let keys1 = Keys::generate();
1960 let keys2 = Keys::generate();
1961 let keys3 = Keys::generate();
1962
1963 // Add multiple state events for the same identifier from different authors
1964 let event1 = EventBuilder::text_note("maintainer 1")
1965 .sign_with_keys(&keys1)
1966 .unwrap();
1967 let event2 = EventBuilder::text_note("maintainer 2")
1968 .sign_with_keys(&keys2)
1969 .unwrap();
1970 let event3 = EventBuilder::text_note("maintainer 3")
1971 .sign_with_keys(&keys3)
1972 .unwrap();
1973
1974 purgatory.add_state(
1975 event1.clone(),
1976 "shared-repo".to_string(),
1977 keys1.public_key(),
1978 );
1979 purgatory.add_state(
1980 event2.clone(),
1981 "shared-repo".to_string(),
1982 keys2.public_key(),
1983 );
1984 purgatory.add_state(
1985 event3.clone(),
1986 "shared-repo".to_string(),
1987 keys3.public_key(),
1988 );
1989
1990 // Save to disk
1991 purgatory.save_to_disk(&state_file).unwrap();
1992
1993 // Create new purgatory and restore
1994 let purgatory2 = Purgatory::new(PathBuf::new());
1995 purgatory2.restore_from_disk(&state_file).unwrap();
1996
1997 // Verify all three events were restored
1998 let restored_entries = purgatory2.find_state("shared-repo");
1999 assert_eq!(restored_entries.len(), 3);
2000
2001 // Verify all authors are present
2002 let authors: Vec<PublicKey> = restored_entries.iter().map(|e| e.author).collect();
2003 assert!(authors.contains(&keys1.public_key()));
2004 assert!(authors.contains(&keys2.public_key()));
2005 assert!(authors.contains(&keys3.public_key()));
2006}
2007
2008#[tokio::test]
2009async fn test_mixed_pr_events_and_placeholders() {
2010 use nostr_sdk::{Kind, Tag, TagKind};
2011 use tempfile::tempdir;
2012
2013 let temp_dir = tempdir().unwrap();
2014 let state_file = temp_dir.path().join("purgatory_state.json");
2015
2016 let purgatory = Purgatory::new(PathBuf::new());
2017 let keys = Keys::generate();
2018
2019 // Add PR event with actual event
2020 let tags = vec![Tag::custom(
2021 TagKind::Custom("a".into()),
2022 vec!["30617:abc123:test-repo".to_string()],
2023 )];
2024
2025 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
2026 .tags(tags)
2027 .sign_with_keys(&keys)
2028 .unwrap();
2029
2030 purgatory.add_pr(
2031 pr_event.clone(),
2032 "pr-with-event".to_string(),
2033 "commit-abc".to_string(),
2034 );
2035
2036 // Add PR placeholder
2037 purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string());
2038
2039 // Save to disk
2040 purgatory.save_to_disk(&state_file).unwrap();
2041
2042 // Create new purgatory and restore
2043 let purgatory2 = Purgatory::new(PathBuf::new());
2044 purgatory2.restore_from_disk(&state_file).unwrap();
2045
2046 // Verify both were restored correctly
2047 let (_, pr_count) = purgatory2.count();
2048 assert_eq!(pr_count, 2);
2049
2050 // Verify PR event
2051 let pr_entry = purgatory2.find_pr("pr-with-event").unwrap();
2052 assert!(pr_entry.event.is_some());
2053 assert_eq!(pr_entry.commit, "commit-abc");
2054
2055 // Verify placeholder
2056 let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap();
2057 assert!(placeholder_entry.event.is_none());
2058 assert_eq!(placeholder_entry.commit, "commit-def");
2059 assert_eq!(
2060 purgatory2.find_pr_placeholder("pr-placeholder"),
2061 Some("commit-def".to_string())
2062 );
2063}
2064
2065#[tokio::test]
2066async fn test_file_cleanup_after_successful_restore() {
2067 use tempfile::tempdir;
2068
2069 let temp_dir = tempdir().unwrap();
2070 let state_file = temp_dir.path().join("purgatory_state.json");
2071
2072 let purgatory = Purgatory::new(PathBuf::new());
2073 let keys = Keys::generate();
2074
2075 // Add some data
2076 let event = EventBuilder::text_note("test")
2077 .sign_with_keys(&keys)
2078 .unwrap();
2079 purgatory.add_state(event, "repo".to_string(), keys.public_key());
2080
2081 // Save to disk
2082 purgatory.save_to_disk(&state_file).unwrap();
2083 assert!(state_file.exists());
2084
2085 // Restore
2086 let purgatory2 = Purgatory::new(PathBuf::new());
2087 purgatory2.restore_from_disk(&state_file).unwrap();
2088
2089 // File should be deleted after successful restore
2090 assert!(!state_file.exists());
2091}
2092
2093#[tokio::test]
2094async fn test_comprehensive_roundtrip() {
2095 use nostr_sdk::{Kind, Tag, TagKind};
2096 use tempfile::tempdir;
2097
2098 let temp_dir = tempdir().unwrap();
2099 let state_file = temp_dir.path().join("purgatory_state.json");
2100
2101 let purgatory = Purgatory::new(PathBuf::new());
2102 let keys1 = Keys::generate();
2103 let keys2 = Keys::generate();
2104
2105 // Add multiple state events
2106 let state1 = EventBuilder::text_note("state 1")
2107 .sign_with_keys(&keys1)
2108 .unwrap();
2109 let state2 = EventBuilder::text_note("state 2")
2110 .sign_with_keys(&keys2)
2111 .unwrap();
2112
2113 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key());
2114 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key());
2115
2116 // Add PR event
2117 let tags = vec![Tag::custom(
2118 TagKind::Custom("a".into()),
2119 vec!["30617:abc123:repo1".to_string()],
2120 )];
2121 let pr_event = EventBuilder::new(Kind::from(1618), "PR")
2122 .tags(tags)
2123 .sign_with_keys(&keys1)
2124 .unwrap();
2125 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string());
2126
2127 // Add PR placeholder
2128 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
2129
2130 // Add and expire an event
2131 let expired_event = EventBuilder::text_note("expired")
2132 .sign_with_keys(&keys1)
2133 .unwrap();
2134 let expired_id = expired_event.id;
2135 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key());
2136 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2137 for entry in entries.iter_mut() {
2138 entry.expires_at = Instant::now() - Duration::from_secs(1);
2139 }
2140 }
2141 purgatory.cleanup();
2142
2143 // Verify initial state
2144 let (state_count, pr_count) = purgatory.count();
2145 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up)
2146 assert_eq!(pr_count, 2); // pr-1, pr-2
2147 assert_eq!(purgatory.expired_count(), 1); // expired_event
2148
2149 // Save to disk
2150 purgatory.save_to_disk(&state_file).unwrap();
2151
2152 // Create new purgatory and restore
2153 let purgatory2 = Purgatory::new(PathBuf::new());
2154 purgatory2.restore_from_disk(&state_file).unwrap();
2155
2156 // Verify all data was restored correctly
2157 let (state_count2, pr_count2) = purgatory2.count();
2158 assert_eq!(state_count2, 2);
2159 assert_eq!(pr_count2, 2);
2160 assert_eq!(purgatory2.expired_count(), 1);
2161
2162 // Verify state events
2163 assert_eq!(purgatory2.find_state("repo1").len(), 1);
2164 assert_eq!(purgatory2.find_state("repo2").len(), 1);
2165
2166 // Verify PR events
2167 assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some());
2168 assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none());
2169
2170 // Verify expired event
2171 assert!(purgatory2.is_expired(&expired_id));
2172}
diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs
new file mode 100644
index 0000000..7fca2cf
--- /dev/null
+++ b/src/purgatory/persistence.rs
@@ -0,0 +1,198 @@
1//! Persistence utilities for purgatory state.
2//!
3//! This module provides conversion functions between `Instant` (which cannot be
4//! serialized) and `Duration` offsets from a reference `SystemTime`. This allows
5//! purgatory state to be persisted to disk and restored across restarts.
6//!
7//! ## Time Handling
8//!
9//! - `Instant` is monotonic but cannot be serialized
10//! - `SystemTime` can be serialized but may go backwards (NTP, user changes)
11//! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime
12//! - Downtime is accounted for when restoring state (elapsed time is preserved)
13
14use std::time::{Duration, Instant, SystemTime};
15
16/// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`.
17///
18/// This allows storing an `Instant` as a serializable offset that can be
19/// restored later, accounting for system downtime.
20///
21/// # Arguments
22/// * `instant` - The `Instant` to convert
23/// * `reference_time` - The reference `SystemTime` (typically SystemTime::now())
24/// * `reference_instant` - The corresponding `Instant` (typically Instant::now())
25///
26/// # Returns
27/// Duration offset from the reference time
28///
29/// # Example
30/// ```
31/// use std::time::{Duration, Instant, SystemTime};
32/// use ngit_grasp::purgatory::persistence::instant_to_offset;
33///
34/// let now_system = SystemTime::now();
35/// let now_instant = Instant::now();
36/// let future = now_instant + Duration::from_secs(60);
37///
38/// let offset = instant_to_offset(future, now_system, now_instant);
39/// assert!(offset.as_secs() >= 60);
40/// ```
41pub fn instant_to_offset(
42 instant: Instant,
43 _reference_time: SystemTime,
44 reference_instant: Instant,
45) -> Duration {
46 if instant >= reference_instant {
47 // Future instant - return positive offset
48 instant.duration_since(reference_instant)
49 } else {
50 // Past instant - this shouldn't happen in normal operation,
51 // but we handle it by returning zero duration
52 Duration::ZERO
53 }
54}
55
56/// Convert a `Duration` offset back to an `Instant`, accounting for downtime.
57///
58/// This restores an `Instant` from a serialized offset, adjusting for the time
59/// that has elapsed since the state was saved.
60///
61/// # Arguments
62/// * `offset` - The duration offset from the saved reference time
63/// * `saved_at` - The `SystemTime` when the state was saved
64/// * `reference_instant` - The current `Instant` (typically Instant::now())
65///
66/// # Returns
67/// The restored `Instant`, adjusted for downtime
68///
69/// # Example
70/// ```
71/// use std::time::{Duration, Instant, SystemTime};
72/// use ngit_grasp::purgatory::persistence::offset_to_instant;
73///
74/// let saved_at = SystemTime::now();
75/// let offset = Duration::from_secs(60);
76/// let now_instant = Instant::now();
77///
78/// let restored = offset_to_instant(offset, saved_at, now_instant);
79/// // restored will be approximately now_instant + 60 seconds
80/// ```
81pub fn offset_to_instant(
82 offset: Duration,
83 saved_at: SystemTime,
84 reference_instant: Instant,
85) -> Instant {
86 // Calculate how much time has elapsed since the state was saved
87 let now_system = SystemTime::now();
88 let elapsed_since_save = now_system
89 .duration_since(saved_at)
90 .unwrap_or(Duration::ZERO);
91
92 // The original deadline was: saved_at + offset
93 // Time remaining = (saved_at + offset) - now_system
94 // = offset - elapsed_since_save
95
96 if offset > elapsed_since_save {
97 // Deadline is still in the future
98 let remaining = offset - elapsed_since_save;
99 reference_instant + remaining
100 } else {
101 // Deadline has already passed or is right now
102 reference_instant
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109 use std::thread;
110 use std::time::Duration;
111
112 #[test]
113 fn test_instant_to_offset_future() {
114 let now_system = SystemTime::now();
115 let now_instant = Instant::now();
116 let future = now_instant + Duration::from_secs(60);
117
118 let offset = instant_to_offset(future, now_system, now_instant);
119
120 // Should be approximately 60 seconds (within tolerance)
121 assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61);
122 }
123
124 #[test]
125 fn test_instant_to_offset_past() {
126 let now_system = SystemTime::now();
127 let past_instant = Instant::now();
128 // Simulate some time passing
129 thread::sleep(Duration::from_millis(10));
130 let now_instant = Instant::now();
131
132 let offset = instant_to_offset(past_instant, now_system, now_instant);
133
134 // Past instants return zero duration
135 assert_eq!(offset, Duration::ZERO);
136 }
137
138 #[test]
139 fn test_offset_to_instant_with_time_remaining() {
140 let saved_at = SystemTime::now();
141 let offset = Duration::from_secs(60);
142
143 // Simulate a very short downtime (< 10ms)
144 thread::sleep(Duration::from_millis(5));
145
146 let now_instant = Instant::now();
147 let restored = offset_to_instant(offset, saved_at, now_instant);
148
149 // Should be approximately 60 seconds in the future
150 let remaining = restored.duration_since(now_instant);
151 assert!(
152 remaining.as_secs() >= 59 && remaining.as_secs() <= 61,
153 "Expected ~60s, got {}s",
154 remaining.as_secs()
155 );
156 }
157
158 #[test]
159 fn test_offset_to_instant_deadline_passed() {
160 // Simulate state saved 70 seconds ago with 60 second offset
161 let saved_at = SystemTime::now() - Duration::from_secs(70);
162 let offset = Duration::from_secs(60);
163
164 let now_instant = Instant::now();
165 let restored = offset_to_instant(offset, saved_at, now_instant);
166
167 // Deadline has passed, should be now or in the past
168 let remaining = restored.saturating_duration_since(now_instant);
169 assert_eq!(remaining, Duration::ZERO);
170 }
171
172 #[test]
173 fn test_round_trip_conversion() {
174 let now_system = SystemTime::now();
175 let now_instant = Instant::now();
176 let future = now_instant + Duration::from_secs(120);
177
178 // Convert to offset
179 let offset = instant_to_offset(future, now_system, now_instant);
180
181 // Immediately convert back (minimal downtime)
182 let restored = offset_to_instant(offset, now_system, now_instant);
183
184 // Should be very close to the original future instant
185 let diff = if restored > future {
186 restored.duration_since(future)
187 } else {
188 future.duration_since(restored)
189 };
190
191 // Allow for small timing differences (< 100ms)
192 assert!(
193 diff < Duration::from_millis(100),
194 "Round trip should preserve instant within 100ms, got {}ms",
195 diff.as_millis()
196 );
197 }
198}
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 9c47616..919504b 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -5,8 +5,14 @@
5//! problem where either the nostr event or git push can arrive first. 5//! problem where either the nostr event or git push can arrive first.
6 6
7use nostr_sdk::prelude::*; 7use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize};
8use std::time::Instant; 9use std::time::Instant;
9 10
11/// Default value for Instant fields during deserialization
12fn instant_now() -> Instant {
13 Instant::now()
14}
15
10/// A reference name and its target object. 16/// A reference name and its target object.
11/// 17///
12/// Used to identify specific git refs (branches, tags) that a state event 18/// Used to identify specific git refs (branches, tags) that a state event
@@ -59,7 +65,10 @@ impl RefUpdate {
59/// State events declare the current state of a repository but may arrive 65/// State events declare the current state of a repository but may arrive
60/// before the corresponding git data has been pushed. This entry holds 66/// before the corresponding git data has been pushed. This entry holds
61/// the event and associated metadata until the git data arrives. 67/// the event and associated metadata until the git data arrives.
62#[derive(Debug, Clone)] 68///
69/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
70/// module to convert to/from serializable wrapper types.
71#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct StatePurgatoryEntry { 72pub struct StatePurgatoryEntry {
64 /// The nostr state event (kind 30618) awaiting git data 73 /// The nostr state event (kind 30618) awaiting git data
65 pub event: Event, 74 pub event: Event,
@@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry {
71 pub author: PublicKey, 80 pub author: PublicKey,
72 81
73 /// When this entry was added to purgatory 82 /// When this entry was added to purgatory
83 #[serde(skip, default = "instant_now")]
74 pub created_at: Instant, 84 pub created_at: Instant,
75 85
76 /// Expiry deadline (30 min from creation, may be extended) 86 /// Expiry deadline (30 min from creation, may be extended)
87 #[serde(skip, default = "instant_now")]
77 pub expires_at: Instant, 88 pub expires_at: Instant,
78} 89}
79 90
@@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry {
82/// PR events reference specific commits but may arrive before the git push 93/// PR events reference specific commits but may arrive before the git push
83/// containing those commits. Alternatively, a git push may arrive first, 94/// containing those commits. Alternatively, a git push may arrive first,
84/// creating a placeholder entry waiting for the corresponding PR event. 95/// creating a placeholder entry waiting for the corresponding PR event.
85#[derive(Debug, Clone)] 96///
97/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
98/// module to convert to/from serializable wrapper types.
99#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct PrPurgatoryEntry { 100pub struct PrPurgatoryEntry {
87 /// The nostr PR event, if received (None = git data arrived first) 101 /// The nostr PR event, if received (None = git data arrived first)
88 pub event: Option<Event>, 102 pub event: Option<Event>,
@@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry {
92 pub commit: String, 106 pub commit: String,
93 107
94 /// When this entry was added to purgatory 108 /// When this entry was added to purgatory
109 #[serde(skip, default = "instant_now")]
95 pub created_at: Instant, 110 pub created_at: Instant,
96 111
97 /// Expiry deadline (30 min from creation, may be extended) 112 /// Expiry deadline (30 min from creation, may be extended)
113 #[serde(skip, default = "instant_now")]
98 pub expires_at: Instant, 114 pub expires_at: Instant,
99} 115}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e2d55bd..3213dfb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -43,6 +43,7 @@ pub use health::RelayHealthTracker;
43use tokio::time::sleep; 43use tokio::time::sleep;
44 44
45use std::collections::{HashMap, HashSet}; 45use std::collections::{HashMap, HashSet};
46use std::path::{Path, PathBuf};
46use std::sync::Arc; 47use std::sync::Arc;
47use std::time::Duration; 48use std::time::Duration;
48 49
@@ -581,6 +582,7 @@ impl SyncManager {
581 /// * `write_policy` - Policy for validating events before storage 582 /// * `write_policy` - Policy for validating events before storage
582 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) 583 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
583 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence)
584 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
585 pub fn new( 587 pub fn new(
586 bootstrap_relay_url: Option<String>, 588 bootstrap_relay_url: Option<String>,
@@ -589,11 +591,42 @@ impl SyncManager {
589 write_policy: Nip34WritePolicy, 591 write_policy: Nip34WritePolicy,
590 local_relay: LocalRelay, 592 local_relay: LocalRelay,
591 config: &Config, 593 config: &Config,
594 data_path: PathBuf,
592 sync_metrics: Option<SyncMetrics>, 595 sync_metrics: Option<SyncMetrics>,
593 ) -> Self { 596 ) -> Self {
594 // Extract purgatory from write_policy for read-only access 597 // Extract purgatory from write_policy for read-only access
595 let purgatory = write_policy.purgatory().clone(); 598 let purgatory = write_policy.purgatory().clone();
596 599
600 // Create rejected events index
601 let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics {
602 RejectedEventsIndex::with_metrics(
603 Duration::from_secs(config.rejected_hot_cache_duration_secs),
604 Duration::from_secs(config.rejected_cold_index_expiry_secs),
605 metrics.clone(),
606 )
607 } else {
608 RejectedEventsIndex::new(
609 Duration::from_secs(config.rejected_hot_cache_duration_secs),
610 Duration::from_secs(config.rejected_cold_index_expiry_secs),
611 )
612 });
613
614 // Attempt to restore rejected events index from disk
615 let rejected_index_path = data_path.join("rejected-events-cache.json");
616 if rejected_index_path.exists() {
617 match rejected_events_index.restore_from_disk(&rejected_index_path) {
618 Ok(()) => {
619 tracing::info!("Restored rejected events index from disk");
620 }
621 Err(e) => {
622 tracing::warn!(
623 "Failed to restore rejected events index: {}, starting empty",
624 e
625 );
626 }
627 }
628 }
629
597 Self { 630 Self {
598 bootstrap_relay_url, 631 bootstrap_relay_url,
599 service_domain, 632 service_domain,
@@ -605,18 +638,7 @@ impl SyncManager {
605 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 638 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
606 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 639 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
607 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 640 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
608 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { 641 rejected_events_index,
609 RejectedEventsIndex::with_metrics(
610 Duration::from_secs(config.rejected_hot_cache_duration_secs),
611 Duration::from_secs(config.rejected_cold_index_expiry_secs),
612 metrics.clone(),
613 )
614 } else {
615 RejectedEventsIndex::new(
616 Duration::from_secs(config.rejected_hot_cache_duration_secs),
617 Duration::from_secs(config.rejected_cold_index_expiry_secs),
618 )
619 }),
620 connections: HashMap::new(), 642 connections: HashMap::new(),
621 health_tracker: Arc::new(RelayHealthTracker::new(config)), 643 health_tracker: Arc::new(RelayHealthTracker::new(config)),
622 next_batch_id: 0, 644 next_batch_id: 0,
@@ -637,6 +659,31 @@ impl SyncManager {
637 self.next_batch_id 659 self.next_batch_id
638 } 660 }
639 661
662 /// Get a clone of the rejected events index Arc.
663 ///
664 /// This allows access to the rejected events index for persistence
665 /// even after the SyncManager has been moved into a task.
666 ///
667 /// # Returns
668 /// Arc clone of the rejected events index
669 pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> {
670 self.rejected_events_index.clone()
671 }
672
673 /// Save rejected events index to disk.
674 ///
675 /// This is called during shutdown to persist the rejected events cache,
676 /// allowing us to avoid re-downloading rejected events after restart.
677 ///
678 /// # Arguments
679 /// * `path` - Path to save the rejected index file
680 ///
681 /// # Returns
682 /// Ok(()) on success, Err if save fails
683 pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
684 self.rejected_events_index.save_to_disk(path)
685 }
686
640 /// Handle EOSE (End Of Stored Events) for a subscription 687 /// Handle EOSE (End Of Stored Events) for a subscription
641 /// 688 ///
642 /// This method: 689 /// This method:
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index 4d31901..f25f22a 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -86,12 +86,14 @@
86//! ``` 86//! ```
87 87
88use nostr_sdk::{Event, EventId, PublicKey}; 88use nostr_sdk::{Event, EventId, PublicKey};
89use serde::{Deserialize, Serialize};
89use std::collections::{HashMap, HashSet}; 90use std::collections::{HashMap, HashSet};
91use std::path::Path;
90use std::sync::{Arc, RwLock}; 92use std::sync::{Arc, RwLock};
91use std::time::{Duration, Instant}; 93use std::time::{Duration, Instant, SystemTime};
92 94
93/// Type of event stored in the rejected events index 95/// Type of event stored in the rejected events index
94#[derive(Debug, Clone, Copy, PartialEq, Eq)] 96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
95pub enum EventType { 97pub enum EventType {
96 /// Repository announcement (kind 30617) 98 /// Repository announcement (kind 30617)
97 Announcement, 99 Announcement,
@@ -109,7 +111,7 @@ impl std::fmt::Display for EventType {
109} 111}
110 112
111/// Reason why a repository announcement was rejected 113/// Reason why a repository announcement was rejected
112#[derive(Debug, Clone, Copy, PartialEq, Eq)] 114#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
113pub enum RejectionReason { 115pub enum RejectionReason {
114 /// Announcement doesn't list this service in clone/web URLs 116 /// Announcement doesn't list this service in clone/web URLs
115 DoesNotListService, 117 DoesNotListService,
@@ -141,6 +143,20 @@ struct HotCacheEntry {
141 cached_at: Instant, 143 cached_at: Instant,
142} 144}
143 145
146/// Serializable version of HotCacheEntry for persistence
147///
148/// Converts Instant to Duration offset from saved_at time
149#[derive(Debug, Clone, Serialize, Deserialize)]
150struct SerializableHotCacheEntry {
151 event: Event,
152 pubkey: PublicKey,
153 identifier: String,
154 event_type: EventType,
155 reason: RejectionReason,
156 /// Duration since saved_at when this entry was cached
157 cached_at_offset_secs: u64,
158}
159
144/// Entry in the cold index (metadata only) 160/// Entry in the cold index (metadata only)
145/// 161///
146/// Note: event_id is stored as the HashMap key, not in this struct 162/// Note: event_id is stored as the HashMap key, not in this struct
@@ -154,6 +170,49 @@ struct ColdIndexEntry {
154 rejected_at: Instant, 170 rejected_at: Instant,
155} 171}
156 172
173/// Serializable version of ColdIndexEntry for persistence
174///
175/// Converts Instant to Duration offset from saved_at time
176#[derive(Debug, Clone, Serialize, Deserialize)]
177struct SerializableColdIndexEntry {
178 pubkey: PublicKey,
179 identifier: String,
180 event_type: EventType,
181 reason: RejectionReason,
182 /// Duration since saved_at when this entry was rejected
183 rejected_at_offset_secs: u64,
184}
185
186/// Serializable state for hot cache
187#[derive(Debug, Serialize, Deserialize)]
188struct SerializableHotCache {
189 expiry_duration_secs: u64,
190 entries: HashMap<EventId, SerializableHotCacheEntry>,
191}
192
193/// Serializable state for cold index
194#[derive(Debug, Serialize, Deserialize)]
195struct SerializableColdIndex {
196 expiry_duration_secs: u64,
197 entries: HashMap<EventId, SerializableColdIndexEntry>,
198}
199
200/// Complete rejected cache state for persistence
201///
202/// Stores both hot cache and cold index with version and timestamp information.
203/// All Instant fields are converted to Duration offsets from saved_at.
204#[derive(Debug, Serialize, Deserialize)]
205struct RejectedCacheState {
206 /// Version for future compatibility
207 version: u32,
208 /// When this state was saved
209 saved_at: SystemTime,
210 /// Hot cache entries with full events
211 hot_cache: SerializableHotCache,
212 /// Cold index entries with metadata only
213 cold_index: SerializableColdIndex,
214}
215
157/// Hot cache: Stores full events for immediate re-processing 216/// Hot cache: Stores full events for immediate re-processing
158/// 217///
159/// Events are stored for a short duration (default: 2 minutes) to enable 218/// Events are stored for a short duration (default: 2 minutes) to enable
@@ -603,6 +662,168 @@ impl RejectedEventsIndex {
603 662
604 ids 663 ids
605 } 664 }
665
666 /// Save rejected events cache to disk
667 ///
668 /// Serializes both hot cache and cold index to JSON, converting Instant timestamps
669 /// to Duration offsets from the save time. This allows timestamps to be adjusted
670 /// for downtime when restored.
671 ///
672 /// # Arguments
673 ///
674 /// * `path` - File path to write the serialized state to
675 ///
676 /// # Returns
677 ///
678 /// Ok(()) on success, or an error if serialization or file write fails
679 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
680 let saved_at = SystemTime::now();
681 let now = Instant::now();
682
683 // Lock both caches for consistent snapshot
684 let hot_entries = self.hot_cache.entries.read().unwrap();
685 let cold_entries = self.cold_index.entries.read().unwrap();
686
687 // Convert hot cache entries to serializable format
688 let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries
689 .iter()
690 .map(|(event_id, entry)| {
691 let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs();
692
693 let serializable_entry = SerializableHotCacheEntry {
694 event: entry.event.clone(),
695 pubkey: entry.pubkey,
696 identifier: entry.identifier.clone(),
697 event_type: entry.event_type,
698 reason: entry.reason,
699 cached_at_offset_secs,
700 };
701
702 (*event_id, serializable_entry)
703 })
704 .collect();
705
706 // Convert cold index entries to serializable format
707 let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries
708 .iter()
709 .map(|(event_id, entry)| {
710 let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs();
711
712 let serializable_entry = SerializableColdIndexEntry {
713 pubkey: entry.pubkey,
714 identifier: entry.identifier.clone(),
715 event_type: entry.event_type,
716 reason: entry.reason,
717 rejected_at_offset_secs,
718 };
719
720 (*event_id, serializable_entry)
721 })
722 .collect();
723
724 // Create complete state
725 let state = RejectedCacheState {
726 version: 1,
727 saved_at,
728 hot_cache: SerializableHotCache {
729 expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(),
730 entries: serializable_hot_entries,
731 },
732 cold_index: SerializableColdIndex {
733 expiry_duration_secs: self.cold_index.expiry_duration.as_secs(),
734 entries: serializable_cold_entries,
735 },
736 };
737
738 // Serialize to JSON and write to file
739 let json = serde_json::to_string_pretty(&state)?;
740 std::fs::write(path, json)?;
741
742 Ok(())
743 }
744
745 /// Restore rejected events cache from disk
746 ///
747 /// Loads the serialized state from disk and populates both hot cache and cold index.
748 /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain
749 /// correct expiry behavior. Deletes the state file after successful restore.
750 ///
751 /// # Arguments
752 ///
753 /// * `path` - File path to read the serialized state from
754 ///
755 /// # Returns
756 ///
757 /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails
758 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
759 // Load and parse JSON
760 let json = std::fs::read_to_string(path)?;
761 let state: RejectedCacheState = serde_json::from_str(&json)?;
762
763 // Calculate downtime (how long the relay was offline)
764 let now_system = SystemTime::now();
765 let downtime = now_system
766 .duration_since(state.saved_at)
767 .unwrap_or(Duration::ZERO);
768
769 let now_instant = Instant::now();
770
771 // Lock both caches for restoration
772 let mut hot_entries = self.hot_cache.entries.write().unwrap();
773 let mut cold_entries = self.cold_index.entries.write().unwrap();
774
775 // Restore hot cache entries
776 for (event_id, serializable_entry) in state.hot_cache.entries {
777 // Reconstruct cached_at by extending the offset by downtime
778 // Original offset (how long ago it was cached when saved)
779 let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs);
780 // Total offset including downtime
781 let total_offset = original_offset + downtime;
782
783 // cached_at = now - total_offset
784 let cached_at = now_instant - total_offset;
785
786 let entry = HotCacheEntry {
787 event: serializable_entry.event,
788 pubkey: serializable_entry.pubkey,
789 identifier: serializable_entry.identifier,
790 event_type: serializable_entry.event_type,
791 reason: serializable_entry.reason,
792 cached_at,
793 };
794
795 hot_entries.insert(event_id, entry);
796 }
797
798 // Restore cold index entries
799 for (event_id, serializable_entry) in state.cold_index.entries {
800 // Reconstruct rejected_at by extending the offset by downtime
801 let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs);
802 let total_offset = original_offset + downtime;
803
804 // rejected_at = now - total_offset
805 let rejected_at = now_instant - total_offset;
806
807 let entry = ColdIndexEntry {
808 pubkey: serializable_entry.pubkey,
809 identifier: serializable_entry.identifier,
810 event_type: serializable_entry.event_type,
811 reason: serializable_entry.reason,
812 rejected_at,
813 };
814
815 cold_entries.insert(event_id, entry);
816 }
817
818 // Release locks before deleting file
819 drop(hot_entries);
820 drop(cold_entries);
821
822 // Delete the state file after successful restore
823 std::fs::remove_file(path)?;
824
825 Ok(())
826 }
606} 827}
607 828
608#[cfg(test)] 829#[cfg(test)]
@@ -956,4 +1177,503 @@ mod tests {
956 // Cold index now empty 1177 // Cold index now empty
957 assert_eq!(index.cold_index_len(), 0); 1178 assert_eq!(index.cold_index_len(), 0);
958 } 1179 }
1180
1181 // ========================================================================
1182 // Persistence Serialization Tests
1183 // ========================================================================
1184
1185 #[tokio::test]
1186 async fn test_save_and_restore_hot_cache_roundtrip() {
1187 let temp_dir = tempfile::tempdir().unwrap();
1188 let state_path = temp_dir.path().join("rejected_cache.json");
1189
1190 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1191 let event = create_test_event().await;
1192 let pubkey = event.pubkey;
1193 let identifier = "test-repo".to_string();
1194
1195 // Add event to hot cache
1196 index.add_announcement(
1197 event.clone(),
1198 pubkey,
1199 identifier.clone(),
1200 RejectionReason::DoesNotListService,
1201 );
1202
1203 assert_eq!(index.hot_cache_len(), 1);
1204 assert_eq!(index.cold_index_len(), 1);
1205
1206 // Save to disk
1207 index.save_to_disk(&state_path).unwrap();
1208 assert!(state_path.exists());
1209
1210 // Create new index and restore
1211 let index2 =
1212 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1213 index2.restore_from_disk(&state_path).unwrap();
1214
1215 // Verify state file was deleted after restore
1216 assert!(!state_path.exists());
1217
1218 // Verify hot cache restored
1219 assert_eq!(index2.hot_cache_len(), 1);
1220 assert!(index2.hot_cache.contains(&event.id));
1221
1222 // Verify cold index restored
1223 assert_eq!(index2.cold_index_len(), 1);
1224 assert!(index2.cold_index.contains(&event.id));
1225
1226 // Verify we can retrieve the event
1227 let events = index2
1228 .hot_cache
1229 .get_maintainer_events(&pubkey, &identifier, None);
1230 assert_eq!(events.len(), 1);
1231 assert_eq!(events[0].id, event.id);
1232 }
1233
1234 #[tokio::test]
1235 async fn test_save_and_restore_cold_index_only() {
1236 let temp_dir = tempfile::tempdir().unwrap();
1237 let state_path = temp_dir.path().join("rejected_cache.json");
1238
1239 let index = RejectedEventsIndex::new(
1240 Duration::from_millis(50), // Hot cache expires quickly
1241 Duration::from_secs(604800), // Cold index lasts long
1242 );
1243 let event = create_test_event().await;
1244
1245 // Add event
1246 index.add_announcement(
1247 event.clone(),
1248 event.pubkey,
1249 "test-repo".to_string(),
1250 RejectionReason::MaintainerNotYetValid,
1251 );
1252
1253 // Wait for hot cache to expire
1254 std::thread::sleep(Duration::from_millis(60));
1255 index.cleanup_expired_for_type("announcement");
1256
1257 assert_eq!(index.hot_cache_len(), 0);
1258 assert_eq!(index.cold_index_len(), 1);
1259
1260 // Save to disk
1261 index.save_to_disk(&state_path).unwrap();
1262
1263 // Restore into new index
1264 let index2 =
1265 RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800));
1266 index2.restore_from_disk(&state_path).unwrap();
1267
1268 // Verify only cold index restored (hot cache was empty)
1269 assert_eq!(index2.hot_cache_len(), 0);
1270 assert_eq!(index2.cold_index_len(), 1);
1271 assert!(index2.cold_index.contains(&event.id));
1272 }
1273
1274 #[tokio::test]
1275 async fn test_save_and_restore_both_hot_and_cold() {
1276 let temp_dir = tempfile::tempdir().unwrap();
1277 let state_path = temp_dir.path().join("rejected_cache.json");
1278
1279 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1280 let keys = Keys::generate();
1281
1282 // Create two events
1283 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1284 let event1 = keys.sign_event(unsigned1).await.unwrap();
1285
1286 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1287 let event2 = keys.sign_event(unsigned2).await.unwrap();
1288
1289 // Add both events
1290 index.add_announcement(
1291 event1.clone(),
1292 event1.pubkey,
1293 "repo1".to_string(),
1294 RejectionReason::DoesNotListService,
1295 );
1296
1297 index.add_state(
1298 event2.clone(),
1299 event2.pubkey,
1300 "repo2".to_string(),
1301 RejectionReason::Other,
1302 );
1303
1304 assert_eq!(index.hot_cache_len(), 2);
1305 assert_eq!(index.cold_index_len(), 2);
1306
1307 // Save to disk
1308 index.save_to_disk(&state_path).unwrap();
1309
1310 // Restore into new index
1311 let index2 =
1312 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1313 index2.restore_from_disk(&state_path).unwrap();
1314
1315 // Verify both caches restored
1316 assert_eq!(index2.hot_cache_len(), 2);
1317 assert_eq!(index2.cold_index_len(), 2);
1318 assert!(index2.contains(&event1.id));
1319 assert!(index2.contains(&event2.id));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_save_and_restore_empty_cache() {
1324 let temp_dir = tempfile::tempdir().unwrap();
1325 let state_path = temp_dir.path().join("rejected_cache.json");
1326
1327 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1328
1329 // Save empty cache
1330 index.save_to_disk(&state_path).unwrap();
1331 assert!(state_path.exists());
1332
1333 // Restore into new index
1334 let index2 =
1335 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1336 index2.restore_from_disk(&state_path).unwrap();
1337
1338 // Verify empty state restored
1339 assert_eq!(index2.hot_cache_len(), 0);
1340 assert_eq!(index2.cold_index_len(), 0);
1341 }
1342
1343 #[tokio::test]
1344 async fn test_restore_missing_file() {
1345 let temp_dir = tempfile::tempdir().unwrap();
1346 let state_path = temp_dir.path().join("nonexistent.json");
1347
1348 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1349
1350 // Attempting to restore missing file should return error
1351 let result = index.restore_from_disk(&state_path);
1352 assert!(result.is_err());
1353
1354 // Index should remain empty
1355 assert_eq!(index.hot_cache_len(), 0);
1356 assert_eq!(index.cold_index_len(), 0);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_restore_corrupted_json() {
1361 let temp_dir = tempfile::tempdir().unwrap();
1362 let state_path = temp_dir.path().join("corrupted.json");
1363
1364 // Write corrupted JSON
1365 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
1366
1367 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1368
1369 // Attempting to restore corrupted file should return error
1370 let result = index.restore_from_disk(&state_path);
1371 assert!(result.is_err());
1372
1373 // Index should remain empty
1374 assert_eq!(index.hot_cache_len(), 0);
1375 assert_eq!(index.cold_index_len(), 0);
1376 }
1377
1378 #[tokio::test]
1379 async fn test_file_cleanup_after_successful_restore() {
1380 let temp_dir = tempfile::tempdir().unwrap();
1381 let state_path = temp_dir.path().join("rejected_cache.json");
1382
1383 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1384 let event = create_test_event().await;
1385
1386 index.add_announcement(
1387 event.clone(),
1388 event.pubkey,
1389 "test-repo".to_string(),
1390 RejectionReason::DoesNotListService,
1391 );
1392
1393 // Save to disk
1394 index.save_to_disk(&state_path).unwrap();
1395 assert!(state_path.exists());
1396
1397 // Restore
1398 let index2 =
1399 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1400 index2.restore_from_disk(&state_path).unwrap();
1401
1402 // File should be deleted after successful restore
1403 assert!(!state_path.exists());
1404 }
1405
1406 #[tokio::test]
1407 async fn test_downtime_calculation_preserves_expiry() {
1408 let temp_dir = tempfile::tempdir().unwrap();
1409 let state_path = temp_dir.path().join("rejected_cache.json");
1410
1411 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1412 let event = create_test_event().await;
1413
1414 index.add_announcement(
1415 event.clone(),
1416 event.pubkey,
1417 "test-repo".to_string(),
1418 RejectionReason::DoesNotListService,
1419 );
1420
1421 // Save to disk
1422 index.save_to_disk(&state_path).unwrap();
1423
1424 // Simulate downtime by sleeping
1425 std::thread::sleep(Duration::from_millis(100));
1426
1427 // Restore
1428 let index2 =
1429 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1430 index2.restore_from_disk(&state_path).unwrap();
1431
1432 // Event should still be in both caches (downtime accounted for)
1433 assert_eq!(index2.hot_cache_len(), 1);
1434 assert_eq!(index2.cold_index_len(), 1);
1435 assert!(index2.contains(&event.id));
1436 }
1437
1438 #[tokio::test]
1439 async fn test_entries_expired_during_downtime() {
1440 let temp_dir = tempfile::tempdir().unwrap();
1441 let state_path = temp_dir.path().join("rejected_cache.json");
1442
1443 // Create index with very short expiry
1444 let index = RejectedEventsIndex::new(
1445 Duration::from_millis(100), // Hot cache: 100ms
1446 Duration::from_millis(200), // Cold index: 200ms
1447 );
1448 let event = create_test_event().await;
1449
1450 index.add_announcement(
1451 event.clone(),
1452 event.pubkey,
1453 "test-repo".to_string(),
1454 RejectionReason::DoesNotListService,
1455 );
1456
1457 // Save to disk
1458 index.save_to_disk(&state_path).unwrap();
1459
1460 // Simulate downtime longer than hot cache expiry
1461 std::thread::sleep(Duration::from_millis(150));
1462
1463 // Restore
1464 let index2 =
1465 RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200));
1466 index2.restore_from_disk(&state_path).unwrap();
1467
1468 // Hot cache entry should have expired during downtime
1469 // Cold index should still have it (200ms expiry)
1470 assert_eq!(index2.hot_cache_len(), 1);
1471 assert_eq!(index2.cold_index_len(), 1);
1472
1473 // But when we try to get it, hot cache will see it's expired
1474 let events = index2
1475 .hot_cache
1476 .get_maintainer_events(&event.pubkey, "test-repo", None);
1477 assert_eq!(events.len(), 0); // Expired!
1478
1479 // Cleanup should remove it
1480 let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement");
1481 assert_eq!(hot_expired, 1);
1482 assert_eq!(cold_expired, 0); // Not expired yet
1483 }
1484
1485 #[tokio::test]
1486 async fn test_hot_cache_different_event_types() {
1487 let temp_dir = tempfile::tempdir().unwrap();
1488 let state_path = temp_dir.path().join("rejected_cache.json");
1489
1490 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1491 let keys = Keys::generate();
1492
1493 // Create announcement event
1494 let unsigned_ann =
1495 nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key());
1496 let event_ann = keys.sign_event(unsigned_ann).await.unwrap();
1497
1498 // Create state event
1499 let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key());
1500 let event_state = keys.sign_event(unsigned_state).await.unwrap();
1501
1502 // Add both types
1503 index.add_announcement(
1504 event_ann.clone(),
1505 event_ann.pubkey,
1506 "test-repo".to_string(),
1507 RejectionReason::DoesNotListService,
1508 );
1509
1510 index.add_state(
1511 event_state.clone(),
1512 event_state.pubkey,
1513 "test-repo".to_string(),
1514 RejectionReason::Other,
1515 );
1516
1517 // Save and restore
1518 index.save_to_disk(&state_path).unwrap();
1519 let index2 =
1520 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1521 index2.restore_from_disk(&state_path).unwrap();
1522
1523 // Verify both event types restored
1524 assert_eq!(index2.hot_cache_len(), 2);
1525 assert!(index2.contains(&event_ann.id));
1526 assert!(index2.contains(&event_state.id));
1527
1528 // Verify we can filter by type
1529 let (removed, events) = index2.invalidate_and_get(
1530 &event_ann.pubkey,
1531 "test-repo",
1532 Some(EventType::Announcement),
1533 );
1534 assert_eq!(removed, 1);
1535 assert_eq!(events.len(), 1);
1536 assert_eq!(events[0].id, event_ann.id);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_cold_index_different_rejection_reasons() {
1541 let temp_dir = tempfile::tempdir().unwrap();
1542 let state_path = temp_dir.path().join("rejected_cache.json");
1543
1544 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1545 let keys = Keys::generate();
1546
1547 // Create events with different rejection reasons
1548 let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key());
1549 let event1 = keys.sign_event(unsigned1).await.unwrap();
1550
1551 let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key());
1552 let event2 = keys.sign_event(unsigned2).await.unwrap();
1553
1554 let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key());
1555 let event3 = keys.sign_event(unsigned3).await.unwrap();
1556
1557 // Add with different rejection reasons
1558 index.add_announcement(
1559 event1.clone(),
1560 event1.pubkey,
1561 "repo1".to_string(),
1562 RejectionReason::DoesNotListService,
1563 );
1564
1565 index.add_announcement(
1566 event2.clone(),
1567 event2.pubkey,
1568 "repo2".to_string(),
1569 RejectionReason::MaintainerNotYetValid,
1570 );
1571
1572 index.add_announcement(
1573 event3.clone(),
1574 event3.pubkey,
1575 "repo3".to_string(),
1576 RejectionReason::Other,
1577 );
1578
1579 // Save and restore
1580 index.save_to_disk(&state_path).unwrap();
1581 let index2 =
1582 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1583 index2.restore_from_disk(&state_path).unwrap();
1584
1585 // Verify all entries restored with their rejection reasons
1586 assert_eq!(index2.cold_index_len(), 3);
1587 assert!(index2.contains(&event1.id));
1588 assert!(index2.contains(&event2.id));
1589 assert!(index2.contains(&event3.id));
1590 }
1591
1592 #[tokio::test]
1593 async fn test_multiple_save_restore_cycles() {
1594 let temp_dir = tempfile::tempdir().unwrap();
1595 let state_path = temp_dir.path().join("rejected_cache.json");
1596
1597 // First cycle
1598 let index1 =
1599 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1600 let event1 = create_test_event().await;
1601
1602 index1.add_announcement(
1603 event1.clone(),
1604 event1.pubkey,
1605 "repo1".to_string(),
1606 RejectionReason::DoesNotListService,
1607 );
1608
1609 index1.save_to_disk(&state_path).unwrap();
1610
1611 // Second cycle - restore and add more
1612 let index2 =
1613 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1614 index2.restore_from_disk(&state_path).unwrap();
1615
1616 let event2 = create_test_event().await;
1617 index2.add_announcement(
1618 event2.clone(),
1619 event2.pubkey,
1620 "repo2".to_string(),
1621 RejectionReason::MaintainerNotYetValid,
1622 );
1623
1624 assert_eq!(index2.hot_cache_len(), 2);
1625 index2.save_to_disk(&state_path).unwrap();
1626
1627 // Third cycle - restore again
1628 let index3 =
1629 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
1630 index3.restore_from_disk(&state_path).unwrap();
1631
1632 // Verify both events survived multiple cycles
1633 assert_eq!(index3.hot_cache_len(), 2);
1634 assert!(index3.contains(&event1.id));
1635 assert!(index3.contains(&event2.id));
1636 }
1637
1638 #[tokio::test]
1639 async fn test_restore_preserves_remaining_ttl() {
1640 let temp_dir = tempfile::tempdir().unwrap();
1641 let state_path = temp_dir.path().join("rejected_cache.json");
1642
1643 // Create index with 2 second hot cache expiry
1644 let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1645 let event = create_test_event().await;
1646
1647 index.add_announcement(
1648 event.clone(),
1649 event.pubkey,
1650 "test-repo".to_string(),
1651 RejectionReason::DoesNotListService,
1652 );
1653
1654 // Wait 200ms (small fraction of TTL)
1655 std::thread::sleep(Duration::from_millis(200));
1656
1657 // Save to disk
1658 index.save_to_disk(&state_path).unwrap();
1659
1660 // Immediately restore (minimal downtime)
1661 let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800));
1662 index2.restore_from_disk(&state_path).unwrap();
1663
1664 // Event should still be retrievable (has ~1.8s remaining)
1665 let events = index2
1666 .hot_cache
1667 .get_maintainer_events(&event.pubkey, "test-repo", None);
1668 assert_eq!(events.len(), 1);
1669
1670 // Wait 2 seconds (total 2.2s > 2s expiry)
1671 std::thread::sleep(Duration::from_secs(2));
1672
1673 // Now it should be expired
1674 let events = index2
1675 .hot_cache
1676 .get_maintainer_events(&event.pubkey, "test-repo", None);
1677 assert_eq!(events.len(), 0);
1678 }
959} 1679}
diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs
new file mode 100644
index 0000000..acefb41
--- /dev/null
+++ b/tests/purgatory_persistence.rs
@@ -0,0 +1,755 @@
1//! Purgatory Persistence Integration Tests
2//!
3//! Tests that verify the full purgatory persistence save/restore cycle:
4//! - Purgatory save/restore with state events, PR events, and expired events
5//! - Rejected cache save/restore with hot cache and cold index entries
6//! - Integration with shutdown/startup hooks
7//! - Graceful degradation with missing or corrupted files
8//! - Time adjustment for downtime
9//!
10//! # Test Strategy
11//!
12//! These tests verify end-to-end persistence functionality:
13//! 1. Create purgatory/rejected cache instances with various entries
14//! 2. Save state to disk
15//! 3. Create new instances and restore from disk
16//! 4. Verify all data is restored correctly
17//! 5. Verify system continues to work after restore
18//!
19//! # Running Tests
20//!
21//! ```bash
22//! # Run all purgatory persistence tests
23//! cargo test --test purgatory_persistence
24//!
25//! # Run specific test
26//! cargo test --test purgatory_persistence test_full_purgatory_save_restore_cycle
27//!
28//! # With output for debugging
29//! cargo test --test purgatory_persistence -- --nocapture
30//! ```
31
32mod common;
33
34use ngit_grasp::purgatory::Purgatory;
35use ngit_grasp::sync::rejected_index::{EventType, RejectedEventsIndex, RejectionReason};
36use nostr_sdk::prelude::*;
37use std::time::Duration;
38
39/// Helper to create a test event
40async fn create_test_event(keys: &Keys, content: &str) -> Event {
41 EventBuilder::text_note(content)
42 .sign_with_keys(keys)
43 .unwrap()
44}
45
46/// Helper to create a state event with specific refs
47fn create_state_event_with_refs(
48 keys: &Keys,
49 identifier: &str,
50 refs: &[(&str, &str)],
51) -> Result<Event, Box<dyn std::error::Error>> {
52 let mut tags = vec![Tag::identifier(identifier)];
53
54 // Add ref tags
55 for (ref_name, commit_hash) in refs {
56 tags.push(Tag::custom(
57 TagKind::custom("ref"),
58 vec![ref_name.to_string(), commit_hash.to_string()],
59 ));
60 }
61
62 let event = EventBuilder::new(Kind::from(30618), "")
63 .tags(tags)
64 .sign_with_keys(keys)?;
65
66 Ok(event)
67}
68
69/// Test 1: Full save/restore cycle with state events, PR events, and expired events
70#[tokio::test]
71async fn test_full_purgatory_save_restore_cycle() {
72 let temp_dir = tempfile::tempdir().unwrap();
73 let git_data_path = temp_dir.path().join("git");
74 let state_path = temp_dir.path().join("purgatory.json");
75
76 // Create purgatory instance
77 let purgatory = Purgatory::new(&git_data_path);
78
79 // Create test keys and events
80 let keys1 = Keys::generate();
81 let keys2 = Keys::generate();
82 let keys3 = Keys::generate();
83
84 let state_event1 =
85 create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")]).unwrap();
86 let state_event2 =
87 create_state_event_with_refs(&keys2, "repo2", &[("main", "def456")]).unwrap();
88
89 let pr_event1 = create_test_event(&keys3, "PR 1").await;
90 let pr_event2 = create_test_event(&keys3, "PR 2").await;
91
92 // Add state events to purgatory
93 purgatory.add_state(
94 state_event1.clone(),
95 "repo1".to_string(),
96 keys1.public_key(),
97 );
98 purgatory.add_state(
99 state_event2.clone(),
100 "repo2".to_string(),
101 keys2.public_key(),
102 );
103
104 // Add PR events to purgatory
105 purgatory.add_pr(
106 pr_event1.clone(),
107 pr_event1.id.to_hex(),
108 "commit-abc".to_string(),
109 );
110 purgatory.add_pr(
111 pr_event2.clone(),
112 pr_event2.id.to_hex(),
113 "commit-def".to_string(),
114 );
115
116 // Add a PR placeholder (git-data-first scenario)
117 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-xyz".to_string());
118
119 // Note: We can't directly test expired events without accessing private fields,
120 // so we'll focus on testing state and PR events persistence
121
122 // Verify initial counts
123 let (state_count, pr_count) = purgatory.count();
124 assert_eq!(state_count, 2, "Should have 2 state events");
125 assert_eq!(
126 pr_count, 3,
127 "Should have 3 PR events (2 events + 1 placeholder)"
128 );
129
130 // Save to disk
131 purgatory.save_to_disk(&state_path).unwrap();
132 assert!(state_path.exists(), "State file should exist after save");
133
134 // Create new purgatory instance and restore
135 let purgatory2 = Purgatory::new(&git_data_path);
136 purgatory2.restore_from_disk(&state_path).unwrap();
137
138 // Verify state file was deleted after restore
139 assert!(
140 !state_path.exists(),
141 "State file should be deleted after restore"
142 );
143
144 // Verify all data was restored
145 let (state_count2, pr_count2) = purgatory2.count();
146 assert_eq!(state_count2, 2, "Should have 2 state events after restore");
147 assert_eq!(
148 pr_count2, 3,
149 "Should have 3 PR events after restore (2 events + 1 placeholder)"
150 );
151
152 // Verify specific state events
153 let repo1_states = purgatory2.find_state("repo1");
154 assert_eq!(repo1_states.len(), 1);
155 assert_eq!(repo1_states[0].event.id, state_event1.id);
156
157 let repo2_states = purgatory2.find_state("repo2");
158 assert_eq!(repo2_states.len(), 1);
159 assert_eq!(repo2_states[0].event.id, state_event2.id);
160
161 // Verify PR events
162 let pr1 = purgatory2.find_pr(&pr_event1.id.to_hex());
163 assert!(pr1.is_some());
164 assert_eq!(pr1.unwrap().commit, "commit-abc");
165
166 let pr2 = purgatory2.find_pr(&pr_event2.id.to_hex());
167 assert!(pr2.is_some());
168 assert_eq!(pr2.unwrap().commit, "commit-def");
169
170 // Verify placeholder
171 let placeholder = purgatory2.find_pr_placeholder("placeholder-id");
172 assert_eq!(placeholder, Some("commit-xyz".to_string()));
173
174 // Verify re-queueing works - get all identifiers
175 let identifiers = purgatory2.get_all_identifiers();
176 assert_eq!(identifiers.len(), 2);
177 assert!(identifiers.contains(&"repo1".to_string()));
178 assert!(identifiers.contains(&"repo2".to_string()));
179}
180
181/// Test 2: Rejected cache integration - save/restore hot cache and cold index
182#[tokio::test]
183async fn test_rejected_cache_save_restore_cycle() {
184 let temp_dir = tempfile::tempdir().unwrap();
185 let state_path = temp_dir.path().join("rejected_cache.json");
186
187 // Create rejected events index
188 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
189
190 // Create test events
191 let keys1 = Keys::generate();
192 let keys2 = Keys::generate();
193
194 let event1 = create_test_event(&keys1, "announcement 1").await;
195 let event2 = create_test_event(&keys2, "announcement 2").await;
196 let event3 = create_test_event(&keys1, "state 1").await;
197
198 // Add announcements to rejected cache
199 index.add_announcement(
200 event1.clone(),
201 event1.pubkey,
202 "repo1".to_string(),
203 RejectionReason::DoesNotListService,
204 );
205
206 index.add_announcement(
207 event2.clone(),
208 event2.pubkey,
209 "repo2".to_string(),
210 RejectionReason::MaintainerNotYetValid,
211 );
212
213 // Add state event to rejected cache
214 index.add_state(
215 event3.clone(),
216 event3.pubkey,
217 "repo1".to_string(),
218 RejectionReason::Other,
219 );
220
221 // Verify initial counts
222 assert_eq!(index.hot_cache_len(), 3);
223 assert_eq!(index.cold_index_len(), 3);
224
225 // Save to disk
226 index.save_to_disk(&state_path).unwrap();
227 assert!(state_path.exists());
228
229 // Create new index and restore
230 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
231 index2.restore_from_disk(&state_path).unwrap();
232
233 // Verify state file was deleted
234 assert!(!state_path.exists());
235
236 // Verify all entries restored
237 assert_eq!(index2.hot_cache_len(), 3);
238 assert_eq!(index2.cold_index_len(), 3);
239
240 // Verify specific entries
241 assert!(index2.contains(&event1.id));
242 assert!(index2.contains(&event2.id));
243 assert!(index2.contains(&event3.id));
244
245 // Verify we can invalidate and get events
246 let (removed, hot_events) =
247 index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement));
248 assert_eq!(removed, 1);
249 assert_eq!(hot_events.len(), 1);
250 assert_eq!(hot_events[0].id, event1.id);
251}
252
253/// Test 3: Simulated downtime - verify expiry times are adjusted correctly
254#[tokio::test]
255async fn test_purgatory_downtime_adjustment() {
256 let temp_dir = tempfile::tempdir().unwrap();
257 let git_data_path = temp_dir.path().join("git");
258 let state_path = temp_dir.path().join("purgatory.json");
259
260 let purgatory = Purgatory::new(&git_data_path);
261 let keys = Keys::generate();
262
263 let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
264 .unwrap();
265
266 purgatory.add_state(state_event.clone(), "repo1".to_string(), keys.public_key());
267
268 // Save to disk
269 purgatory.save_to_disk(&state_path).unwrap();
270
271 // Simulate downtime
272 tokio::time::sleep(Duration::from_millis(100)).await;
273
274 // Restore
275 let purgatory2 = Purgatory::new(&git_data_path);
276 purgatory2.restore_from_disk(&state_path).unwrap();
277
278 // Verify event is still there (downtime was accounted for)
279 let (state_count, _) = purgatory2.count();
280 assert_eq!(state_count, 1);
281
282 let repo1_states = purgatory2.find_state("repo1");
283 assert_eq!(repo1_states.len(), 1);
284 assert_eq!(repo1_states[0].event.id, state_event.id);
285
286 // Verify the event hasn't expired yet (expiry time was adjusted)
287 // The event should have ~30 minutes minus the downtime
288 let entry = &repo1_states[0];
289 let remaining = entry
290 .expires_at
291 .saturating_duration_since(std::time::Instant::now());
292 assert!(
293 remaining > Duration::from_secs(1700),
294 "Event should have most of its 30min expiry remaining"
295 );
296}
297
298/// Test 4: Rejected cache downtime adjustment
299#[tokio::test]
300async fn test_rejected_cache_downtime_adjustment() {
301 let temp_dir = tempfile::tempdir().unwrap();
302 let state_path = temp_dir.path().join("rejected_cache.json");
303
304 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
305 let keys = Keys::generate();
306
307 let event = create_test_event(&keys, "test").await;
308
309 index.add_announcement(
310 event.clone(),
311 event.pubkey,
312 "repo1".to_string(),
313 RejectionReason::DoesNotListService,
314 );
315
316 // Save to disk
317 index.save_to_disk(&state_path).unwrap();
318
319 // Simulate downtime
320 tokio::time::sleep(Duration::from_millis(100)).await;
321
322 // Restore
323 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
324 index2.restore_from_disk(&state_path).unwrap();
325
326 // Verify event is still in both caches (downtime was accounted for)
327 assert_eq!(index2.hot_cache_len(), 1);
328 assert_eq!(index2.cold_index_len(), 1);
329 assert!(index2.contains(&event.id));
330}
331
332/// Test 5: File cleanup - verify state files are deleted after successful restore
333#[tokio::test]
334async fn test_purgatory_file_cleanup_after_restore() {
335 let temp_dir = tempfile::tempdir().unwrap();
336 let git_data_path = temp_dir.path().join("git");
337 let state_path = temp_dir.path().join("purgatory.json");
338
339 let purgatory = Purgatory::new(&git_data_path);
340 let keys = Keys::generate();
341
342 let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
343 .unwrap();
344
345 purgatory.add_state(state_event, "repo1".to_string(), keys.public_key());
346
347 // Save to disk
348 purgatory.save_to_disk(&state_path).unwrap();
349 assert!(state_path.exists(), "State file should exist after save");
350
351 // Restore
352 let purgatory2 = Purgatory::new(&git_data_path);
353 purgatory2.restore_from_disk(&state_path).unwrap();
354
355 // Verify file was deleted
356 assert!(
357 !state_path.exists(),
358 "State file should be deleted after successful restore"
359 );
360}
361
362/// Test 6: Rejected cache file cleanup
363#[tokio::test]
364async fn test_rejected_cache_file_cleanup_after_restore() {
365 let temp_dir = tempfile::tempdir().unwrap();
366 let state_path = temp_dir.path().join("rejected_cache.json");
367
368 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
369 let keys = Keys::generate();
370
371 let event = create_test_event(&keys, "test").await;
372
373 index.add_announcement(
374 event,
375 keys.public_key(),
376 "repo1".to_string(),
377 RejectionReason::DoesNotListService,
378 );
379
380 // Save to disk
381 index.save_to_disk(&state_path).unwrap();
382 assert!(state_path.exists());
383
384 // Restore
385 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
386 index2.restore_from_disk(&state_path).unwrap();
387
388 // Verify file was deleted
389 assert!(!state_path.exists());
390}
391
392/// Test 7: Graceful degradation - missing purgatory file
393#[tokio::test]
394async fn test_purgatory_restore_missing_file() {
395 let temp_dir = tempfile::tempdir().unwrap();
396 let git_data_path = temp_dir.path().join("git");
397 let state_path = temp_dir.path().join("nonexistent.json");
398
399 let purgatory = Purgatory::new(&git_data_path);
400
401 // Attempting to restore missing file should return error
402 let result = purgatory.restore_from_disk(&state_path);
403 assert!(result.is_err(), "Should error on missing file");
404
405 // Purgatory should still be usable (empty state)
406 let (state_count, pr_count) = purgatory.count();
407 assert_eq!(state_count, 0);
408 assert_eq!(pr_count, 0);
409
410 // Should be able to add events normally
411 let keys = Keys::generate();
412 let event = create_test_event(&keys, "test").await;
413 purgatory.add_state(event, "repo1".to_string(), keys.public_key());
414
415 let (state_count, _) = purgatory.count();
416 assert_eq!(state_count, 1);
417}
418
419/// Test 8: Graceful degradation - missing rejected cache file
420#[tokio::test]
421async fn test_rejected_cache_restore_missing_file() {
422 let temp_dir = tempfile::tempdir().unwrap();
423 let state_path = temp_dir.path().join("nonexistent.json");
424
425 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
426
427 // Attempting to restore missing file should return error
428 let result = index.restore_from_disk(&state_path);
429 assert!(result.is_err());
430
431 // Index should still be usable (empty state)
432 assert_eq!(index.hot_cache_len(), 0);
433 assert_eq!(index.cold_index_len(), 0);
434
435 // Should be able to add events normally
436 let keys = Keys::generate();
437 let event = create_test_event(&keys, "test").await;
438 index.add_announcement(
439 event,
440 keys.public_key(),
441 "repo1".to_string(),
442 RejectionReason::DoesNotListService,
443 );
444
445 assert_eq!(index.hot_cache_len(), 1);
446 assert_eq!(index.cold_index_len(), 1);
447}
448
449/// Test 9: Graceful degradation - corrupted purgatory file
450#[tokio::test]
451async fn test_purgatory_restore_corrupted_file() {
452 let temp_dir = tempfile::tempdir().unwrap();
453 let git_data_path = temp_dir.path().join("git");
454 let state_path = temp_dir.path().join("corrupted.json");
455
456 // Write corrupted JSON
457 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
458
459 let purgatory = Purgatory::new(&git_data_path);
460
461 // Attempting to restore corrupted file should return error
462 let result = purgatory.restore_from_disk(&state_path);
463 assert!(result.is_err(), "Should error on corrupted file");
464
465 // Purgatory should still be usable
466 let (state_count, pr_count) = purgatory.count();
467 assert_eq!(state_count, 0);
468 assert_eq!(pr_count, 0);
469}
470
471/// Test 10: Graceful degradation - corrupted rejected cache file
472#[tokio::test]
473async fn test_rejected_cache_restore_corrupted_file() {
474 let temp_dir = tempfile::tempdir().unwrap();
475 let state_path = temp_dir.path().join("corrupted.json");
476
477 // Write corrupted JSON
478 std::fs::write(&state_path, "{ invalid json !!!").unwrap();
479
480 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
481
482 // Attempting to restore corrupted file should return error
483 let result = index.restore_from_disk(&state_path);
484 assert!(result.is_err());
485
486 // Index should still be usable
487 assert_eq!(index.hot_cache_len(), 0);
488 assert_eq!(index.cold_index_len(), 0);
489}
490
491/// Test 11: Empty purgatory save/restore
492#[tokio::test]
493async fn test_empty_purgatory_save_restore() {
494 let temp_dir = tempfile::tempdir().unwrap();
495 let git_data_path = temp_dir.path().join("git");
496 let state_path = temp_dir.path().join("purgatory.json");
497
498 let purgatory = Purgatory::new(&git_data_path);
499
500 // Save empty purgatory
501 purgatory.save_to_disk(&state_path).unwrap();
502 assert!(state_path.exists());
503
504 // Restore
505 let purgatory2 = Purgatory::new(&git_data_path);
506 purgatory2.restore_from_disk(&state_path).unwrap();
507
508 // Verify empty state
509 let (state_count, pr_count) = purgatory2.count();
510 assert_eq!(state_count, 0);
511 assert_eq!(pr_count, 0);
512 assert_eq!(purgatory2.expired_count(), 0);
513}
514
515/// Test 12: Empty rejected cache save/restore
516#[tokio::test]
517async fn test_empty_rejected_cache_save_restore() {
518 let temp_dir = tempfile::tempdir().unwrap();
519 let state_path = temp_dir.path().join("rejected_cache.json");
520
521 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
522
523 // Save empty cache
524 index.save_to_disk(&state_path).unwrap();
525 assert!(state_path.exists());
526
527 // Restore
528 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
529 index2.restore_from_disk(&state_path).unwrap();
530
531 // Verify empty state
532 assert_eq!(index2.hot_cache_len(), 0);
533 assert_eq!(index2.cold_index_len(), 0);
534}
535
536/// Test 13: Multiple state events for same identifier
537#[tokio::test]
538async fn test_purgatory_multiple_state_events_same_identifier() {
539 let temp_dir = tempfile::tempdir().unwrap();
540 let git_data_path = temp_dir.path().join("git");
541 let state_path = temp_dir.path().join("purgatory.json");
542
543 let purgatory = Purgatory::new(&git_data_path);
544
545 // Create multiple state events for same identifier (different maintainers)
546 let keys1 = Keys::generate();
547 let keys2 = Keys::generate();
548
549 let event1 = create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")])
550 .unwrap();
551 let event2 = create_state_event_with_refs(&keys2, "repo1", &[("main", "def456")])
552 .unwrap();
553
554 purgatory.add_state(event1.clone(), "repo1".to_string(), keys1.public_key());
555 purgatory.add_state(event2.clone(), "repo1".to_string(), keys2.public_key());
556
557 // Save and restore
558 purgatory.save_to_disk(&state_path).unwrap();
559
560 let purgatory2 = Purgatory::new(&git_data_path);
561 purgatory2.restore_from_disk(&state_path).unwrap();
562
563 // Verify both events restored
564 let repo1_states = purgatory2.find_state("repo1");
565 assert_eq!(repo1_states.len(), 2);
566
567 let event_ids: Vec<_> = repo1_states.iter().map(|e| e.event.id).collect();
568 assert!(event_ids.contains(&event1.id));
569 assert!(event_ids.contains(&event2.id));
570}
571
572/// Test 14: Verify system continues to work after restore
573#[tokio::test]
574async fn test_purgatory_continues_working_after_restore() {
575 let temp_dir = tempfile::tempdir().unwrap();
576 let git_data_path = temp_dir.path().join("git");
577 let state_path = temp_dir.path().join("purgatory.json");
578
579 let purgatory = Purgatory::new(&git_data_path);
580 let keys = Keys::generate();
581
582 let event1 = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
583 .unwrap();
584
585 purgatory.add_state(event1.clone(), "repo1".to_string(), keys.public_key());
586
587 // Save and restore
588 purgatory.save_to_disk(&state_path).unwrap();
589
590 let purgatory2 = Purgatory::new(&git_data_path);
591 purgatory2.restore_from_disk(&state_path).unwrap();
592
593 // Add new events after restore
594 let event2 = create_state_event_with_refs(&keys, "repo2", &[("main", "xyz789")])
595 .unwrap();
596
597 purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key());
598
599 // Verify both old and new events work
600 let (state_count, _) = purgatory2.count();
601 assert_eq!(state_count, 2);
602
603 let repo1_states = purgatory2.find_state("repo1");
604 assert_eq!(repo1_states.len(), 1);
605 assert_eq!(repo1_states[0].event.id, event1.id);
606
607 let repo2_states = purgatory2.find_state("repo2");
608 assert_eq!(repo2_states.len(), 1);
609 assert_eq!(repo2_states[0].event.id, event2.id);
610
611 // Verify cleanup still works
612 let (state_removed, pr_removed) = purgatory2.cleanup();
613 // Nothing should be expired yet
614 assert_eq!(state_removed, 0);
615 assert_eq!(pr_removed, 0);
616}
617
618/// Test 15: Verify rejected cache continues working after restore
619#[tokio::test]
620async fn test_rejected_cache_continues_working_after_restore() {
621 let temp_dir = tempfile::tempdir().unwrap();
622 let state_path = temp_dir.path().join("rejected_cache.json");
623
624 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
625 let keys = Keys::generate();
626
627 let event1 = create_test_event(&keys, "event1").await;
628
629 index.add_announcement(
630 event1.clone(),
631 event1.pubkey,
632 "repo1".to_string(),
633 RejectionReason::DoesNotListService,
634 );
635
636 // Save and restore
637 index.save_to_disk(&state_path).unwrap();
638
639 let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
640 index2.restore_from_disk(&state_path).unwrap();
641
642 // Add new events after restore
643 let event2 = create_test_event(&keys, "event2").await;
644
645 index2.add_announcement(
646 event2.clone(),
647 event2.pubkey,
648 "repo2".to_string(),
649 RejectionReason::MaintainerNotYetValid,
650 );
651
652 // Verify both old and new events work
653 assert_eq!(index2.hot_cache_len(), 2);
654 assert_eq!(index2.cold_index_len(), 2);
655 assert!(index2.contains(&event1.id));
656 assert!(index2.contains(&event2.id));
657
658 // Verify invalidation still works
659 let (removed, hot_events) =
660 index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement));
661 assert_eq!(removed, 1);
662 assert_eq!(hot_events.len(), 1);
663 assert_eq!(hot_events[0].id, event1.id);
664}
665
666/// Test 16: Entries that expired during downtime are properly handled
667#[tokio::test]
668async fn test_purgatory_entries_expired_during_downtime() {
669 let temp_dir = tempfile::tempdir().unwrap();
670 let git_data_path = temp_dir.path().join("git");
671 let state_path = temp_dir.path().join("purgatory.json");
672
673 let purgatory = Purgatory::new(&git_data_path);
674 let keys = Keys::generate();
675
676 let event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")])
677 .unwrap();
678
679 purgatory.add_state(event.clone(), "repo1".to_string(), keys.public_key());
680
681 // Save to disk
682 purgatory.save_to_disk(&state_path).unwrap();
683
684 // Simulate very long downtime (longer than the 30min default expiry)
685 // Note: We can't manually set expiry without accessing private fields,
686 // so this test verifies that the system handles already-expired entries gracefully
687 // In a real scenario, if downtime > 30 minutes, entries would be expired on restore
688
689 // For this test, we'll just verify the restore works and cleanup can be called
690 let purgatory2 = Purgatory::new(&git_data_path);
691 purgatory2.restore_from_disk(&state_path).unwrap();
692
693 // Event should be restored
694 let (state_count, _) = purgatory2.count();
695 assert_eq!(state_count, 1);
696
697 // Cleanup should work (even if nothing is expired yet)
698 let (state_removed, _) = purgatory2.cleanup();
699 // Nothing expired yet since we didn't wait 30 minutes
700 assert_eq!(state_removed, 0);
701
702 let (state_count, _) = purgatory2.count();
703 assert_eq!(state_count, 1);
704}
705
706/// Test 17: Rejected cache entries that expired during downtime
707#[tokio::test]
708async fn test_rejected_cache_entries_expired_during_downtime() {
709 let temp_dir = tempfile::tempdir().unwrap();
710 let state_path = temp_dir.path().join("rejected_cache.json");
711
712 // Create index with very short expiry
713 let index = RejectedEventsIndex::new(
714 Duration::from_millis(50), // Hot cache: 50ms
715 Duration::from_millis(100), // Cold index: 100ms
716 );
717 let keys = Keys::generate();
718
719 let event = create_test_event(&keys, "test").await;
720
721 index.add_announcement(
722 event.clone(),
723 event.pubkey,
724 "repo1".to_string(),
725 RejectionReason::DoesNotListService,
726 );
727
728 // Save to disk
729 index.save_to_disk(&state_path).unwrap();
730
731 // Simulate downtime longer than hot cache expiry
732 tokio::time::sleep(Duration::from_millis(75)).await;
733
734 // Restore
735 let index2 = RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_millis(100));
736 index2.restore_from_disk(&state_path).unwrap();
737
738 // Both should be restored initially
739 assert_eq!(index2.hot_cache_len(), 1);
740 assert_eq!(index2.cold_index_len(), 1);
741
742 // Note: We can't directly access hot_cache.get_maintainer_events (private method)
743 // But we can verify the entry is there via contains() and test cleanup
744
745 // Verify entry is still tracked
746 assert!(index2.contains(&event.id));
747
748 // Cleanup should remove expired hot cache entry
749 let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement");
750 assert_eq!(hot_expired, 1);
751 assert_eq!(cold_expired, 0); // Cold index still valid
752
753 assert_eq!(index2.hot_cache_len(), 0);
754 assert_eq!(index2.cold_index_len(), 1);
755}