upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:18:47 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:18:47 +0000
commitb6c70f765dd02fb0297888d671e455df33d6fcb4 (patch)
tree6458d711ef9a7776c31bdb108c39930cbcc3bd68
parent7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff)
feat(purgatory): add persistence to survive relay restarts
Implement save/restore functionality for purgatory state to prevent event loss during relay restarts. Events in purgatory (state events, PR events, and expired events) are now saved to disk on graceful shutdown and restored on startup. Key features: - Serialize purgatory state to JSON (purgatory-state.json) - Time conversion helpers for Instant <-> Duration serialization - Restore with downtime adjustment (preserves remaining TTL) - Graceful degradation (missing/corrupted files don't crash) - File cleanup after successful restore - get_all_identifiers() for re-queueing after restore Files: - src/purgatory/persistence.rs: Time conversion helpers - src/purgatory/types.rs: Serialization derives - src/purgatory/mod.rs: save_to_disk/restore_from_disk methods Tests: 15 unit tests covering serialization, downtime, edge cases
-rw-r--r--src/purgatory/mod.rs925
-rw-r--r--src/purgatory/persistence.rs198
-rw-r--r--src/purgatory/types.rs20
3 files changed, 1139 insertions, 4 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 20df19b..47798a6 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -12,6 +12,7 @@
12//! - **Separate stores**: State events and PR events use different indexing strategies 12//! - **Separate stores**: State events and PR events use different indexing strategies
13 13
14mod helpers; 14mod helpers;
15pub mod persistence;
15pub mod sync; 16pub mod sync;
16mod types; 17mod types;
17 18
@@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
20 21
21use dashmap::DashMap; 22use dashmap::DashMap;
22use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
23use std::collections::HashSet; 26use std::collections::HashSet;
24use std::path::PathBuf; 27use std::path::{Path, PathBuf};
25use std::sync::Arc; 28use std::sync::Arc;
26use std::time::{Duration, Instant}; 29use std::time::{Duration, Instant, SystemTime};
27 30
28pub use sync::SyncQueueEntry; 31pub use sync::SyncQueueEntry;
29 32
@@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180);
38/// Used for batching burst arrivals during negentropy sync. 41/// Used for batching burst arrivals during negentropy sync.
39const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); 42const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500);
40 43
44/// Serializable wrapper for `StatePurgatoryEntry` with time offsets.
45///
46/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
47/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49struct SerializableStatePurgatoryEntry {
50 /// The nostr state event (kind 30618) awaiting git data
51 event: Event,
52 /// The repository identifier from the event's 'd' tag
53 identifier: String,
54 /// Event author pubkey
55 author: PublicKey,
56 /// Duration offset from saved_at for created_at
57 created_at_offset_secs: u64,
58 /// Duration offset from saved_at for expires_at
59 expires_at_offset_secs: u64,
60}
61
62/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
63///
64/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
65/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67struct SerializablePrPurgatoryEntry {
68 /// The nostr PR event, if received (None = git data arrived first)
69 event: Option<Event>,
70 /// The expected commit SHA from 'c' tag (if event exists)
71 /// or the actual commit pushed (if git arrived first)
72 commit: String,
73 /// Duration offset from saved_at for created_at
74 created_at_offset_secs: u64,
75 /// Duration offset from saved_at for expires_at
76 expires_at_offset_secs: u64,
77}
78
79/// Serializable purgatory state for disk persistence.
80///
81/// Contains all purgatory data needed to restore state across restarts:
82/// - State events (indexed by identifier)
83/// - PR events (indexed by event ID)
84/// - Expired events (to prevent re-sync loops)
85/// - Version number for future compatibility
86/// - Saved timestamp for downtime calculation
87#[derive(Debug, Clone, Serialize, Deserialize)]
88struct PurgatoryState {
89 /// Version number for state format (currently 1)
90 version: u32,
91 /// When this state was saved to disk
92 saved_at: SystemTime,
93 /// State events indexed by repository identifier
94 state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>,
95 /// PR events indexed by event ID (hex string)
96 pr_events: HashMap<String, SerializablePrPurgatoryEntry>,
97 /// Expired event IDs with their expiry timestamps
98 expired_events: HashMap<String, SystemTime>,
99}
100
41/// Main purgatory structure holding events awaiting git data. 101/// Main purgatory structure holding events awaiting git data.
42/// 102///
43/// Provides thread-safe concurrent access to two separate stores: 103/// Provides thread-safe concurrent access to two separate stores:
@@ -667,6 +727,260 @@ impl Purgatory {
667 pub fn sync_queue_size(&self) -> usize { 727 pub fn sync_queue_size(&self) -> usize {
668 self.sync_queue.len() 728 self.sync_queue.len()
669 } 729 }
730
731 /// Get all repository identifiers currently in purgatory.
732 ///
733 /// Returns a list of all unique repository identifiers that have state events
734 /// in purgatory. This is useful for re-queueing repositories after restore.
735 ///
736 /// # Returns
737 /// Vector of repository identifiers (e.g., "owner/repo")
738 ///
739 /// # Example
740 /// ```no_run
741 /// use ngit_grasp::purgatory::Purgatory;
742 /// use std::path::PathBuf;
743 ///
744 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
745 /// let identifiers = purgatory.get_all_identifiers();
746 /// for id in identifiers {
747 /// println!("Repository in purgatory: {}", id);
748 /// }
749 /// ```
750 pub fn get_all_identifiers(&self) -> Vec<String> {
751 self.state_events
752 .iter()
753 .map(|entry| entry.key().clone())
754 .collect()
755 }
756
757 /// Save purgatory state to disk.
758 ///
759 /// Serializes the current purgatory state (state_events, pr_events, expired_events)
760 /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are
761 /// converted to duration offsets from the current `SystemTime` for persistence.
762 ///
763 /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are
764 /// restored from disk.
765 ///
766 /// # Arguments
767 /// * `path` - Path to save the state file
768 ///
769 /// # Returns
770 /// Ok(()) on success, Err on failure
771 ///
772 /// # Example
773 /// ```no_run
774 /// use ngit_grasp::purgatory::Purgatory;
775 /// use std::path::PathBuf;
776 ///
777 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
778 /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap();
779 /// ```
780 pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
781 let saved_at = SystemTime::now();
782 let now_instant = Instant::now();
783
784 // Convert state_events to serializable format
785 let mut state_events = HashMap::new();
786 for entry in self.state_events.iter() {
787 let identifier = entry.key().clone();
788 let entries: Vec<SerializableStatePurgatoryEntry> = entry
789 .value()
790 .iter()
791 .map(|e| {
792 let created_offset =
793 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
794 let expires_offset =
795 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
796
797 SerializableStatePurgatoryEntry {
798 event: e.event.clone(),
799 identifier: e.identifier.clone(),
800 author: e.author,
801 created_at_offset_secs: created_offset.as_secs(),
802 expires_at_offset_secs: expires_offset.as_secs(),
803 }
804 })
805 .collect();
806 state_events.insert(identifier, entries);
807 }
808
809 // Convert pr_events to serializable format
810 let mut pr_events = HashMap::new();
811 for entry in self.pr_events.iter() {
812 let event_id = entry.key().clone();
813 let e = entry.value();
814
815 let created_offset =
816 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
817 let expires_offset =
818 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
819
820 let serializable = SerializablePrPurgatoryEntry {
821 event: e.event.clone(),
822 commit: e.commit.clone(),
823 created_at_offset_secs: created_offset.as_secs(),
824 expires_at_offset_secs: expires_offset.as_secs(),
825 };
826 pr_events.insert(event_id, serializable);
827 }
828
829 // Convert expired_events to serializable format
830 // We use SystemTime instead of Instant offsets for expired events since
831 // we don't need high precision for cleanup timing
832 let mut expired_events = HashMap::new();
833 for entry in self.expired_events.iter() {
834 let event_id = entry.key().to_hex();
835 // Convert Instant to SystemTime (approximate)
836 let expired_at_instant = *entry.value();
837 let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant);
838 let expired_at_system = saved_at - elapsed_since_expire;
839 expired_events.insert(event_id, expired_at_system);
840 }
841
842 // Create state structure
843 let state = PurgatoryState {
844 version: 1,
845 saved_at,
846 state_events,
847 pr_events,
848 expired_events,
849 };
850
851 // Serialize to JSON and write to file
852 let json = serde_json::to_string_pretty(&state)?;
853 std::fs::write(path, json)?;
854
855 tracing::info!(
856 path = %path.display(),
857 state_events = state.state_events.len(),
858 pr_events = state.pr_events.len(),
859 expired_events = state.expired_events.len(),
860 "Saved purgatory state to disk"
861 );
862
863 Ok(())
864 }
865
866 /// Restore purgatory state from disk.
867 ///
868 /// Loads a previously saved purgatory state from the specified path and populates
869 /// the current purgatory instance. Adjusts time-based fields to account for downtime
870 /// between save and restore.
871 ///
872 /// After successful restore, the state file is deleted to prevent accidental
873 /// double-restore.
874 ///
875 /// # Arguments
876 /// * `path` - Path to the saved state file
877 ///
878 /// # Returns
879 /// Ok(()) on success, Err if file doesn't exist or is corrupted
880 ///
881 /// # Example
882 /// ```no_run
883 /// use ngit_grasp::purgatory::Purgatory;
884 /// use std::path::PathBuf;
885 ///
886 /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git"));
887 /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) {
888 /// Ok(()) => println!("State restored successfully"),
889 /// Err(e) => eprintln!("Failed to restore state: {}", e),
890 /// }
891 /// ```
892 pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
893 // Read and parse state file
894 let json = std::fs::read_to_string(path)?;
895 let state: PurgatoryState = serde_json::from_str(&json)?;
896
897 // Verify version
898 if state.version != 1 {
899 return Err(format!("Unsupported state version: {}", state.version).into());
900 }
901
902 let now_instant = Instant::now();
903
904 // Restore state_events
905 for (identifier, entries) in state.state_events {
906 let restored_entries: Vec<StatePurgatoryEntry> = entries
907 .into_iter()
908 .map(|e| {
909 let created_at = persistence::offset_to_instant(
910 Duration::from_secs(e.created_at_offset_secs),
911 state.saved_at,
912 now_instant,
913 );
914 let expires_at = persistence::offset_to_instant(
915 Duration::from_secs(e.expires_at_offset_secs),
916 state.saved_at,
917 now_instant,
918 );
919
920 StatePurgatoryEntry {
921 event: e.event,
922 identifier: e.identifier,
923 author: e.author,
924 created_at,
925 expires_at,
926 }
927 })
928 .collect();
929
930 self.state_events.insert(identifier, restored_entries);
931 }
932
933 // Restore pr_events
934 for (event_id, e) in state.pr_events {
935 let created_at = persistence::offset_to_instant(
936 Duration::from_secs(e.created_at_offset_secs),
937 state.saved_at,
938 now_instant,
939 );
940 let expires_at = persistence::offset_to_instant(
941 Duration::from_secs(e.expires_at_offset_secs),
942 state.saved_at,
943 now_instant,
944 );
945
946 let entry = PrPurgatoryEntry {
947 event: e.event,
948 commit: e.commit,
949 created_at,
950 expires_at,
951 };
952
953 self.pr_events.insert(event_id, entry);
954 }
955
956 // Restore expired_events
957 for (event_id_hex, expired_at_system) in state.expired_events {
958 if let Ok(event_id) = EventId::from_hex(&event_id_hex) {
959 // Convert SystemTime back to Instant (approximate)
960 let elapsed_since_expire = SystemTime::now()
961 .duration_since(expired_at_system)
962 .unwrap_or(Duration::ZERO);
963 let expired_at_instant = now_instant - elapsed_since_expire;
964
965 self.expired_events.insert(event_id, expired_at_instant);
966 }
967 }
968
969 tracing::info!(
970 path = %path.display(),
971 state_events = self.state_events.len(),
972 pr_events = self.pr_events.len(),
973 expired_events = self.expired_events.len(),
974 saved_at = ?state.saved_at,
975 "Restored purgatory state from disk"
976 );
977
978 // Delete state file after successful restore
979 std::fs::remove_file(path)?;
980 tracing::debug!(path = %path.display(), "Deleted state file after restore");
981
982 Ok(())
983 }
670} 984}
671 985
672#[cfg(test)] 986#[cfg(test)]
@@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() {
1249 // - Skip the expired check for user-submitted events 1563 // - Skip the expired check for user-submitted events
1250 // - Allow the event to be re-added to purgatory or accepted if git data now exists 1564 // - Allow the event to be re-added to purgatory or accepted if git data now exists
1251} 1565}
1566
1567// ============================================================================
1568// Persistence Serialization Tests
1569// ============================================================================
1570
1571#[tokio::test]
1572async fn test_save_and_restore_state_events() {
1573 use tempfile::tempdir;
1574
1575 let temp_dir = tempdir().unwrap();
1576 let state_file = temp_dir.path().join("purgatory_state.json");
1577
1578 let purgatory = Purgatory::new(PathBuf::new());
1579 let keys = Keys::generate();
1580
1581 // Add multiple state events for the same identifier
1582 let event1 = EventBuilder::text_note("state event 1")
1583 .sign_with_keys(&keys)
1584 .unwrap();
1585 let event2 = EventBuilder::text_note("state event 2")
1586 .sign_with_keys(&keys)
1587 .unwrap();
1588
1589 let event1_id = event1.id;
1590 let event2_id = event2.id;
1591
1592 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key());
1593 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key());
1594
1595 // Save to disk
1596 purgatory.save_to_disk(&state_file).unwrap();
1597
1598 // Verify file exists
1599 assert!(state_file.exists());
1600
1601 // Create new purgatory and restore
1602 let purgatory2 = Purgatory::new(PathBuf::new());
1603 purgatory2.restore_from_disk(&state_file).unwrap();
1604
1605 // Verify file was deleted after restore
1606 assert!(!state_file.exists());
1607
1608 // Verify state events were restored
1609 let (state_count, _) = purgatory2.count();
1610 assert_eq!(state_count, 2);
1611
1612 let restored_entries = purgatory2.find_state("test-repo");
1613 assert_eq!(restored_entries.len(), 2);
1614
1615 // Verify event IDs match
1616 let restored_ids: Vec<EventId> = restored_entries.iter().map(|e| e.event.id).collect();
1617 assert!(restored_ids.contains(&event1_id));
1618 assert!(restored_ids.contains(&event2_id));
1619
1620 // Verify identifiers and authors match
1621 for entry in &restored_entries {
1622 assert_eq!(entry.identifier, "test-repo");
1623 assert_eq!(entry.author, keys.public_key());
1624 }
1625}
1626
1627#[tokio::test]
1628async fn test_save_and_restore_pr_events() {
1629 use nostr_sdk::{Kind, Tag, TagKind};
1630 use tempfile::tempdir;
1631
1632 let temp_dir = tempdir().unwrap();
1633 let state_file = temp_dir.path().join("purgatory_state.json");
1634
1635 let purgatory = Purgatory::new(PathBuf::new());
1636 let keys = Keys::generate();
1637
1638 // Add PR event with actual event
1639 let tags = vec![Tag::custom(
1640 TagKind::Custom("a".into()),
1641 vec!["30617:abc123:test-repo".to_string()],
1642 )];
1643
1644 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
1645 .tags(tags)
1646 .sign_with_keys(&keys)
1647 .unwrap();
1648
1649 let pr_event_id = pr_event.id;
1650
1651 purgatory.add_pr(
1652 pr_event.clone(),
1653 "pr-event-id".to_string(),
1654 "commit-abc".to_string(),
1655 );
1656
1657 // Save to disk
1658 purgatory.save_to_disk(&state_file).unwrap();
1659
1660 // Create new purgatory and restore
1661 let purgatory2 = Purgatory::new(PathBuf::new());
1662 purgatory2.restore_from_disk(&state_file).unwrap();
1663
1664 // Verify PR event was restored
1665 let (_, pr_count) = purgatory2.count();
1666 assert_eq!(pr_count, 1);
1667
1668 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap();
1669 assert!(restored_entry.event.is_some());
1670 assert_eq!(restored_entry.event.unwrap().id, pr_event_id);
1671 assert_eq!(restored_entry.commit, "commit-abc");
1672}
1673
1674#[tokio::test]
1675async fn test_save_and_restore_pr_placeholders() {
1676 use tempfile::tempdir;
1677
1678 let temp_dir = tempdir().unwrap();
1679 let state_file = temp_dir.path().join("purgatory_state.json");
1680
1681 let purgatory = Purgatory::new(PathBuf::new());
1682
1683 // Add PR placeholder (git data arrived first)
1684 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string());
1685
1686 // Save to disk
1687 purgatory.save_to_disk(&state_file).unwrap();
1688
1689 // Create new purgatory and restore
1690 let purgatory2 = Purgatory::new(PathBuf::new());
1691 purgatory2.restore_from_disk(&state_file).unwrap();
1692
1693 // Verify placeholder was restored
1694 let (_, pr_count) = purgatory2.count();
1695 assert_eq!(pr_count, 1);
1696
1697 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap();
1698 assert!(restored_entry.event.is_none()); // Still a placeholder
1699 assert_eq!(restored_entry.commit, "commit-def");
1700
1701 // Verify it's findable as a placeholder
1702 assert_eq!(
1703 purgatory2.find_pr_placeholder("placeholder-id"),
1704 Some("commit-def".to_string())
1705 );
1706}
1707
1708#[tokio::test]
1709async fn test_save_and_restore_expired_events() {
1710 use tempfile::tempdir;
1711
1712 let temp_dir = tempdir().unwrap();
1713 let state_file = temp_dir.path().join("purgatory_state.json");
1714
1715 let purgatory = Purgatory::new(PathBuf::new());
1716 let keys = Keys::generate();
1717
1718 let event = EventBuilder::text_note("test")
1719 .sign_with_keys(&keys)
1720 .unwrap();
1721 let event_id = event.id;
1722
1723 // Add and expire event
1724 purgatory.add_state(event, "repo".to_string(), keys.public_key());
1725 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1726 for entry in entries.iter_mut() {
1727 entry.expires_at = Instant::now() - Duration::from_secs(1);
1728 }
1729 }
1730 purgatory.cleanup();
1731
1732 // Verify event is marked as expired
1733 assert!(purgatory.is_expired(&event_id));
1734 assert_eq!(purgatory.expired_count(), 1);
1735
1736 // Save to disk
1737 purgatory.save_to_disk(&state_file).unwrap();
1738
1739 // Create new purgatory and restore
1740 let purgatory2 = Purgatory::new(PathBuf::new());
1741 purgatory2.restore_from_disk(&state_file).unwrap();
1742
1743 // Verify expired event was restored
1744 assert!(purgatory2.is_expired(&event_id));
1745 assert_eq!(purgatory2.expired_count(), 1);
1746
1747 // Verify it's included in event_ids()
1748 let ids = purgatory2.event_ids();
1749 assert!(ids.contains(&event_id));
1750}
1751
1752#[tokio::test]
1753async fn test_save_and_restore_empty_purgatory() {
1754 use tempfile::tempdir;
1755
1756 let temp_dir = tempdir().unwrap();
1757 let state_file = temp_dir.path().join("purgatory_state.json");
1758
1759 let purgatory = Purgatory::new(PathBuf::new());
1760
1761 // Save empty purgatory
1762 purgatory.save_to_disk(&state_file).unwrap();
1763
1764 // Verify file exists
1765 assert!(state_file.exists());
1766
1767 // Create new purgatory and restore
1768 let purgatory2 = Purgatory::new(PathBuf::new());
1769 purgatory2.restore_from_disk(&state_file).unwrap();
1770
1771 // Verify purgatory is still empty
1772 let (state_count, pr_count) = purgatory2.count();
1773 assert_eq!(state_count, 0);
1774 assert_eq!(pr_count, 0);
1775 assert_eq!(purgatory2.expired_count(), 0);
1776}
1777
1778#[tokio::test]
1779async fn test_restore_missing_file() {
1780 use tempfile::tempdir;
1781
1782 let temp_dir = tempdir().unwrap();
1783 let state_file = temp_dir.path().join("nonexistent.json");
1784
1785 let purgatory = Purgatory::new(PathBuf::new());
1786
1787 // Attempting to restore from missing file should error
1788 let result = purgatory.restore_from_disk(&state_file);
1789 assert!(result.is_err());
1790
1791 // Purgatory should remain empty
1792 let (state_count, pr_count) = purgatory.count();
1793 assert_eq!(state_count, 0);
1794 assert_eq!(pr_count, 0);
1795}
1796
1797#[tokio::test]
1798async fn test_restore_corrupted_json() {
1799 use tempfile::tempdir;
1800
1801 let temp_dir = tempdir().unwrap();
1802 let state_file = temp_dir.path().join("corrupted.json");
1803
1804 // Write invalid JSON
1805 std::fs::write(&state_file, "{ this is not valid json }").unwrap();
1806
1807 let purgatory = Purgatory::new(PathBuf::new());
1808
1809 // Attempting to restore corrupted file should error
1810 let result = purgatory.restore_from_disk(&state_file);
1811 assert!(result.is_err());
1812
1813 // Purgatory should remain empty
1814 let (state_count, pr_count) = purgatory.count();
1815 assert_eq!(state_count, 0);
1816 assert_eq!(pr_count, 0);
1817}
1818
1819#[tokio::test]
1820async fn test_restore_unsupported_version() {
1821 use tempfile::tempdir;
1822
1823 let temp_dir = tempdir().unwrap();
1824 let state_file = temp_dir.path().join("wrong_version.json");
1825
1826 // Write state with unsupported version
1827 let state = r#"{
1828 "version": 999,
1829 "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0},
1830 "state_events": {},
1831 "pr_events": {},
1832 "expired_events": {}
1833 }"#;
1834 std::fs::write(&state_file, state).unwrap();
1835
1836 let purgatory = Purgatory::new(PathBuf::new());
1837
1838 // Attempting to restore unsupported version should error
1839 let result = purgatory.restore_from_disk(&state_file);
1840 assert!(result.is_err());
1841 assert!(result
1842 .unwrap_err()
1843 .to_string()
1844 .contains("Unsupported state version"));
1845}
1846
1847#[tokio::test]
1848async fn test_downtime_calculation() {
1849 use tempfile::tempdir;
1850 use tokio::time::sleep;
1851
1852 let temp_dir = tempdir().unwrap();
1853 let state_file = temp_dir.path().join("purgatory_state.json");
1854
1855 let purgatory = Purgatory::new(PathBuf::new());
1856 let keys = Keys::generate();
1857
1858 // Add state event
1859 let event = EventBuilder::text_note("test")
1860 .sign_with_keys(&keys)
1861 .unwrap();
1862
1863 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1864
1865 // Get original expiry time
1866 let original_entries = purgatory.find_state("repo");
1867 let original_entry = &original_entries[0];
1868 let original_expires_at = original_entry.expires_at;
1869 let original_remaining = original_expires_at.saturating_duration_since(Instant::now());
1870
1871 // Save to disk
1872 purgatory.save_to_disk(&state_file).unwrap();
1873
1874 // Simulate downtime (100ms)
1875 sleep(Duration::from_millis(100)).await;
1876
1877 // Create new purgatory and restore
1878 let purgatory2 = Purgatory::new(PathBuf::new());
1879 purgatory2.restore_from_disk(&state_file).unwrap();
1880
1881 // Get restored expiry time
1882 let restored_entries = purgatory2.find_state("repo");
1883 let restored_entry = &restored_entries[0];
1884 let restored_expires_at = restored_entry.expires_at;
1885 let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now());
1886
1887 // Remaining time should be approximately the same (accounting for downtime)
1888 // Allow 2000ms tolerance for test execution time and sleep duration
1889 let diff = if restored_remaining > original_remaining {
1890 restored_remaining.as_millis() - original_remaining.as_millis()
1891 } else {
1892 original_remaining.as_millis() - restored_remaining.as_millis()
1893 };
1894
1895 assert!(
1896 diff < 2000,
1897 "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms",
1898 original_remaining.as_millis(),
1899 restored_remaining.as_millis(),
1900 diff
1901 );
1902}
1903
1904#[tokio::test]
1905async fn test_expiry_times_preserved() {
1906 use tempfile::tempdir;
1907
1908 let temp_dir = tempdir().unwrap();
1909 let state_file = temp_dir.path().join("purgatory_state.json");
1910
1911 let purgatory = Purgatory::new(PathBuf::new());
1912 let keys = Keys::generate();
1913
1914 // Add state event
1915 let event = EventBuilder::text_note("test")
1916 .sign_with_keys(&keys)
1917 .unwrap();
1918
1919 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key());
1920
1921 // Manually set expiry to a specific time in the future
1922 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
1923 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1924 for entry in entries.iter_mut() {
1925 entry.expires_at = custom_expiry;
1926 }
1927 }
1928
1929 // Save to disk
1930 purgatory.save_to_disk(&state_file).unwrap();
1931
1932 // Create new purgatory and restore
1933 let purgatory2 = Purgatory::new(PathBuf::new());
1934 purgatory2.restore_from_disk(&state_file).unwrap();
1935
1936 // Get restored expiry time
1937 let restored_entries = purgatory2.find_state("repo");
1938 let restored_entry = &restored_entries[0];
1939 let restored_remaining = restored_entry
1940 .expires_at
1941 .saturating_duration_since(Instant::now());
1942
1943 // Should be approximately 600 seconds (allow 3 second tolerance for test execution)
1944 assert!(
1945 restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603,
1946 "Expected ~600s remaining, got {}s",
1947 restored_remaining.as_secs()
1948 );
1949}
1950
1951#[tokio::test]
1952async fn test_multiple_state_events_same_identifier() {
1953 use tempfile::tempdir;
1954
1955 let temp_dir = tempdir().unwrap();
1956 let state_file = temp_dir.path().join("purgatory_state.json");
1957
1958 let purgatory = Purgatory::new(PathBuf::new());
1959 let keys1 = Keys::generate();
1960 let keys2 = Keys::generate();
1961 let keys3 = Keys::generate();
1962
1963 // Add multiple state events for the same identifier from different authors
1964 let event1 = EventBuilder::text_note("maintainer 1")
1965 .sign_with_keys(&keys1)
1966 .unwrap();
1967 let event2 = EventBuilder::text_note("maintainer 2")
1968 .sign_with_keys(&keys2)
1969 .unwrap();
1970 let event3 = EventBuilder::text_note("maintainer 3")
1971 .sign_with_keys(&keys3)
1972 .unwrap();
1973
1974 purgatory.add_state(
1975 event1.clone(),
1976 "shared-repo".to_string(),
1977 keys1.public_key(),
1978 );
1979 purgatory.add_state(
1980 event2.clone(),
1981 "shared-repo".to_string(),
1982 keys2.public_key(),
1983 );
1984 purgatory.add_state(
1985 event3.clone(),
1986 "shared-repo".to_string(),
1987 keys3.public_key(),
1988 );
1989
1990 // Save to disk
1991 purgatory.save_to_disk(&state_file).unwrap();
1992
1993 // Create new purgatory and restore
1994 let purgatory2 = Purgatory::new(PathBuf::new());
1995 purgatory2.restore_from_disk(&state_file).unwrap();
1996
1997 // Verify all three events were restored
1998 let restored_entries = purgatory2.find_state("shared-repo");
1999 assert_eq!(restored_entries.len(), 3);
2000
2001 // Verify all authors are present
2002 let authors: Vec<PublicKey> = restored_entries.iter().map(|e| e.author).collect();
2003 assert!(authors.contains(&keys1.public_key()));
2004 assert!(authors.contains(&keys2.public_key()));
2005 assert!(authors.contains(&keys3.public_key()));
2006}
2007
2008#[tokio::test]
2009async fn test_mixed_pr_events_and_placeholders() {
2010 use nostr_sdk::{Kind, Tag, TagKind};
2011 use tempfile::tempdir;
2012
2013 let temp_dir = tempdir().unwrap();
2014 let state_file = temp_dir.path().join("purgatory_state.json");
2015
2016 let purgatory = Purgatory::new(PathBuf::new());
2017 let keys = Keys::generate();
2018
2019 // Add PR event with actual event
2020 let tags = vec![Tag::custom(
2021 TagKind::Custom("a".into()),
2022 vec!["30617:abc123:test-repo".to_string()],
2023 )];
2024
2025 let pr_event = EventBuilder::new(Kind::from(1618), "PR content")
2026 .tags(tags)
2027 .sign_with_keys(&keys)
2028 .unwrap();
2029
2030 purgatory.add_pr(
2031 pr_event.clone(),
2032 "pr-with-event".to_string(),
2033 "commit-abc".to_string(),
2034 );
2035
2036 // Add PR placeholder
2037 purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string());
2038
2039 // Save to disk
2040 purgatory.save_to_disk(&state_file).unwrap();
2041
2042 // Create new purgatory and restore
2043 let purgatory2 = Purgatory::new(PathBuf::new());
2044 purgatory2.restore_from_disk(&state_file).unwrap();
2045
2046 // Verify both were restored correctly
2047 let (_, pr_count) = purgatory2.count();
2048 assert_eq!(pr_count, 2);
2049
2050 // Verify PR event
2051 let pr_entry = purgatory2.find_pr("pr-with-event").unwrap();
2052 assert!(pr_entry.event.is_some());
2053 assert_eq!(pr_entry.commit, "commit-abc");
2054
2055 // Verify placeholder
2056 let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap();
2057 assert!(placeholder_entry.event.is_none());
2058 assert_eq!(placeholder_entry.commit, "commit-def");
2059 assert_eq!(
2060 purgatory2.find_pr_placeholder("pr-placeholder"),
2061 Some("commit-def".to_string())
2062 );
2063}
2064
2065#[tokio::test]
2066async fn test_file_cleanup_after_successful_restore() {
2067 use tempfile::tempdir;
2068
2069 let temp_dir = tempdir().unwrap();
2070 let state_file = temp_dir.path().join("purgatory_state.json");
2071
2072 let purgatory = Purgatory::new(PathBuf::new());
2073 let keys = Keys::generate();
2074
2075 // Add some data
2076 let event = EventBuilder::text_note("test")
2077 .sign_with_keys(&keys)
2078 .unwrap();
2079 purgatory.add_state(event, "repo".to_string(), keys.public_key());
2080
2081 // Save to disk
2082 purgatory.save_to_disk(&state_file).unwrap();
2083 assert!(state_file.exists());
2084
2085 // Restore
2086 let purgatory2 = Purgatory::new(PathBuf::new());
2087 purgatory2.restore_from_disk(&state_file).unwrap();
2088
2089 // File should be deleted after successful restore
2090 assert!(!state_file.exists());
2091}
2092
2093#[tokio::test]
2094async fn test_comprehensive_roundtrip() {
2095 use nostr_sdk::{Kind, Tag, TagKind};
2096 use tempfile::tempdir;
2097
2098 let temp_dir = tempdir().unwrap();
2099 let state_file = temp_dir.path().join("purgatory_state.json");
2100
2101 let purgatory = Purgatory::new(PathBuf::new());
2102 let keys1 = Keys::generate();
2103 let keys2 = Keys::generate();
2104
2105 // Add multiple state events
2106 let state1 = EventBuilder::text_note("state 1")
2107 .sign_with_keys(&keys1)
2108 .unwrap();
2109 let state2 = EventBuilder::text_note("state 2")
2110 .sign_with_keys(&keys2)
2111 .unwrap();
2112
2113 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key());
2114 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key());
2115
2116 // Add PR event
2117 let tags = vec![Tag::custom(
2118 TagKind::Custom("a".into()),
2119 vec!["30617:abc123:repo1".to_string()],
2120 )];
2121 let pr_event = EventBuilder::new(Kind::from(1618), "PR")
2122 .tags(tags)
2123 .sign_with_keys(&keys1)
2124 .unwrap();
2125 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string());
2126
2127 // Add PR placeholder
2128 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
2129
2130 // Add and expire an event
2131 let expired_event = EventBuilder::text_note("expired")
2132 .sign_with_keys(&keys1)
2133 .unwrap();
2134 let expired_id = expired_event.id;
2135 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key());
2136 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2137 for entry in entries.iter_mut() {
2138 entry.expires_at = Instant::now() - Duration::from_secs(1);
2139 }
2140 }
2141 purgatory.cleanup();
2142
2143 // Verify initial state
2144 let (state_count, pr_count) = purgatory.count();
2145 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up)
2146 assert_eq!(pr_count, 2); // pr-1, pr-2
2147 assert_eq!(purgatory.expired_count(), 1); // expired_event
2148
2149 // Save to disk
2150 purgatory.save_to_disk(&state_file).unwrap();
2151
2152 // Create new purgatory and restore
2153 let purgatory2 = Purgatory::new(PathBuf::new());
2154 purgatory2.restore_from_disk(&state_file).unwrap();
2155
2156 // Verify all data was restored correctly
2157 let (state_count2, pr_count2) = purgatory2.count();
2158 assert_eq!(state_count2, 2);
2159 assert_eq!(pr_count2, 2);
2160 assert_eq!(purgatory2.expired_count(), 1);
2161
2162 // Verify state events
2163 assert_eq!(purgatory2.find_state("repo1").len(), 1);
2164 assert_eq!(purgatory2.find_state("repo2").len(), 1);
2165
2166 // Verify PR events
2167 assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some());
2168 assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none());
2169
2170 // Verify expired event
2171 assert!(purgatory2.is_expired(&expired_id));
2172}
diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs
new file mode 100644
index 0000000..7fca2cf
--- /dev/null
+++ b/src/purgatory/persistence.rs
@@ -0,0 +1,198 @@
1//! Persistence utilities for purgatory state.
2//!
3//! This module provides conversion functions between `Instant` (which cannot be
4//! serialized) and `Duration` offsets from a reference `SystemTime`. This allows
5//! purgatory state to be persisted to disk and restored across restarts.
6//!
7//! ## Time Handling
8//!
9//! - `Instant` is monotonic but cannot be serialized
10//! - `SystemTime` can be serialized but may go backwards (NTP, user changes)
11//! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime
12//! - Downtime is accounted for when restoring state (elapsed time is preserved)
13
14use std::time::{Duration, Instant, SystemTime};
15
16/// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`.
17///
18/// This allows storing an `Instant` as a serializable offset that can be
19/// restored later, accounting for system downtime.
20///
21/// # Arguments
22/// * `instant` - The `Instant` to convert
23/// * `reference_time` - The reference `SystemTime` (typically SystemTime::now())
24/// * `reference_instant` - The corresponding `Instant` (typically Instant::now())
25///
26/// # Returns
27/// Duration offset from the reference time
28///
29/// # Example
30/// ```
31/// use std::time::{Duration, Instant, SystemTime};
32/// use ngit_grasp::purgatory::persistence::instant_to_offset;
33///
34/// let now_system = SystemTime::now();
35/// let now_instant = Instant::now();
36/// let future = now_instant + Duration::from_secs(60);
37///
38/// let offset = instant_to_offset(future, now_system, now_instant);
39/// assert!(offset.as_secs() >= 60);
40/// ```
41pub fn instant_to_offset(
42 instant: Instant,
43 _reference_time: SystemTime,
44 reference_instant: Instant,
45) -> Duration {
46 if instant >= reference_instant {
47 // Future instant - return positive offset
48 instant.duration_since(reference_instant)
49 } else {
50 // Past instant - this shouldn't happen in normal operation,
51 // but we handle it by returning zero duration
52 Duration::ZERO
53 }
54}
55
56/// Convert a `Duration` offset back to an `Instant`, accounting for downtime.
57///
58/// This restores an `Instant` from a serialized offset, adjusting for the time
59/// that has elapsed since the state was saved.
60///
61/// # Arguments
62/// * `offset` - The duration offset from the saved reference time
63/// * `saved_at` - The `SystemTime` when the state was saved
64/// * `reference_instant` - The current `Instant` (typically Instant::now())
65///
66/// # Returns
67/// The restored `Instant`, adjusted for downtime
68///
69/// # Example
70/// ```
71/// use std::time::{Duration, Instant, SystemTime};
72/// use ngit_grasp::purgatory::persistence::offset_to_instant;
73///
74/// let saved_at = SystemTime::now();
75/// let offset = Duration::from_secs(60);
76/// let now_instant = Instant::now();
77///
78/// let restored = offset_to_instant(offset, saved_at, now_instant);
79/// // restored will be approximately now_instant + 60 seconds
80/// ```
81pub fn offset_to_instant(
82 offset: Duration,
83 saved_at: SystemTime,
84 reference_instant: Instant,
85) -> Instant {
86 // Calculate how much time has elapsed since the state was saved
87 let now_system = SystemTime::now();
88 let elapsed_since_save = now_system
89 .duration_since(saved_at)
90 .unwrap_or(Duration::ZERO);
91
92 // The original deadline was: saved_at + offset
93 // Time remaining = (saved_at + offset) - now_system
94 // = offset - elapsed_since_save
95
96 if offset > elapsed_since_save {
97 // Deadline is still in the future
98 let remaining = offset - elapsed_since_save;
99 reference_instant + remaining
100 } else {
101 // Deadline has already passed or is right now
102 reference_instant
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109 use std::thread;
110 use std::time::Duration;
111
112 #[test]
113 fn test_instant_to_offset_future() {
114 let now_system = SystemTime::now();
115 let now_instant = Instant::now();
116 let future = now_instant + Duration::from_secs(60);
117
118 let offset = instant_to_offset(future, now_system, now_instant);
119
120 // Should be approximately 60 seconds (within tolerance)
121 assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61);
122 }
123
124 #[test]
125 fn test_instant_to_offset_past() {
126 let now_system = SystemTime::now();
127 let past_instant = Instant::now();
128 // Simulate some time passing
129 thread::sleep(Duration::from_millis(10));
130 let now_instant = Instant::now();
131
132 let offset = instant_to_offset(past_instant, now_system, now_instant);
133
134 // Past instants return zero duration
135 assert_eq!(offset, Duration::ZERO);
136 }
137
138 #[test]
139 fn test_offset_to_instant_with_time_remaining() {
140 let saved_at = SystemTime::now();
141 let offset = Duration::from_secs(60);
142
143 // Simulate a very short downtime (< 10ms)
144 thread::sleep(Duration::from_millis(5));
145
146 let now_instant = Instant::now();
147 let restored = offset_to_instant(offset, saved_at, now_instant);
148
149 // Should be approximately 60 seconds in the future
150 let remaining = restored.duration_since(now_instant);
151 assert!(
152 remaining.as_secs() >= 59 && remaining.as_secs() <= 61,
153 "Expected ~60s, got {}s",
154 remaining.as_secs()
155 );
156 }
157
158 #[test]
159 fn test_offset_to_instant_deadline_passed() {
160 // Simulate state saved 70 seconds ago with 60 second offset
161 let saved_at = SystemTime::now() - Duration::from_secs(70);
162 let offset = Duration::from_secs(60);
163
164 let now_instant = Instant::now();
165 let restored = offset_to_instant(offset, saved_at, now_instant);
166
167 // Deadline has passed, should be now or in the past
168 let remaining = restored.saturating_duration_since(now_instant);
169 assert_eq!(remaining, Duration::ZERO);
170 }
171
172 #[test]
173 fn test_round_trip_conversion() {
174 let now_system = SystemTime::now();
175 let now_instant = Instant::now();
176 let future = now_instant + Duration::from_secs(120);
177
178 // Convert to offset
179 let offset = instant_to_offset(future, now_system, now_instant);
180
181 // Immediately convert back (minimal downtime)
182 let restored = offset_to_instant(offset, now_system, now_instant);
183
184 // Should be very close to the original future instant
185 let diff = if restored > future {
186 restored.duration_since(future)
187 } else {
188 future.duration_since(restored)
189 };
190
191 // Allow for small timing differences (< 100ms)
192 assert!(
193 diff < Duration::from_millis(100),
194 "Round trip should preserve instant within 100ms, got {}ms",
195 diff.as_millis()
196 );
197 }
198}
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 9c47616..919504b 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -5,8 +5,14 @@
5//! problem where either the nostr event or git push can arrive first. 5//! problem where either the nostr event or git push can arrive first.
6 6
7use nostr_sdk::prelude::*; 7use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize};
8use std::time::Instant; 9use std::time::Instant;
9 10
11/// Default value for Instant fields during deserialization
12fn instant_now() -> Instant {
13 Instant::now()
14}
15
10/// A reference name and its target object. 16/// A reference name and its target object.
11/// 17///
12/// Used to identify specific git refs (branches, tags) that a state event 18/// Used to identify specific git refs (branches, tags) that a state event
@@ -59,7 +65,10 @@ impl RefUpdate {
59/// State events declare the current state of a repository but may arrive 65/// State events declare the current state of a repository but may arrive
60/// before the corresponding git data has been pushed. This entry holds 66/// before the corresponding git data has been pushed. This entry holds
61/// the event and associated metadata until the git data arrives. 67/// the event and associated metadata until the git data arrives.
62#[derive(Debug, Clone)] 68///
69/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
70/// module to convert to/from serializable wrapper types.
71#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct StatePurgatoryEntry { 72pub struct StatePurgatoryEntry {
64 /// The nostr state event (kind 30618) awaiting git data 73 /// The nostr state event (kind 30618) awaiting git data
65 pub event: Event, 74 pub event: Event,
@@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry {
71 pub author: PublicKey, 80 pub author: PublicKey,
72 81
73 /// When this entry was added to purgatory 82 /// When this entry was added to purgatory
83 #[serde(skip, default = "instant_now")]
74 pub created_at: Instant, 84 pub created_at: Instant,
75 85
76 /// Expiry deadline (30 min from creation, may be extended) 86 /// Expiry deadline (30 min from creation, may be extended)
87 #[serde(skip, default = "instant_now")]
77 pub expires_at: Instant, 88 pub expires_at: Instant,
78} 89}
79 90
@@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry {
82/// PR events reference specific commits but may arrive before the git push 93/// PR events reference specific commits but may arrive before the git push
83/// containing those commits. Alternatively, a git push may arrive first, 94/// containing those commits. Alternatively, a git push may arrive first,
84/// creating a placeholder entry waiting for the corresponding PR event. 95/// creating a placeholder entry waiting for the corresponding PR event.
85#[derive(Debug, Clone)] 96///
97/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
98/// module to convert to/from serializable wrapper types.
99#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct PrPurgatoryEntry { 100pub struct PrPurgatoryEntry {
87 /// The nostr PR event, if received (None = git data arrived first) 101 /// The nostr PR event, if received (None = git data arrived first)
88 pub event: Option<Event>, 102 pub event: Option<Event>,
@@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry {
92 pub commit: String, 106 pub commit: String,
93 107
94 /// When this entry was added to purgatory 108 /// When this entry was added to purgatory
109 #[serde(skip, default = "instant_now")]
95 pub created_at: Instant, 110 pub created_at: Instant,
96 111
97 /// Expiry deadline (30 min from creation, may be extended) 112 /// Expiry deadline (30 min from creation, may be extended)
113 #[serde(skip, default = "instant_now")]
98 pub expires_at: Instant, 114 pub expires_at: Instant,
99} 115}