diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
| commit | 4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch) | |
| tree | d42869f89e4916bb8dc36fd26c9ac5f888e042ac | |
| parent | 7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff) | |
| parent | b101afa00bc28e1b55286145cb81e32a5b3decc9 (diff) | |
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and
rejected events cache. Events are now saved to disk on graceful
shutdown and restored on startup, preventing data loss during
relay restarts.
Key features:
- Purgatory state persisted to JSON (state events, PR events, expired events)
- Rejected events cache persisted (hot cache + cold index)
- Downtime adjustment preserves remaining TTL
- Graceful degradation on missing/corrupted files
- Automatic re-queueing of restored repositories
- Comprehensive test coverage (45 tests)
| -rw-r--r-- | src/main.rs | 50 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 925 | ||||
| -rw-r--r-- | src/purgatory/persistence.rs | 198 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 20 | ||||
| -rw-r--r-- | src/sync/mod.rs | 71 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 726 | ||||
| -rw-r--r-- | tests/purgatory_persistence.rs | 755 |
7 files changed, 2725 insertions, 20 deletions
diff --git a/src/main.rs b/src/main.rs index a6f1d9d..5e5b83a 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc}; | |||
| 3 | 3 | ||
| 4 | use anyhow::Result; | 4 | use anyhow::Result; |
| 5 | use tokio::signal; | 5 | use tokio::signal; |
| 6 | use tracing::{info, Level}; | 6 | use tracing::{error, info, warn, Level}; |
| 7 | use tracing_subscriber::FmtSubscriber; | 7 | use tracing_subscriber::FmtSubscriber; |
| 8 | 8 | ||
| 9 | use ngit_grasp::{ | 9 | use ngit_grasp::{ |
| @@ -64,6 +64,22 @@ async fn main() -> Result<()> { | |||
| 64 | ))); | 64 | ))); |
| 65 | info!("Purgatory initialized for event coordination"); | 65 | info!("Purgatory initialized for event coordination"); |
| 66 | 66 | ||
| 67 | // Restore purgatory state from disk if available | ||
| 68 | let purgatory_path = | ||
| 69 | PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json"); | ||
| 70 | |||
| 71 | if purgatory_path.exists() { | ||
| 72 | match purgatory.restore_from_disk(&purgatory_path) { | ||
| 73 | Ok(()) => { | ||
| 74 | info!("Restored purgatory state from disk"); | ||
| 75 | // Re-queueing will happen later after sync system is created | ||
| 76 | } | ||
| 77 | Err(e) => { | ||
| 78 | warn!("Failed to restore purgatory state: {}, starting empty", e); | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 67 | // Create Nostr relay with NIP-34 validation | 83 | // Create Nostr relay with NIP-34 validation |
| 68 | // Returns both the relay and database for direct queries in handlers | 84 | // Returns both the relay and database for direct queries in handlers |
| 69 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { | 85 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { |
| @@ -88,6 +104,7 @@ async fn main() -> Result<()> { | |||
| 88 | relay_with_db.write_policy.clone(), | 104 | relay_with_db.write_policy.clone(), |
| 89 | relay_with_db.relay.clone(), | 105 | relay_with_db.relay.clone(), |
| 90 | &config, | 106 | &config, |
| 107 | PathBuf::from(config.effective_git_data_path()), | ||
| 91 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), | 108 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), |
| 92 | ); | 109 | ); |
| 93 | 110 | ||
| @@ -100,6 +117,21 @@ async fn main() -> Result<()> { | |||
| 100 | info!("Proactive sync enabled (will discover relays from stored announcements)"); | 117 | info!("Proactive sync enabled (will discover relays from stored announcements)"); |
| 101 | } | 118 | } |
| 102 | 119 | ||
| 120 | // Re-queue all restored purgatory repos for sync | ||
| 121 | let restored_identifiers = purgatory.get_all_identifiers(); | ||
| 122 | if !restored_identifiers.is_empty() { | ||
| 123 | info!( | ||
| 124 | "Re-queueing {} restored repositories for sync", | ||
| 125 | restored_identifiers.len() | ||
| 126 | ); | ||
| 127 | for identifier in restored_identifiers { | ||
| 128 | purgatory.enqueue_sync_immediate(&identifier); | ||
| 129 | } | ||
| 130 | } | ||
| 131 | |||
| 132 | // Get a reference to the rejected events index for shutdown persistence | ||
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | ||
| 134 | |||
| 103 | tokio::spawn(async move { | 135 | tokio::spawn(async move { |
| 104 | sync_manager.run().await; | 136 | sync_manager.run().await; |
| 105 | }); | 137 | }); |
| @@ -190,6 +222,22 @@ async fn main() -> Result<()> { | |||
| 190 | } | 222 | } |
| 191 | } | 223 | } |
| 192 | 224 | ||
| 225 | // Save purgatory state to disk | ||
| 226 | let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json"); | ||
| 227 | if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) { | ||
| 228 | error!("Failed to save purgatory state: {}", e); | ||
| 229 | } else { | ||
| 230 | info!("Purgatory state saved to disk"); | ||
| 231 | } | ||
| 232 | |||
| 233 | // Save rejected events cache to disk | ||
| 234 | let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json"); | ||
| 235 | if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) { | ||
| 236 | error!("Failed to save rejected events cache: {}", e); | ||
| 237 | } else { | ||
| 238 | info!("Rejected events cache saved to disk"); | ||
| 239 | } | ||
| 240 | |||
| 193 | // Cleanup placeholder refs on shutdown | 241 | // Cleanup placeholder refs on shutdown |
| 194 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); | 242 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); |
| 195 | if !placeholder_ids.is_empty() { | 243 | if !placeholder_ids.is_empty() { |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 20df19b..47798a6 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -12,6 +12,7 @@ | |||
| 12 | //! - **Separate stores**: State events and PR events use different indexing strategies | 12 | //! - **Separate stores**: State events and PR events use different indexing strategies |
| 13 | 13 | ||
| 14 | mod helpers; | 14 | mod helpers; |
| 15 | pub mod persistence; | ||
| 15 | pub mod sync; | 16 | pub mod sync; |
| 16 | mod types; | 17 | mod types; |
| 17 | 18 | ||
| @@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | |||
| 20 | 21 | ||
| 21 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 22 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use serde::{Deserialize, Serialize}; | ||
| 25 | use std::collections::HashMap; | ||
| 23 | use std::collections::HashSet; | 26 | use std::collections::HashSet; |
| 24 | use std::path::PathBuf; | 27 | use std::path::{Path, PathBuf}; |
| 25 | use std::sync::Arc; | 28 | use std::sync::Arc; |
| 26 | use std::time::{Duration, Instant}; | 29 | use std::time::{Duration, Instant, SystemTime}; |
| 27 | 30 | ||
| 28 | pub use sync::SyncQueueEntry; | 31 | pub use sync::SyncQueueEntry; |
| 29 | 32 | ||
| @@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); | |||
| 38 | /// Used for batching burst arrivals during negentropy sync. | 41 | /// Used for batching burst arrivals during negentropy sync. |
| 39 | const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); | 42 | const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); |
| 40 | 43 | ||
| 44 | /// Serializable wrapper for `StatePurgatoryEntry` with time offsets. | ||
| 45 | /// | ||
| 46 | /// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp | ||
| 47 | /// in `PurgatoryState`, allowing state to be persisted and restored across restarts. | ||
| 48 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 49 | struct SerializableStatePurgatoryEntry { | ||
| 50 | /// The nostr state event (kind 30618) awaiting git data | ||
| 51 | event: Event, | ||
| 52 | /// The repository identifier from the event's 'd' tag | ||
| 53 | identifier: String, | ||
| 54 | /// Event author pubkey | ||
| 55 | author: PublicKey, | ||
| 56 | /// Duration offset from saved_at for created_at | ||
| 57 | created_at_offset_secs: u64, | ||
| 58 | /// Duration offset from saved_at for expires_at | ||
| 59 | expires_at_offset_secs: u64, | ||
| 60 | } | ||
| 61 | |||
| 62 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | ||
| 63 | /// | ||
| 64 | /// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp | ||
| 65 | /// in `PurgatoryState`, allowing state to be persisted and restored across restarts. | ||
| 66 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 67 | struct SerializablePrPurgatoryEntry { | ||
| 68 | /// The nostr PR event, if received (None = git data arrived first) | ||
| 69 | event: Option<Event>, | ||
| 70 | /// The expected commit SHA from 'c' tag (if event exists) | ||
| 71 | /// or the actual commit pushed (if git arrived first) | ||
| 72 | commit: String, | ||
| 73 | /// Duration offset from saved_at for created_at | ||
| 74 | created_at_offset_secs: u64, | ||
| 75 | /// Duration offset from saved_at for expires_at | ||
| 76 | expires_at_offset_secs: u64, | ||
| 77 | } | ||
| 78 | |||
| 79 | /// Serializable purgatory state for disk persistence. | ||
| 80 | /// | ||
| 81 | /// Contains all purgatory data needed to restore state across restarts: | ||
| 82 | /// - State events (indexed by identifier) | ||
| 83 | /// - PR events (indexed by event ID) | ||
| 84 | /// - Expired events (to prevent re-sync loops) | ||
| 85 | /// - Version number for future compatibility | ||
| 86 | /// - Saved timestamp for downtime calculation | ||
| 87 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 88 | struct PurgatoryState { | ||
| 89 | /// Version number for state format (currently 1) | ||
| 90 | version: u32, | ||
| 91 | /// When this state was saved to disk | ||
| 92 | saved_at: SystemTime, | ||
| 93 | /// State events indexed by repository identifier | ||
| 94 | state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>, | ||
| 95 | /// PR events indexed by event ID (hex string) | ||
| 96 | pr_events: HashMap<String, SerializablePrPurgatoryEntry>, | ||
| 97 | /// Expired event IDs with their expiry timestamps | ||
| 98 | expired_events: HashMap<String, SystemTime>, | ||
| 99 | } | ||
| 100 | |||
| 41 | /// Main purgatory structure holding events awaiting git data. | 101 | /// Main purgatory structure holding events awaiting git data. |
| 42 | /// | 102 | /// |
| 43 | /// Provides thread-safe concurrent access to two separate stores: | 103 | /// Provides thread-safe concurrent access to two separate stores: |
| @@ -667,6 +727,260 @@ impl Purgatory { | |||
| 667 | pub fn sync_queue_size(&self) -> usize { | 727 | pub fn sync_queue_size(&self) -> usize { |
| 668 | self.sync_queue.len() | 728 | self.sync_queue.len() |
| 669 | } | 729 | } |
| 730 | |||
| 731 | /// Get all repository identifiers currently in purgatory. | ||
| 732 | /// | ||
| 733 | /// Returns a list of all unique repository identifiers that have state events | ||
| 734 | /// in purgatory. This is useful for re-queueing repositories after restore. | ||
| 735 | /// | ||
| 736 | /// # Returns | ||
| 737 | /// Vector of repository identifiers (e.g., "owner/repo") | ||
| 738 | /// | ||
| 739 | /// # Example | ||
| 740 | /// ```no_run | ||
| 741 | /// use ngit_grasp::purgatory::Purgatory; | ||
| 742 | /// use std::path::PathBuf; | ||
| 743 | /// | ||
| 744 | /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); | ||
| 745 | /// let identifiers = purgatory.get_all_identifiers(); | ||
| 746 | /// for id in identifiers { | ||
| 747 | /// println!("Repository in purgatory: {}", id); | ||
| 748 | /// } | ||
| 749 | /// ``` | ||
| 750 | pub fn get_all_identifiers(&self) -> Vec<String> { | ||
| 751 | self.state_events | ||
| 752 | .iter() | ||
| 753 | .map(|entry| entry.key().clone()) | ||
| 754 | .collect() | ||
| 755 | } | ||
| 756 | |||
| 757 | /// Save purgatory state to disk. | ||
| 758 | /// | ||
| 759 | /// Serializes the current purgatory state (state_events, pr_events, expired_events) | ||
| 760 | /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are | ||
| 761 | /// converted to duration offsets from the current `SystemTime` for persistence. | ||
| 762 | /// | ||
| 763 | /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are | ||
| 764 | /// restored from disk. | ||
| 765 | /// | ||
| 766 | /// # Arguments | ||
| 767 | /// * `path` - Path to save the state file | ||
| 768 | /// | ||
| 769 | /// # Returns | ||
| 770 | /// Ok(()) on success, Err on failure | ||
| 771 | /// | ||
| 772 | /// # Example | ||
| 773 | /// ```no_run | ||
| 774 | /// use ngit_grasp::purgatory::Purgatory; | ||
| 775 | /// use std::path::PathBuf; | ||
| 776 | /// | ||
| 777 | /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); | ||
| 778 | /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap(); | ||
| 779 | /// ``` | ||
| 780 | pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 781 | let saved_at = SystemTime::now(); | ||
| 782 | let now_instant = Instant::now(); | ||
| 783 | |||
| 784 | // Convert state_events to serializable format | ||
| 785 | let mut state_events = HashMap::new(); | ||
| 786 | for entry in self.state_events.iter() { | ||
| 787 | let identifier = entry.key().clone(); | ||
| 788 | let entries: Vec<SerializableStatePurgatoryEntry> = entry | ||
| 789 | .value() | ||
| 790 | .iter() | ||
| 791 | .map(|e| { | ||
| 792 | let created_offset = | ||
| 793 | persistence::instant_to_offset(e.created_at, saved_at, now_instant); | ||
| 794 | let expires_offset = | ||
| 795 | persistence::instant_to_offset(e.expires_at, saved_at, now_instant); | ||
| 796 | |||
| 797 | SerializableStatePurgatoryEntry { | ||
| 798 | event: e.event.clone(), | ||
| 799 | identifier: e.identifier.clone(), | ||
| 800 | author: e.author, | ||
| 801 | created_at_offset_secs: created_offset.as_secs(), | ||
| 802 | expires_at_offset_secs: expires_offset.as_secs(), | ||
| 803 | } | ||
| 804 | }) | ||
| 805 | .collect(); | ||
| 806 | state_events.insert(identifier, entries); | ||
| 807 | } | ||
| 808 | |||
| 809 | // Convert pr_events to serializable format | ||
| 810 | let mut pr_events = HashMap::new(); | ||
| 811 | for entry in self.pr_events.iter() { | ||
| 812 | let event_id = entry.key().clone(); | ||
| 813 | let e = entry.value(); | ||
| 814 | |||
| 815 | let created_offset = | ||
| 816 | persistence::instant_to_offset(e.created_at, saved_at, now_instant); | ||
| 817 | let expires_offset = | ||
| 818 | persistence::instant_to_offset(e.expires_at, saved_at, now_instant); | ||
| 819 | |||
| 820 | let serializable = SerializablePrPurgatoryEntry { | ||
| 821 | event: e.event.clone(), | ||
| 822 | commit: e.commit.clone(), | ||
| 823 | created_at_offset_secs: created_offset.as_secs(), | ||
| 824 | expires_at_offset_secs: expires_offset.as_secs(), | ||
| 825 | }; | ||
| 826 | pr_events.insert(event_id, serializable); | ||
| 827 | } | ||
| 828 | |||
| 829 | // Convert expired_events to serializable format | ||
| 830 | // We use SystemTime instead of Instant offsets for expired events since | ||
| 831 | // we don't need high precision for cleanup timing | ||
| 832 | let mut expired_events = HashMap::new(); | ||
| 833 | for entry in self.expired_events.iter() { | ||
| 834 | let event_id = entry.key().to_hex(); | ||
| 835 | // Convert Instant to SystemTime (approximate) | ||
| 836 | let expired_at_instant = *entry.value(); | ||
| 837 | let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant); | ||
| 838 | let expired_at_system = saved_at - elapsed_since_expire; | ||
| 839 | expired_events.insert(event_id, expired_at_system); | ||
| 840 | } | ||
| 841 | |||
| 842 | // Create state structure | ||
| 843 | let state = PurgatoryState { | ||
| 844 | version: 1, | ||
| 845 | saved_at, | ||
| 846 | state_events, | ||
| 847 | pr_events, | ||
| 848 | expired_events, | ||
| 849 | }; | ||
| 850 | |||
| 851 | // Serialize to JSON and write to file | ||
| 852 | let json = serde_json::to_string_pretty(&state)?; | ||
| 853 | std::fs::write(path, json)?; | ||
| 854 | |||
| 855 | tracing::info!( | ||
| 856 | path = %path.display(), | ||
| 857 | state_events = state.state_events.len(), | ||
| 858 | pr_events = state.pr_events.len(), | ||
| 859 | expired_events = state.expired_events.len(), | ||
| 860 | "Saved purgatory state to disk" | ||
| 861 | ); | ||
| 862 | |||
| 863 | Ok(()) | ||
| 864 | } | ||
| 865 | |||
| 866 | /// Restore purgatory state from disk. | ||
| 867 | /// | ||
| 868 | /// Loads a previously saved purgatory state from the specified path and populates | ||
| 869 | /// the current purgatory instance. Adjusts time-based fields to account for downtime | ||
| 870 | /// between save and restore. | ||
| 871 | /// | ||
| 872 | /// After successful restore, the state file is deleted to prevent accidental | ||
| 873 | /// double-restore. | ||
| 874 | /// | ||
| 875 | /// # Arguments | ||
| 876 | /// * `path` - Path to the saved state file | ||
| 877 | /// | ||
| 878 | /// # Returns | ||
| 879 | /// Ok(()) on success, Err if file doesn't exist or is corrupted | ||
| 880 | /// | ||
| 881 | /// # Example | ||
| 882 | /// ```no_run | ||
| 883 | /// use ngit_grasp::purgatory::Purgatory; | ||
| 884 | /// use std::path::PathBuf; | ||
| 885 | /// | ||
| 886 | /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); | ||
| 887 | /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) { | ||
| 888 | /// Ok(()) => println!("State restored successfully"), | ||
| 889 | /// Err(e) => eprintln!("Failed to restore state: {}", e), | ||
| 890 | /// } | ||
| 891 | /// ``` | ||
| 892 | pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 893 | // Read and parse state file | ||
| 894 | let json = std::fs::read_to_string(path)?; | ||
| 895 | let state: PurgatoryState = serde_json::from_str(&json)?; | ||
| 896 | |||
| 897 | // Verify version | ||
| 898 | if state.version != 1 { | ||
| 899 | return Err(format!("Unsupported state version: {}", state.version).into()); | ||
| 900 | } | ||
| 901 | |||
| 902 | let now_instant = Instant::now(); | ||
| 903 | |||
| 904 | // Restore state_events | ||
| 905 | for (identifier, entries) in state.state_events { | ||
| 906 | let restored_entries: Vec<StatePurgatoryEntry> = entries | ||
| 907 | .into_iter() | ||
| 908 | .map(|e| { | ||
| 909 | let created_at = persistence::offset_to_instant( | ||
| 910 | Duration::from_secs(e.created_at_offset_secs), | ||
| 911 | state.saved_at, | ||
| 912 | now_instant, | ||
| 913 | ); | ||
| 914 | let expires_at = persistence::offset_to_instant( | ||
| 915 | Duration::from_secs(e.expires_at_offset_secs), | ||
| 916 | state.saved_at, | ||
| 917 | now_instant, | ||
| 918 | ); | ||
| 919 | |||
| 920 | StatePurgatoryEntry { | ||
| 921 | event: e.event, | ||
| 922 | identifier: e.identifier, | ||
| 923 | author: e.author, | ||
| 924 | created_at, | ||
| 925 | expires_at, | ||
| 926 | } | ||
| 927 | }) | ||
| 928 | .collect(); | ||
| 929 | |||
| 930 | self.state_events.insert(identifier, restored_entries); | ||
| 931 | } | ||
| 932 | |||
| 933 | // Restore pr_events | ||
| 934 | for (event_id, e) in state.pr_events { | ||
| 935 | let created_at = persistence::offset_to_instant( | ||
| 936 | Duration::from_secs(e.created_at_offset_secs), | ||
| 937 | state.saved_at, | ||
| 938 | now_instant, | ||
| 939 | ); | ||
| 940 | let expires_at = persistence::offset_to_instant( | ||
| 941 | Duration::from_secs(e.expires_at_offset_secs), | ||
| 942 | state.saved_at, | ||
| 943 | now_instant, | ||
| 944 | ); | ||
| 945 | |||
| 946 | let entry = PrPurgatoryEntry { | ||
| 947 | event: e.event, | ||
| 948 | commit: e.commit, | ||
| 949 | created_at, | ||
| 950 | expires_at, | ||
| 951 | }; | ||
| 952 | |||
| 953 | self.pr_events.insert(event_id, entry); | ||
| 954 | } | ||
| 955 | |||
| 956 | // Restore expired_events | ||
| 957 | for (event_id_hex, expired_at_system) in state.expired_events { | ||
| 958 | if let Ok(event_id) = EventId::from_hex(&event_id_hex) { | ||
| 959 | // Convert SystemTime back to Instant (approximate) | ||
| 960 | let elapsed_since_expire = SystemTime::now() | ||
| 961 | .duration_since(expired_at_system) | ||
| 962 | .unwrap_or(Duration::ZERO); | ||
| 963 | let expired_at_instant = now_instant - elapsed_since_expire; | ||
| 964 | |||
| 965 | self.expired_events.insert(event_id, expired_at_instant); | ||
| 966 | } | ||
| 967 | } | ||
| 968 | |||
| 969 | tracing::info!( | ||
| 970 | path = %path.display(), | ||
| 971 | state_events = self.state_events.len(), | ||
| 972 | pr_events = self.pr_events.len(), | ||
| 973 | expired_events = self.expired_events.len(), | ||
| 974 | saved_at = ?state.saved_at, | ||
| 975 | "Restored purgatory state from disk" | ||
| 976 | ); | ||
| 977 | |||
| 978 | // Delete state file after successful restore | ||
| 979 | std::fs::remove_file(path)?; | ||
| 980 | tracing::debug!(path = %path.display(), "Deleted state file after restore"); | ||
| 981 | |||
| 982 | Ok(()) | ||
| 983 | } | ||
| 670 | } | 984 | } |
| 671 | 985 | ||
| 672 | #[cfg(test)] | 986 | #[cfg(test)] |
| @@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1249 | // - Skip the expired check for user-submitted events | 1563 | // - Skip the expired check for user-submitted events |
| 1250 | // - Allow the event to be re-added to purgatory or accepted if git data now exists | 1564 | // - Allow the event to be re-added to purgatory or accepted if git data now exists |
| 1251 | } | 1565 | } |
| 1566 | |||
| 1567 | // ============================================================================ | ||
| 1568 | // Persistence Serialization Tests | ||
| 1569 | // ============================================================================ | ||
| 1570 | |||
| 1571 | #[tokio::test] | ||
| 1572 | async fn test_save_and_restore_state_events() { | ||
| 1573 | use tempfile::tempdir; | ||
| 1574 | |||
| 1575 | let temp_dir = tempdir().unwrap(); | ||
| 1576 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1577 | |||
| 1578 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1579 | let keys = Keys::generate(); | ||
| 1580 | |||
| 1581 | // Add multiple state events for the same identifier | ||
| 1582 | let event1 = EventBuilder::text_note("state event 1") | ||
| 1583 | .sign_with_keys(&keys) | ||
| 1584 | .unwrap(); | ||
| 1585 | let event2 = EventBuilder::text_note("state event 2") | ||
| 1586 | .sign_with_keys(&keys) | ||
| 1587 | .unwrap(); | ||
| 1588 | |||
| 1589 | let event1_id = event1.id; | ||
| 1590 | let event2_id = event2.id; | ||
| 1591 | |||
| 1592 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | ||
| 1593 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | ||
| 1594 | |||
| 1595 | // Save to disk | ||
| 1596 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1597 | |||
| 1598 | // Verify file exists | ||
| 1599 | assert!(state_file.exists()); | ||
| 1600 | |||
| 1601 | // Create new purgatory and restore | ||
| 1602 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1603 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1604 | |||
| 1605 | // Verify file was deleted after restore | ||
| 1606 | assert!(!state_file.exists()); | ||
| 1607 | |||
| 1608 | // Verify state events were restored | ||
| 1609 | let (state_count, _) = purgatory2.count(); | ||
| 1610 | assert_eq!(state_count, 2); | ||
| 1611 | |||
| 1612 | let restored_entries = purgatory2.find_state("test-repo"); | ||
| 1613 | assert_eq!(restored_entries.len(), 2); | ||
| 1614 | |||
| 1615 | // Verify event IDs match | ||
| 1616 | let restored_ids: Vec<EventId> = restored_entries.iter().map(|e| e.event.id).collect(); | ||
| 1617 | assert!(restored_ids.contains(&event1_id)); | ||
| 1618 | assert!(restored_ids.contains(&event2_id)); | ||
| 1619 | |||
| 1620 | // Verify identifiers and authors match | ||
| 1621 | for entry in &restored_entries { | ||
| 1622 | assert_eq!(entry.identifier, "test-repo"); | ||
| 1623 | assert_eq!(entry.author, keys.public_key()); | ||
| 1624 | } | ||
| 1625 | } | ||
| 1626 | |||
| 1627 | #[tokio::test] | ||
| 1628 | async fn test_save_and_restore_pr_events() { | ||
| 1629 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 1630 | use tempfile::tempdir; | ||
| 1631 | |||
| 1632 | let temp_dir = tempdir().unwrap(); | ||
| 1633 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1634 | |||
| 1635 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1636 | let keys = Keys::generate(); | ||
| 1637 | |||
| 1638 | // Add PR event with actual event | ||
| 1639 | let tags = vec![Tag::custom( | ||
| 1640 | TagKind::Custom("a".into()), | ||
| 1641 | vec!["30617:abc123:test-repo".to_string()], | ||
| 1642 | )]; | ||
| 1643 | |||
| 1644 | let pr_event = EventBuilder::new(Kind::from(1618), "PR content") | ||
| 1645 | .tags(tags) | ||
| 1646 | .sign_with_keys(&keys) | ||
| 1647 | .unwrap(); | ||
| 1648 | |||
| 1649 | let pr_event_id = pr_event.id; | ||
| 1650 | |||
| 1651 | purgatory.add_pr( | ||
| 1652 | pr_event.clone(), | ||
| 1653 | "pr-event-id".to_string(), | ||
| 1654 | "commit-abc".to_string(), | ||
| 1655 | ); | ||
| 1656 | |||
| 1657 | // Save to disk | ||
| 1658 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1659 | |||
| 1660 | // Create new purgatory and restore | ||
| 1661 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1662 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1663 | |||
| 1664 | // Verify PR event was restored | ||
| 1665 | let (_, pr_count) = purgatory2.count(); | ||
| 1666 | assert_eq!(pr_count, 1); | ||
| 1667 | |||
| 1668 | let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); | ||
| 1669 | assert!(restored_entry.event.is_some()); | ||
| 1670 | assert_eq!(restored_entry.event.unwrap().id, pr_event_id); | ||
| 1671 | assert_eq!(restored_entry.commit, "commit-abc"); | ||
| 1672 | } | ||
| 1673 | |||
| 1674 | #[tokio::test] | ||
| 1675 | async fn test_save_and_restore_pr_placeholders() { | ||
| 1676 | use tempfile::tempdir; | ||
| 1677 | |||
| 1678 | let temp_dir = tempdir().unwrap(); | ||
| 1679 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1680 | |||
| 1681 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1682 | |||
| 1683 | // Add PR placeholder (git data arrived first) | ||
| 1684 | purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string()); | ||
| 1685 | |||
| 1686 | // Save to disk | ||
| 1687 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1688 | |||
| 1689 | // Create new purgatory and restore | ||
| 1690 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1691 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1692 | |||
| 1693 | // Verify placeholder was restored | ||
| 1694 | let (_, pr_count) = purgatory2.count(); | ||
| 1695 | assert_eq!(pr_count, 1); | ||
| 1696 | |||
| 1697 | let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); | ||
| 1698 | assert!(restored_entry.event.is_none()); // Still a placeholder | ||
| 1699 | assert_eq!(restored_entry.commit, "commit-def"); | ||
| 1700 | |||
| 1701 | // Verify it's findable as a placeholder | ||
| 1702 | assert_eq!( | ||
| 1703 | purgatory2.find_pr_placeholder("placeholder-id"), | ||
| 1704 | Some("commit-def".to_string()) | ||
| 1705 | ); | ||
| 1706 | } | ||
| 1707 | |||
| 1708 | #[tokio::test] | ||
| 1709 | async fn test_save_and_restore_expired_events() { | ||
| 1710 | use tempfile::tempdir; | ||
| 1711 | |||
| 1712 | let temp_dir = tempdir().unwrap(); | ||
| 1713 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1714 | |||
| 1715 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1716 | let keys = Keys::generate(); | ||
| 1717 | |||
| 1718 | let event = EventBuilder::text_note("test") | ||
| 1719 | .sign_with_keys(&keys) | ||
| 1720 | .unwrap(); | ||
| 1721 | let event_id = event.id; | ||
| 1722 | |||
| 1723 | // Add and expire event | ||
| 1724 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | ||
| 1725 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | ||
| 1726 | for entry in entries.iter_mut() { | ||
| 1727 | entry.expires_at = Instant::now() - Duration::from_secs(1); | ||
| 1728 | } | ||
| 1729 | } | ||
| 1730 | purgatory.cleanup(); | ||
| 1731 | |||
| 1732 | // Verify event is marked as expired | ||
| 1733 | assert!(purgatory.is_expired(&event_id)); | ||
| 1734 | assert_eq!(purgatory.expired_count(), 1); | ||
| 1735 | |||
| 1736 | // Save to disk | ||
| 1737 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1738 | |||
| 1739 | // Create new purgatory and restore | ||
| 1740 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1741 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1742 | |||
| 1743 | // Verify expired event was restored | ||
| 1744 | assert!(purgatory2.is_expired(&event_id)); | ||
| 1745 | assert_eq!(purgatory2.expired_count(), 1); | ||
| 1746 | |||
| 1747 | // Verify it's included in event_ids() | ||
| 1748 | let ids = purgatory2.event_ids(); | ||
| 1749 | assert!(ids.contains(&event_id)); | ||
| 1750 | } | ||
| 1751 | |||
| 1752 | #[tokio::test] | ||
| 1753 | async fn test_save_and_restore_empty_purgatory() { | ||
| 1754 | use tempfile::tempdir; | ||
| 1755 | |||
| 1756 | let temp_dir = tempdir().unwrap(); | ||
| 1757 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1758 | |||
| 1759 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1760 | |||
| 1761 | // Save empty purgatory | ||
| 1762 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1763 | |||
| 1764 | // Verify file exists | ||
| 1765 | assert!(state_file.exists()); | ||
| 1766 | |||
| 1767 | // Create new purgatory and restore | ||
| 1768 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1769 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1770 | |||
| 1771 | // Verify purgatory is still empty | ||
| 1772 | let (state_count, pr_count) = purgatory2.count(); | ||
| 1773 | assert_eq!(state_count, 0); | ||
| 1774 | assert_eq!(pr_count, 0); | ||
| 1775 | assert_eq!(purgatory2.expired_count(), 0); | ||
| 1776 | } | ||
| 1777 | |||
| 1778 | #[tokio::test] | ||
| 1779 | async fn test_restore_missing_file() { | ||
| 1780 | use tempfile::tempdir; | ||
| 1781 | |||
| 1782 | let temp_dir = tempdir().unwrap(); | ||
| 1783 | let state_file = temp_dir.path().join("nonexistent.json"); | ||
| 1784 | |||
| 1785 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1786 | |||
| 1787 | // Attempting to restore from missing file should error | ||
| 1788 | let result = purgatory.restore_from_disk(&state_file); | ||
| 1789 | assert!(result.is_err()); | ||
| 1790 | |||
| 1791 | // Purgatory should remain empty | ||
| 1792 | let (state_count, pr_count) = purgatory.count(); | ||
| 1793 | assert_eq!(state_count, 0); | ||
| 1794 | assert_eq!(pr_count, 0); | ||
| 1795 | } | ||
| 1796 | |||
| 1797 | #[tokio::test] | ||
| 1798 | async fn test_restore_corrupted_json() { | ||
| 1799 | use tempfile::tempdir; | ||
| 1800 | |||
| 1801 | let temp_dir = tempdir().unwrap(); | ||
| 1802 | let state_file = temp_dir.path().join("corrupted.json"); | ||
| 1803 | |||
| 1804 | // Write invalid JSON | ||
| 1805 | std::fs::write(&state_file, "{ this is not valid json }").unwrap(); | ||
| 1806 | |||
| 1807 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1808 | |||
| 1809 | // Attempting to restore corrupted file should error | ||
| 1810 | let result = purgatory.restore_from_disk(&state_file); | ||
| 1811 | assert!(result.is_err()); | ||
| 1812 | |||
| 1813 | // Purgatory should remain empty | ||
| 1814 | let (state_count, pr_count) = purgatory.count(); | ||
| 1815 | assert_eq!(state_count, 0); | ||
| 1816 | assert_eq!(pr_count, 0); | ||
| 1817 | } | ||
| 1818 | |||
| 1819 | #[tokio::test] | ||
| 1820 | async fn test_restore_unsupported_version() { | ||
| 1821 | use tempfile::tempdir; | ||
| 1822 | |||
| 1823 | let temp_dir = tempdir().unwrap(); | ||
| 1824 | let state_file = temp_dir.path().join("wrong_version.json"); | ||
| 1825 | |||
| 1826 | // Write state with unsupported version | ||
| 1827 | let state = r#"{ | ||
| 1828 | "version": 999, | ||
| 1829 | "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0}, | ||
| 1830 | "state_events": {}, | ||
| 1831 | "pr_events": {}, | ||
| 1832 | "expired_events": {} | ||
| 1833 | }"#; | ||
| 1834 | std::fs::write(&state_file, state).unwrap(); | ||
| 1835 | |||
| 1836 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1837 | |||
| 1838 | // Attempting to restore unsupported version should error | ||
| 1839 | let result = purgatory.restore_from_disk(&state_file); | ||
| 1840 | assert!(result.is_err()); | ||
| 1841 | assert!(result | ||
| 1842 | .unwrap_err() | ||
| 1843 | .to_string() | ||
| 1844 | .contains("Unsupported state version")); | ||
| 1845 | } | ||
| 1846 | |||
| 1847 | #[tokio::test] | ||
| 1848 | async fn test_downtime_calculation() { | ||
| 1849 | use tempfile::tempdir; | ||
| 1850 | use tokio::time::sleep; | ||
| 1851 | |||
| 1852 | let temp_dir = tempdir().unwrap(); | ||
| 1853 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1854 | |||
| 1855 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1856 | let keys = Keys::generate(); | ||
| 1857 | |||
| 1858 | // Add state event | ||
| 1859 | let event = EventBuilder::text_note("test") | ||
| 1860 | .sign_with_keys(&keys) | ||
| 1861 | .unwrap(); | ||
| 1862 | |||
| 1863 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | ||
| 1864 | |||
| 1865 | // Get original expiry time | ||
| 1866 | let original_entries = purgatory.find_state("repo"); | ||
| 1867 | let original_entry = &original_entries[0]; | ||
| 1868 | let original_expires_at = original_entry.expires_at; | ||
| 1869 | let original_remaining = original_expires_at.saturating_duration_since(Instant::now()); | ||
| 1870 | |||
| 1871 | // Save to disk | ||
| 1872 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1873 | |||
| 1874 | // Simulate downtime (100ms) | ||
| 1875 | sleep(Duration::from_millis(100)).await; | ||
| 1876 | |||
| 1877 | // Create new purgatory and restore | ||
| 1878 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1879 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1880 | |||
| 1881 | // Get restored expiry time | ||
| 1882 | let restored_entries = purgatory2.find_state("repo"); | ||
| 1883 | let restored_entry = &restored_entries[0]; | ||
| 1884 | let restored_expires_at = restored_entry.expires_at; | ||
| 1885 | let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now()); | ||
| 1886 | |||
| 1887 | // Remaining time should be approximately the same (accounting for downtime) | ||
| 1888 | // Allow 2000ms tolerance for test execution time and sleep duration | ||
| 1889 | let diff = if restored_remaining > original_remaining { | ||
| 1890 | restored_remaining.as_millis() - original_remaining.as_millis() | ||
| 1891 | } else { | ||
| 1892 | original_remaining.as_millis() - restored_remaining.as_millis() | ||
| 1893 | }; | ||
| 1894 | |||
| 1895 | assert!( | ||
| 1896 | diff < 2000, | ||
| 1897 | "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms", | ||
| 1898 | original_remaining.as_millis(), | ||
| 1899 | restored_remaining.as_millis(), | ||
| 1900 | diff | ||
| 1901 | ); | ||
| 1902 | } | ||
| 1903 | |||
| 1904 | #[tokio::test] | ||
| 1905 | async fn test_expiry_times_preserved() { | ||
| 1906 | use tempfile::tempdir; | ||
| 1907 | |||
| 1908 | let temp_dir = tempdir().unwrap(); | ||
| 1909 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1910 | |||
| 1911 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1912 | let keys = Keys::generate(); | ||
| 1913 | |||
| 1914 | // Add state event | ||
| 1915 | let event = EventBuilder::text_note("test") | ||
| 1916 | .sign_with_keys(&keys) | ||
| 1917 | .unwrap(); | ||
| 1918 | |||
| 1919 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | ||
| 1920 | |||
| 1921 | // Manually set expiry to a specific time in the future | ||
| 1922 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | ||
| 1923 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | ||
| 1924 | for entry in entries.iter_mut() { | ||
| 1925 | entry.expires_at = custom_expiry; | ||
| 1926 | } | ||
| 1927 | } | ||
| 1928 | |||
| 1929 | // Save to disk | ||
| 1930 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1931 | |||
| 1932 | // Create new purgatory and restore | ||
| 1933 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1934 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1935 | |||
| 1936 | // Get restored expiry time | ||
| 1937 | let restored_entries = purgatory2.find_state("repo"); | ||
| 1938 | let restored_entry = &restored_entries[0]; | ||
| 1939 | let restored_remaining = restored_entry | ||
| 1940 | .expires_at | ||
| 1941 | .saturating_duration_since(Instant::now()); | ||
| 1942 | |||
| 1943 | // Should be approximately 600 seconds (allow 3 second tolerance for test execution) | ||
| 1944 | assert!( | ||
| 1945 | restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603, | ||
| 1946 | "Expected ~600s remaining, got {}s", | ||
| 1947 | restored_remaining.as_secs() | ||
| 1948 | ); | ||
| 1949 | } | ||
| 1950 | |||
| 1951 | #[tokio::test] | ||
| 1952 | async fn test_multiple_state_events_same_identifier() { | ||
| 1953 | use tempfile::tempdir; | ||
| 1954 | |||
| 1955 | let temp_dir = tempdir().unwrap(); | ||
| 1956 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1957 | |||
| 1958 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1959 | let keys1 = Keys::generate(); | ||
| 1960 | let keys2 = Keys::generate(); | ||
| 1961 | let keys3 = Keys::generate(); | ||
| 1962 | |||
| 1963 | // Add multiple state events for the same identifier from different authors | ||
| 1964 | let event1 = EventBuilder::text_note("maintainer 1") | ||
| 1965 | .sign_with_keys(&keys1) | ||
| 1966 | .unwrap(); | ||
| 1967 | let event2 = EventBuilder::text_note("maintainer 2") | ||
| 1968 | .sign_with_keys(&keys2) | ||
| 1969 | .unwrap(); | ||
| 1970 | let event3 = EventBuilder::text_note("maintainer 3") | ||
| 1971 | .sign_with_keys(&keys3) | ||
| 1972 | .unwrap(); | ||
| 1973 | |||
| 1974 | purgatory.add_state( | ||
| 1975 | event1.clone(), | ||
| 1976 | "shared-repo".to_string(), | ||
| 1977 | keys1.public_key(), | ||
| 1978 | ); | ||
| 1979 | purgatory.add_state( | ||
| 1980 | event2.clone(), | ||
| 1981 | "shared-repo".to_string(), | ||
| 1982 | keys2.public_key(), | ||
| 1983 | ); | ||
| 1984 | purgatory.add_state( | ||
| 1985 | event3.clone(), | ||
| 1986 | "shared-repo".to_string(), | ||
| 1987 | keys3.public_key(), | ||
| 1988 | ); | ||
| 1989 | |||
| 1990 | // Save to disk | ||
| 1991 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1992 | |||
| 1993 | // Create new purgatory and restore | ||
| 1994 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1995 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1996 | |||
| 1997 | // Verify all three events were restored | ||
| 1998 | let restored_entries = purgatory2.find_state("shared-repo"); | ||
| 1999 | assert_eq!(restored_entries.len(), 3); | ||
| 2000 | |||
| 2001 | // Verify all authors are present | ||
| 2002 | let authors: Vec<PublicKey> = restored_entries.iter().map(|e| e.author).collect(); | ||
| 2003 | assert!(authors.contains(&keys1.public_key())); | ||
| 2004 | assert!(authors.contains(&keys2.public_key())); | ||
| 2005 | assert!(authors.contains(&keys3.public_key())); | ||
| 2006 | } | ||
| 2007 | |||
| 2008 | #[tokio::test] | ||
| 2009 | async fn test_mixed_pr_events_and_placeholders() { | ||
| 2010 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 2011 | use tempfile::tempdir; | ||
| 2012 | |||
| 2013 | let temp_dir = tempdir().unwrap(); | ||
| 2014 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2015 | |||
| 2016 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2017 | let keys = Keys::generate(); | ||
| 2018 | |||
| 2019 | // Add PR event with actual event | ||
| 2020 | let tags = vec![Tag::custom( | ||
| 2021 | TagKind::Custom("a".into()), | ||
| 2022 | vec!["30617:abc123:test-repo".to_string()], | ||
| 2023 | )]; | ||
| 2024 | |||
| 2025 | let pr_event = EventBuilder::new(Kind::from(1618), "PR content") | ||
| 2026 | .tags(tags) | ||
| 2027 | .sign_with_keys(&keys) | ||
| 2028 | .unwrap(); | ||
| 2029 | |||
| 2030 | purgatory.add_pr( | ||
| 2031 | pr_event.clone(), | ||
| 2032 | "pr-with-event".to_string(), | ||
| 2033 | "commit-abc".to_string(), | ||
| 2034 | ); | ||
| 2035 | |||
| 2036 | // Add PR placeholder | ||
| 2037 | purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string()); | ||
| 2038 | |||
| 2039 | // Save to disk | ||
| 2040 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2041 | |||
| 2042 | // Create new purgatory and restore | ||
| 2043 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2044 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2045 | |||
| 2046 | // Verify both were restored correctly | ||
| 2047 | let (_, pr_count) = purgatory2.count(); | ||
| 2048 | assert_eq!(pr_count, 2); | ||
| 2049 | |||
| 2050 | // Verify PR event | ||
| 2051 | let pr_entry = purgatory2.find_pr("pr-with-event").unwrap(); | ||
| 2052 | assert!(pr_entry.event.is_some()); | ||
| 2053 | assert_eq!(pr_entry.commit, "commit-abc"); | ||
| 2054 | |||
| 2055 | // Verify placeholder | ||
| 2056 | let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap(); | ||
| 2057 | assert!(placeholder_entry.event.is_none()); | ||
| 2058 | assert_eq!(placeholder_entry.commit, "commit-def"); | ||
| 2059 | assert_eq!( | ||
| 2060 | purgatory2.find_pr_placeholder("pr-placeholder"), | ||
| 2061 | Some("commit-def".to_string()) | ||
| 2062 | ); | ||
| 2063 | } | ||
| 2064 | |||
| 2065 | #[tokio::test] | ||
| 2066 | async fn test_file_cleanup_after_successful_restore() { | ||
| 2067 | use tempfile::tempdir; | ||
| 2068 | |||
| 2069 | let temp_dir = tempdir().unwrap(); | ||
| 2070 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2071 | |||
| 2072 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2073 | let keys = Keys::generate(); | ||
| 2074 | |||
| 2075 | // Add some data | ||
| 2076 | let event = EventBuilder::text_note("test") | ||
| 2077 | .sign_with_keys(&keys) | ||
| 2078 | .unwrap(); | ||
| 2079 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | ||
| 2080 | |||
| 2081 | // Save to disk | ||
| 2082 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2083 | assert!(state_file.exists()); | ||
| 2084 | |||
| 2085 | // Restore | ||
| 2086 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2087 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2088 | |||
| 2089 | // File should be deleted after successful restore | ||
| 2090 | assert!(!state_file.exists()); | ||
| 2091 | } | ||
| 2092 | |||
| 2093 | #[tokio::test] | ||
| 2094 | async fn test_comprehensive_roundtrip() { | ||
| 2095 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 2096 | use tempfile::tempdir; | ||
| 2097 | |||
| 2098 | let temp_dir = tempdir().unwrap(); | ||
| 2099 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2100 | |||
| 2101 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2102 | let keys1 = Keys::generate(); | ||
| 2103 | let keys2 = Keys::generate(); | ||
| 2104 | |||
| 2105 | // Add multiple state events | ||
| 2106 | let state1 = EventBuilder::text_note("state 1") | ||
| 2107 | .sign_with_keys(&keys1) | ||
| 2108 | .unwrap(); | ||
| 2109 | let state2 = EventBuilder::text_note("state 2") | ||
| 2110 | .sign_with_keys(&keys2) | ||
| 2111 | .unwrap(); | ||
| 2112 | |||
| 2113 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | ||
| 2114 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | ||
| 2115 | |||
| 2116 | // Add PR event | ||
| 2117 | let tags = vec![Tag::custom( | ||
| 2118 | TagKind::Custom("a".into()), | ||
| 2119 | vec!["30617:abc123:repo1".to_string()], | ||
| 2120 | )]; | ||
| 2121 | let pr_event = EventBuilder::new(Kind::from(1618), "PR") | ||
| 2122 | .tags(tags) | ||
| 2123 | .sign_with_keys(&keys1) | ||
| 2124 | .unwrap(); | ||
| 2125 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | ||
| 2126 | |||
| 2127 | // Add PR placeholder | ||
| 2128 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | ||
| 2129 | |||
| 2130 | // Add and expire an event | ||
| 2131 | let expired_event = EventBuilder::text_note("expired") | ||
| 2132 | .sign_with_keys(&keys1) | ||
| 2133 | .unwrap(); | ||
| 2134 | let expired_id = expired_event.id; | ||
| 2135 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | ||
| 2136 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | ||
| 2137 | for entry in entries.iter_mut() { | ||
| 2138 | entry.expires_at = Instant::now() - Duration::from_secs(1); | ||
| 2139 | } | ||
| 2140 | } | ||
| 2141 | purgatory.cleanup(); | ||
| 2142 | |||
| 2143 | // Verify initial state | ||
| 2144 | let (state_count, pr_count) = purgatory.count(); | ||
| 2145 | assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) | ||
| 2146 | assert_eq!(pr_count, 2); // pr-1, pr-2 | ||
| 2147 | assert_eq!(purgatory.expired_count(), 1); // expired_event | ||
| 2148 | |||
| 2149 | // Save to disk | ||
| 2150 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2151 | |||
| 2152 | // Create new purgatory and restore | ||
| 2153 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2154 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2155 | |||
| 2156 | // Verify all data was restored correctly | ||
| 2157 | let (state_count2, pr_count2) = purgatory2.count(); | ||
| 2158 | assert_eq!(state_count2, 2); | ||
| 2159 | assert_eq!(pr_count2, 2); | ||
| 2160 | assert_eq!(purgatory2.expired_count(), 1); | ||
| 2161 | |||
| 2162 | // Verify state events | ||
| 2163 | assert_eq!(purgatory2.find_state("repo1").len(), 1); | ||
| 2164 | assert_eq!(purgatory2.find_state("repo2").len(), 1); | ||
| 2165 | |||
| 2166 | // Verify PR events | ||
| 2167 | assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some()); | ||
| 2168 | assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none()); | ||
| 2169 | |||
| 2170 | // Verify expired event | ||
| 2171 | assert!(purgatory2.is_expired(&expired_id)); | ||
| 2172 | } | ||
diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs new file mode 100644 index 0000000..7fca2cf --- /dev/null +++ b/src/purgatory/persistence.rs | |||
| @@ -0,0 +1,198 @@ | |||
| 1 | //! Persistence utilities for purgatory state. | ||
| 2 | //! | ||
| 3 | //! This module provides conversion functions between `Instant` (which cannot be | ||
| 4 | //! serialized) and `Duration` offsets from a reference `SystemTime`. This allows | ||
| 5 | //! purgatory state to be persisted to disk and restored across restarts. | ||
| 6 | //! | ||
| 7 | //! ## Time Handling | ||
| 8 | //! | ||
| 9 | //! - `Instant` is monotonic but cannot be serialized | ||
| 10 | //! - `SystemTime` can be serialized but may go backwards (NTP, user changes) | ||
| 11 | //! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime | ||
| 12 | //! - Downtime is accounted for when restoring state (elapsed time is preserved) | ||
| 13 | |||
| 14 | use std::time::{Duration, Instant, SystemTime}; | ||
| 15 | |||
| 16 | /// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`. | ||
| 17 | /// | ||
| 18 | /// This allows storing an `Instant` as a serializable offset that can be | ||
| 19 | /// restored later, accounting for system downtime. | ||
| 20 | /// | ||
| 21 | /// # Arguments | ||
| 22 | /// * `instant` - The `Instant` to convert | ||
| 23 | /// * `reference_time` - The reference `SystemTime` (typically SystemTime::now()) | ||
| 24 | /// * `reference_instant` - The corresponding `Instant` (typically Instant::now()) | ||
| 25 | /// | ||
| 26 | /// # Returns | ||
| 27 | /// Duration offset from the reference time | ||
| 28 | /// | ||
| 29 | /// # Example | ||
| 30 | /// ``` | ||
| 31 | /// use std::time::{Duration, Instant, SystemTime}; | ||
| 32 | /// use ngit_grasp::purgatory::persistence::instant_to_offset; | ||
| 33 | /// | ||
| 34 | /// let now_system = SystemTime::now(); | ||
| 35 | /// let now_instant = Instant::now(); | ||
| 36 | /// let future = now_instant + Duration::from_secs(60); | ||
| 37 | /// | ||
| 38 | /// let offset = instant_to_offset(future, now_system, now_instant); | ||
| 39 | /// assert!(offset.as_secs() >= 60); | ||
| 40 | /// ``` | ||
| 41 | pub fn instant_to_offset( | ||
| 42 | instant: Instant, | ||
| 43 | _reference_time: SystemTime, | ||
| 44 | reference_instant: Instant, | ||
| 45 | ) -> Duration { | ||
| 46 | if instant >= reference_instant { | ||
| 47 | // Future instant - return positive offset | ||
| 48 | instant.duration_since(reference_instant) | ||
| 49 | } else { | ||
| 50 | // Past instant - this shouldn't happen in normal operation, | ||
| 51 | // but we handle it by returning zero duration | ||
| 52 | Duration::ZERO | ||
| 53 | } | ||
| 54 | } | ||
| 55 | |||
| 56 | /// Convert a `Duration` offset back to an `Instant`, accounting for downtime. | ||
| 57 | /// | ||
| 58 | /// This restores an `Instant` from a serialized offset, adjusting for the time | ||
| 59 | /// that has elapsed since the state was saved. | ||
| 60 | /// | ||
| 61 | /// # Arguments | ||
| 62 | /// * `offset` - The duration offset from the saved reference time | ||
| 63 | /// * `saved_at` - The `SystemTime` when the state was saved | ||
| 64 | /// * `reference_instant` - The current `Instant` (typically Instant::now()) | ||
| 65 | /// | ||
| 66 | /// # Returns | ||
| 67 | /// The restored `Instant`, adjusted for downtime | ||
| 68 | /// | ||
| 69 | /// # Example | ||
| 70 | /// ``` | ||
| 71 | /// use std::time::{Duration, Instant, SystemTime}; | ||
| 72 | /// use ngit_grasp::purgatory::persistence::offset_to_instant; | ||
| 73 | /// | ||
| 74 | /// let saved_at = SystemTime::now(); | ||
| 75 | /// let offset = Duration::from_secs(60); | ||
| 76 | /// let now_instant = Instant::now(); | ||
| 77 | /// | ||
| 78 | /// let restored = offset_to_instant(offset, saved_at, now_instant); | ||
| 79 | /// // restored will be approximately now_instant + 60 seconds | ||
| 80 | /// ``` | ||
| 81 | pub fn offset_to_instant( | ||
| 82 | offset: Duration, | ||
| 83 | saved_at: SystemTime, | ||
| 84 | reference_instant: Instant, | ||
| 85 | ) -> Instant { | ||
| 86 | // Calculate how much time has elapsed since the state was saved | ||
| 87 | let now_system = SystemTime::now(); | ||
| 88 | let elapsed_since_save = now_system | ||
| 89 | .duration_since(saved_at) | ||
| 90 | .unwrap_or(Duration::ZERO); | ||
| 91 | |||
| 92 | // The original deadline was: saved_at + offset | ||
| 93 | // Time remaining = (saved_at + offset) - now_system | ||
| 94 | // = offset - elapsed_since_save | ||
| 95 | |||
| 96 | if offset > elapsed_since_save { | ||
| 97 | // Deadline is still in the future | ||
| 98 | let remaining = offset - elapsed_since_save; | ||
| 99 | reference_instant + remaining | ||
| 100 | } else { | ||
| 101 | // Deadline has already passed or is right now | ||
| 102 | reference_instant | ||
| 103 | } | ||
| 104 | } | ||
| 105 | |||
| 106 | #[cfg(test)] | ||
| 107 | mod tests { | ||
| 108 | use super::*; | ||
| 109 | use std::thread; | ||
| 110 | use std::time::Duration; | ||
| 111 | |||
| 112 | #[test] | ||
| 113 | fn test_instant_to_offset_future() { | ||
| 114 | let now_system = SystemTime::now(); | ||
| 115 | let now_instant = Instant::now(); | ||
| 116 | let future = now_instant + Duration::from_secs(60); | ||
| 117 | |||
| 118 | let offset = instant_to_offset(future, now_system, now_instant); | ||
| 119 | |||
| 120 | // Should be approximately 60 seconds (within tolerance) | ||
| 121 | assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61); | ||
| 122 | } | ||
| 123 | |||
| 124 | #[test] | ||
| 125 | fn test_instant_to_offset_past() { | ||
| 126 | let now_system = SystemTime::now(); | ||
| 127 | let past_instant = Instant::now(); | ||
| 128 | // Simulate some time passing | ||
| 129 | thread::sleep(Duration::from_millis(10)); | ||
| 130 | let now_instant = Instant::now(); | ||
| 131 | |||
| 132 | let offset = instant_to_offset(past_instant, now_system, now_instant); | ||
| 133 | |||
| 134 | // Past instants return zero duration | ||
| 135 | assert_eq!(offset, Duration::ZERO); | ||
| 136 | } | ||
| 137 | |||
| 138 | #[test] | ||
| 139 | fn test_offset_to_instant_with_time_remaining() { | ||
| 140 | let saved_at = SystemTime::now(); | ||
| 141 | let offset = Duration::from_secs(60); | ||
| 142 | |||
| 143 | // Simulate a very short downtime (< 10ms) | ||
| 144 | thread::sleep(Duration::from_millis(5)); | ||
| 145 | |||
| 146 | let now_instant = Instant::now(); | ||
| 147 | let restored = offset_to_instant(offset, saved_at, now_instant); | ||
| 148 | |||
| 149 | // Should be approximately 60 seconds in the future | ||
| 150 | let remaining = restored.duration_since(now_instant); | ||
| 151 | assert!( | ||
| 152 | remaining.as_secs() >= 59 && remaining.as_secs() <= 61, | ||
| 153 | "Expected ~60s, got {}s", | ||
| 154 | remaining.as_secs() | ||
| 155 | ); | ||
| 156 | } | ||
| 157 | |||
| 158 | #[test] | ||
| 159 | fn test_offset_to_instant_deadline_passed() { | ||
| 160 | // Simulate state saved 70 seconds ago with 60 second offset | ||
| 161 | let saved_at = SystemTime::now() - Duration::from_secs(70); | ||
| 162 | let offset = Duration::from_secs(60); | ||
| 163 | |||
| 164 | let now_instant = Instant::now(); | ||
| 165 | let restored = offset_to_instant(offset, saved_at, now_instant); | ||
| 166 | |||
| 167 | // Deadline has passed, should be now or in the past | ||
| 168 | let remaining = restored.saturating_duration_since(now_instant); | ||
| 169 | assert_eq!(remaining, Duration::ZERO); | ||
| 170 | } | ||
| 171 | |||
| 172 | #[test] | ||
| 173 | fn test_round_trip_conversion() { | ||
| 174 | let now_system = SystemTime::now(); | ||
| 175 | let now_instant = Instant::now(); | ||
| 176 | let future = now_instant + Duration::from_secs(120); | ||
| 177 | |||
| 178 | // Convert to offset | ||
| 179 | let offset = instant_to_offset(future, now_system, now_instant); | ||
| 180 | |||
| 181 | // Immediately convert back (minimal downtime) | ||
| 182 | let restored = offset_to_instant(offset, now_system, now_instant); | ||
| 183 | |||
| 184 | // Should be very close to the original future instant | ||
| 185 | let diff = if restored > future { | ||
| 186 | restored.duration_since(future) | ||
| 187 | } else { | ||
| 188 | future.duration_since(restored) | ||
| 189 | }; | ||
| 190 | |||
| 191 | // Allow for small timing differences (< 100ms) | ||
| 192 | assert!( | ||
| 193 | diff < Duration::from_millis(100), | ||
| 194 | "Round trip should preserve instant within 100ms, got {}ms", | ||
| 195 | diff.as_millis() | ||
| 196 | ); | ||
| 197 | } | ||
| 198 | } | ||
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 9c47616..919504b 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -5,8 +5,14 @@ | |||
| 5 | //! problem where either the nostr event or git push can arrive first. | 5 | //! problem where either the nostr event or git push can arrive first. |
| 6 | 6 | ||
| 7 | use nostr_sdk::prelude::*; | 7 | use nostr_sdk::prelude::*; |
| 8 | use serde::{Deserialize, Serialize}; | ||
| 8 | use std::time::Instant; | 9 | use std::time::Instant; |
| 9 | 10 | ||
| 11 | /// Default value for Instant fields during deserialization | ||
| 12 | fn instant_now() -> Instant { | ||
| 13 | Instant::now() | ||
| 14 | } | ||
| 15 | |||
| 10 | /// A reference name and its target object. | 16 | /// A reference name and its target object. |
| 11 | /// | 17 | /// |
| 12 | /// Used to identify specific git refs (branches, tags) that a state event | 18 | /// Used to identify specific git refs (branches, tags) that a state event |
| @@ -59,7 +65,10 @@ impl RefUpdate { | |||
| 59 | /// State events declare the current state of a repository but may arrive | 65 | /// State events declare the current state of a repository but may arrive |
| 60 | /// before the corresponding git data has been pushed. This entry holds | 66 | /// before the corresponding git data has been pushed. This entry holds |
| 61 | /// the event and associated metadata until the git data arrives. | 67 | /// the event and associated metadata until the git data arrives. |
| 62 | #[derive(Debug, Clone)] | 68 | /// |
| 69 | /// Note: `Instant` fields cannot be serialized directly. Use the `persistence` | ||
| 70 | /// module to convert to/from serializable wrapper types. | ||
| 71 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 63 | pub struct StatePurgatoryEntry { | 72 | pub struct StatePurgatoryEntry { |
| 64 | /// The nostr state event (kind 30618) awaiting git data | 73 | /// The nostr state event (kind 30618) awaiting git data |
| 65 | pub event: Event, | 74 | pub event: Event, |
| @@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry { | |||
| 71 | pub author: PublicKey, | 80 | pub author: PublicKey, |
| 72 | 81 | ||
| 73 | /// When this entry was added to purgatory | 82 | /// When this entry was added to purgatory |
| 83 | #[serde(skip, default = "instant_now")] | ||
| 74 | pub created_at: Instant, | 84 | pub created_at: Instant, |
| 75 | 85 | ||
| 76 | /// Expiry deadline (30 min from creation, may be extended) | 86 | /// Expiry deadline (30 min from creation, may be extended) |
| 87 | #[serde(skip, default = "instant_now")] | ||
| 77 | pub expires_at: Instant, | 88 | pub expires_at: Instant, |
| 78 | } | 89 | } |
| 79 | 90 | ||
| @@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry { | |||
| 82 | /// PR events reference specific commits but may arrive before the git push | 93 | /// PR events reference specific commits but may arrive before the git push |
| 83 | /// containing those commits. Alternatively, a git push may arrive first, | 94 | /// containing those commits. Alternatively, a git push may arrive first, |
| 84 | /// creating a placeholder entry waiting for the corresponding PR event. | 95 | /// creating a placeholder entry waiting for the corresponding PR event. |
| 85 | #[derive(Debug, Clone)] | 96 | /// |
| 97 | /// Note: `Instant` fields cannot be serialized directly. Use the `persistence` | ||
| 98 | /// module to convert to/from serializable wrapper types. | ||
| 99 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 86 | pub struct PrPurgatoryEntry { | 100 | pub struct PrPurgatoryEntry { |
| 87 | /// The nostr PR event, if received (None = git data arrived first) | 101 | /// The nostr PR event, if received (None = git data arrived first) |
| 88 | pub event: Option<Event>, | 102 | pub event: Option<Event>, |
| @@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry { | |||
| 92 | pub commit: String, | 106 | pub commit: String, |
| 93 | 107 | ||
| 94 | /// When this entry was added to purgatory | 108 | /// When this entry was added to purgatory |
| 109 | #[serde(skip, default = "instant_now")] | ||
| 95 | pub created_at: Instant, | 110 | pub created_at: Instant, |
| 96 | 111 | ||
| 97 | /// Expiry deadline (30 min from creation, may be extended) | 112 | /// Expiry deadline (30 min from creation, may be extended) |
| 113 | #[serde(skip, default = "instant_now")] | ||
| 98 | pub expires_at: Instant, | 114 | pub expires_at: Instant, |
| 99 | } | 115 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; | |||
| 43 | use tokio::time::sleep; | 43 | use tokio::time::sleep; |
| 44 | 44 | ||
| 45 | use std::collections::{HashMap, HashSet}; | 45 | use std::collections::{HashMap, HashSet}; |
| 46 | use std::path::{Path, PathBuf}; | ||
| 46 | use std::sync::Arc; | 47 | use std::sync::Arc; |
| 47 | use std::time::Duration; | 48 | use std::time::Duration; |
| 48 | 49 | ||
| @@ -581,6 +582,7 @@ impl SyncManager { | |||
| 581 | /// * `write_policy` - Policy for validating events before storage | 582 | /// * `write_policy` - Policy for validating events before storage |
| 582 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) | 583 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) |
| 583 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | ||
| 584 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 585 | pub fn new( | 587 | pub fn new( |
| 586 | bootstrap_relay_url: Option<String>, | 588 | bootstrap_relay_url: Option<String>, |
| @@ -589,11 +591,42 @@ impl SyncManager { | |||
| 589 | write_policy: Nip34WritePolicy, | 591 | write_policy: Nip34WritePolicy, |
| 590 | local_relay: LocalRelay, | 592 | local_relay: LocalRelay, |
| 591 | config: &Config, | 593 | config: &Config, |
| 594 | data_path: PathBuf, | ||
| 592 | sync_metrics: Option<SyncMetrics>, | 595 | sync_metrics: Option<SyncMetrics>, |
| 593 | ) -> Self { | 596 | ) -> Self { |
| 594 | // Extract purgatory from write_policy for read-only access | 597 | // Extract purgatory from write_policy for read-only access |
| 595 | let purgatory = write_policy.purgatory().clone(); | 598 | let purgatory = write_policy.purgatory().clone(); |
| 596 | 599 | ||
| 600 | // Create rejected events index | ||
| 601 | let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 602 | RejectedEventsIndex::with_metrics( | ||
| 603 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 604 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 605 | metrics.clone(), | ||
| 606 | ) | ||
| 607 | } else { | ||
| 608 | RejectedEventsIndex::new( | ||
| 609 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 610 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 611 | ) | ||
| 612 | }); | ||
| 613 | |||
| 614 | // Attempt to restore rejected events index from disk | ||
| 615 | let rejected_index_path = data_path.join("rejected-events-cache.json"); | ||
| 616 | if rejected_index_path.exists() { | ||
| 617 | match rejected_events_index.restore_from_disk(&rejected_index_path) { | ||
| 618 | Ok(()) => { | ||
| 619 | tracing::info!("Restored rejected events index from disk"); | ||
| 620 | } | ||
| 621 | Err(e) => { | ||
| 622 | tracing::warn!( | ||
| 623 | "Failed to restore rejected events index: {}, starting empty", | ||
| 624 | e | ||
| 625 | ); | ||
| 626 | } | ||
| 627 | } | ||
| 628 | } | ||
| 629 | |||
| 597 | Self { | 630 | Self { |
| 598 | bootstrap_relay_url, | 631 | bootstrap_relay_url, |
| 599 | service_domain, | 632 | service_domain, |
| @@ -605,18 +638,7 @@ impl SyncManager { | |||
| 605 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 638 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 606 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 639 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 607 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 640 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 608 | rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { | 641 | rejected_events_index, |
| 609 | RejectedEventsIndex::with_metrics( | ||
| 610 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 611 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 612 | metrics.clone(), | ||
| 613 | ) | ||
| 614 | } else { | ||
| 615 | RejectedEventsIndex::new( | ||
| 616 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 617 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 618 | ) | ||
| 619 | }), | ||
| 620 | connections: HashMap::new(), | 642 | connections: HashMap::new(), |
| 621 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 643 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 622 | next_batch_id: 0, | 644 | next_batch_id: 0, |
| @@ -637,6 +659,31 @@ impl SyncManager { | |||
| 637 | self.next_batch_id | 659 | self.next_batch_id |
| 638 | } | 660 | } |
| 639 | 661 | ||
| 662 | /// Get a clone of the rejected events index Arc. | ||
| 663 | /// | ||
| 664 | /// This allows access to the rejected events index for persistence | ||
| 665 | /// even after the SyncManager has been moved into a task. | ||
| 666 | /// | ||
| 667 | /// # Returns | ||
| 668 | /// Arc clone of the rejected events index | ||
| 669 | pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> { | ||
| 670 | self.rejected_events_index.clone() | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Save rejected events index to disk. | ||
| 674 | /// | ||
| 675 | /// This is called during shutdown to persist the rejected events cache, | ||
| 676 | /// allowing us to avoid re-downloading rejected events after restart. | ||
| 677 | /// | ||
| 678 | /// # Arguments | ||
| 679 | /// * `path` - Path to save the rejected index file | ||
| 680 | /// | ||
| 681 | /// # Returns | ||
| 682 | /// Ok(()) on success, Err if save fails | ||
| 683 | pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 684 | self.rejected_events_index.save_to_disk(path) | ||
| 685 | } | ||
| 686 | |||
| 640 | /// Handle EOSE (End Of Stored Events) for a subscription | 687 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 641 | /// | 688 | /// |
| 642 | /// This method: | 689 | /// This method: |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4d31901..f25f22a 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -86,12 +86,14 @@ | |||
| 86 | //! ``` | 86 | //! ``` |
| 87 | 87 | ||
| 88 | use nostr_sdk::{Event, EventId, PublicKey}; | 88 | use nostr_sdk::{Event, EventId, PublicKey}; |
| 89 | use serde::{Deserialize, Serialize}; | ||
| 89 | use std::collections::{HashMap, HashSet}; | 90 | use std::collections::{HashMap, HashSet}; |
| 91 | use std::path::Path; | ||
| 90 | use std::sync::{Arc, RwLock}; | 92 | use std::sync::{Arc, RwLock}; |
| 91 | use std::time::{Duration, Instant}; | 93 | use std::time::{Duration, Instant, SystemTime}; |
| 92 | 94 | ||
| 93 | /// Type of event stored in the rejected events index | 95 | /// Type of event stored in the rejected events index |
| 94 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 96 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| 95 | pub enum EventType { | 97 | pub enum EventType { |
| 96 | /// Repository announcement (kind 30617) | 98 | /// Repository announcement (kind 30617) |
| 97 | Announcement, | 99 | Announcement, |
| @@ -109,7 +111,7 @@ impl std::fmt::Display for EventType { | |||
| 109 | } | 111 | } |
| 110 | 112 | ||
| 111 | /// Reason why a repository announcement was rejected | 113 | /// Reason why a repository announcement was rejected |
| 112 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 114 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| 113 | pub enum RejectionReason { | 115 | pub enum RejectionReason { |
| 114 | /// Announcement doesn't list this service in clone/web URLs | 116 | /// Announcement doesn't list this service in clone/web URLs |
| 115 | DoesNotListService, | 117 | DoesNotListService, |
| @@ -141,6 +143,20 @@ struct HotCacheEntry { | |||
| 141 | cached_at: Instant, | 143 | cached_at: Instant, |
| 142 | } | 144 | } |
| 143 | 145 | ||
| 146 | /// Serializable version of HotCacheEntry for persistence | ||
| 147 | /// | ||
| 148 | /// Converts Instant to Duration offset from saved_at time | ||
| 149 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 150 | struct SerializableHotCacheEntry { | ||
| 151 | event: Event, | ||
| 152 | pubkey: PublicKey, | ||
| 153 | identifier: String, | ||
| 154 | event_type: EventType, | ||
| 155 | reason: RejectionReason, | ||
| 156 | /// Duration since saved_at when this entry was cached | ||
| 157 | cached_at_offset_secs: u64, | ||
| 158 | } | ||
| 159 | |||
| 144 | /// Entry in the cold index (metadata only) | 160 | /// Entry in the cold index (metadata only) |
| 145 | /// | 161 | /// |
| 146 | /// Note: event_id is stored as the HashMap key, not in this struct | 162 | /// Note: event_id is stored as the HashMap key, not in this struct |
| @@ -154,6 +170,49 @@ struct ColdIndexEntry { | |||
| 154 | rejected_at: Instant, | 170 | rejected_at: Instant, |
| 155 | } | 171 | } |
| 156 | 172 | ||
| 173 | /// Serializable version of ColdIndexEntry for persistence | ||
| 174 | /// | ||
| 175 | /// Converts Instant to Duration offset from saved_at time | ||
| 176 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 177 | struct SerializableColdIndexEntry { | ||
| 178 | pubkey: PublicKey, | ||
| 179 | identifier: String, | ||
| 180 | event_type: EventType, | ||
| 181 | reason: RejectionReason, | ||
| 182 | /// Duration since saved_at when this entry was rejected | ||
| 183 | rejected_at_offset_secs: u64, | ||
| 184 | } | ||
| 185 | |||
| 186 | /// Serializable state for hot cache | ||
| 187 | #[derive(Debug, Serialize, Deserialize)] | ||
| 188 | struct SerializableHotCache { | ||
| 189 | expiry_duration_secs: u64, | ||
| 190 | entries: HashMap<EventId, SerializableHotCacheEntry>, | ||
| 191 | } | ||
| 192 | |||
| 193 | /// Serializable state for cold index | ||
| 194 | #[derive(Debug, Serialize, Deserialize)] | ||
| 195 | struct SerializableColdIndex { | ||
| 196 | expiry_duration_secs: u64, | ||
| 197 | entries: HashMap<EventId, SerializableColdIndexEntry>, | ||
| 198 | } | ||
| 199 | |||
| 200 | /// Complete rejected cache state for persistence | ||
| 201 | /// | ||
| 202 | /// Stores both hot cache and cold index with version and timestamp information. | ||
| 203 | /// All Instant fields are converted to Duration offsets from saved_at. | ||
| 204 | #[derive(Debug, Serialize, Deserialize)] | ||
| 205 | struct RejectedCacheState { | ||
| 206 | /// Version for future compatibility | ||
| 207 | version: u32, | ||
| 208 | /// When this state was saved | ||
| 209 | saved_at: SystemTime, | ||
| 210 | /// Hot cache entries with full events | ||
| 211 | hot_cache: SerializableHotCache, | ||
| 212 | /// Cold index entries with metadata only | ||
| 213 | cold_index: SerializableColdIndex, | ||
| 214 | } | ||
| 215 | |||
| 157 | /// Hot cache: Stores full events for immediate re-processing | 216 | /// Hot cache: Stores full events for immediate re-processing |
| 158 | /// | 217 | /// |
| 159 | /// Events are stored for a short duration (default: 2 minutes) to enable | 218 | /// Events are stored for a short duration (default: 2 minutes) to enable |
| @@ -603,6 +662,168 @@ impl RejectedEventsIndex { | |||
| 603 | 662 | ||
| 604 | ids | 663 | ids |
| 605 | } | 664 | } |
| 665 | |||
| 666 | /// Save rejected events cache to disk | ||
| 667 | /// | ||
| 668 | /// Serializes both hot cache and cold index to JSON, converting Instant timestamps | ||
| 669 | /// to Duration offsets from the save time. This allows timestamps to be adjusted | ||
| 670 | /// for downtime when restored. | ||
| 671 | /// | ||
| 672 | /// # Arguments | ||
| 673 | /// | ||
| 674 | /// * `path` - File path to write the serialized state to | ||
| 675 | /// | ||
| 676 | /// # Returns | ||
| 677 | /// | ||
| 678 | /// Ok(()) on success, or an error if serialization or file write fails | ||
| 679 | pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 680 | let saved_at = SystemTime::now(); | ||
| 681 | let now = Instant::now(); | ||
| 682 | |||
| 683 | // Lock both caches for consistent snapshot | ||
| 684 | let hot_entries = self.hot_cache.entries.read().unwrap(); | ||
| 685 | let cold_entries = self.cold_index.entries.read().unwrap(); | ||
| 686 | |||
| 687 | // Convert hot cache entries to serializable format | ||
| 688 | let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries | ||
| 689 | .iter() | ||
| 690 | .map(|(event_id, entry)| { | ||
| 691 | let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs(); | ||
| 692 | |||
| 693 | let serializable_entry = SerializableHotCacheEntry { | ||
| 694 | event: entry.event.clone(), | ||
| 695 | pubkey: entry.pubkey, | ||
| 696 | identifier: entry.identifier.clone(), | ||
| 697 | event_type: entry.event_type, | ||
| 698 | reason: entry.reason, | ||
| 699 | cached_at_offset_secs, | ||
| 700 | }; | ||
| 701 | |||
| 702 | (*event_id, serializable_entry) | ||
| 703 | }) | ||
| 704 | .collect(); | ||
| 705 | |||
| 706 | // Convert cold index entries to serializable format | ||
| 707 | let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries | ||
| 708 | .iter() | ||
| 709 | .map(|(event_id, entry)| { | ||
| 710 | let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs(); | ||
| 711 | |||
| 712 | let serializable_entry = SerializableColdIndexEntry { | ||
| 713 | pubkey: entry.pubkey, | ||
| 714 | identifier: entry.identifier.clone(), | ||
| 715 | event_type: entry.event_type, | ||
| 716 | reason: entry.reason, | ||
| 717 | rejected_at_offset_secs, | ||
| 718 | }; | ||
| 719 | |||
| 720 | (*event_id, serializable_entry) | ||
| 721 | }) | ||
| 722 | .collect(); | ||
| 723 | |||
| 724 | // Create complete state | ||
| 725 | let state = RejectedCacheState { | ||
| 726 | version: 1, | ||
| 727 | saved_at, | ||
| 728 | hot_cache: SerializableHotCache { | ||
| 729 | expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(), | ||
| 730 | entries: serializable_hot_entries, | ||
| 731 | }, | ||
| 732 | cold_index: SerializableColdIndex { | ||
| 733 | expiry_duration_secs: self.cold_index.expiry_duration.as_secs(), | ||
| 734 | entries: serializable_cold_entries, | ||
| 735 | }, | ||
| 736 | }; | ||
| 737 | |||
| 738 | // Serialize to JSON and write to file | ||
| 739 | let json = serde_json::to_string_pretty(&state)?; | ||
| 740 | std::fs::write(path, json)?; | ||
| 741 | |||
| 742 | Ok(()) | ||
| 743 | } | ||
| 744 | |||
| 745 | /// Restore rejected events cache from disk | ||
| 746 | /// | ||
| 747 | /// Loads the serialized state from disk and populates both hot cache and cold index. | ||
| 748 | /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain | ||
| 749 | /// correct expiry behavior. Deletes the state file after successful restore. | ||
| 750 | /// | ||
| 751 | /// # Arguments | ||
| 752 | /// | ||
| 753 | /// * `path` - File path to read the serialized state from | ||
| 754 | /// | ||
| 755 | /// # Returns | ||
| 756 | /// | ||
| 757 | /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails | ||
| 758 | pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 759 | // Load and parse JSON | ||
| 760 | let json = std::fs::read_to_string(path)?; | ||
| 761 | let state: RejectedCacheState = serde_json::from_str(&json)?; | ||
| 762 | |||
| 763 | // Calculate downtime (how long the relay was offline) | ||
| 764 | let now_system = SystemTime::now(); | ||
| 765 | let downtime = now_system | ||
| 766 | .duration_since(state.saved_at) | ||
| 767 | .unwrap_or(Duration::ZERO); | ||
| 768 | |||
| 769 | let now_instant = Instant::now(); | ||
| 770 | |||
| 771 | // Lock both caches for restoration | ||
| 772 | let mut hot_entries = self.hot_cache.entries.write().unwrap(); | ||
| 773 | let mut cold_entries = self.cold_index.entries.write().unwrap(); | ||
| 774 | |||
| 775 | // Restore hot cache entries | ||
| 776 | for (event_id, serializable_entry) in state.hot_cache.entries { | ||
| 777 | // Reconstruct cached_at by extending the offset by downtime | ||
| 778 | // Original offset (how long ago it was cached when saved) | ||
| 779 | let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs); | ||
| 780 | // Total offset including downtime | ||
| 781 | let total_offset = original_offset + downtime; | ||
| 782 | |||
| 783 | // cached_at = now - total_offset | ||
| 784 | let cached_at = now_instant - total_offset; | ||
| 785 | |||
| 786 | let entry = HotCacheEntry { | ||
| 787 | event: serializable_entry.event, | ||
| 788 | pubkey: serializable_entry.pubkey, | ||
| 789 | identifier: serializable_entry.identifier, | ||
| 790 | event_type: serializable_entry.event_type, | ||
| 791 | reason: serializable_entry.reason, | ||
| 792 | cached_at, | ||
| 793 | }; | ||
| 794 | |||
| 795 | hot_entries.insert(event_id, entry); | ||
| 796 | } | ||
| 797 | |||
| 798 | // Restore cold index entries | ||
| 799 | for (event_id, serializable_entry) in state.cold_index.entries { | ||
| 800 | // Reconstruct rejected_at by extending the offset by downtime | ||
| 801 | let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs); | ||
| 802 | let total_offset = original_offset + downtime; | ||
| 803 | |||
| 804 | // rejected_at = now - total_offset | ||
| 805 | let rejected_at = now_instant - total_offset; | ||
| 806 | |||
| 807 | let entry = ColdIndexEntry { | ||
| 808 | pubkey: serializable_entry.pubkey, | ||
| 809 | identifier: serializable_entry.identifier, | ||
| 810 | event_type: serializable_entry.event_type, | ||
| 811 | reason: serializable_entry.reason, | ||
| 812 | rejected_at, | ||
| 813 | }; | ||
| 814 | |||
| 815 | cold_entries.insert(event_id, entry); | ||
| 816 | } | ||
| 817 | |||
| 818 | // Release locks before deleting file | ||
| 819 | drop(hot_entries); | ||
| 820 | drop(cold_entries); | ||
| 821 | |||
| 822 | // Delete the state file after successful restore | ||
| 823 | std::fs::remove_file(path)?; | ||
| 824 | |||
| 825 | Ok(()) | ||
| 826 | } | ||
| 606 | } | 827 | } |
| 607 | 828 | ||
| 608 | #[cfg(test)] | 829 | #[cfg(test)] |
| @@ -956,4 +1177,503 @@ mod tests { | |||
| 956 | // Cold index now empty | 1177 | // Cold index now empty |
| 957 | assert_eq!(index.cold_index_len(), 0); | 1178 | assert_eq!(index.cold_index_len(), 0); |
| 958 | } | 1179 | } |
| 1180 | |||
| 1181 | // ======================================================================== | ||
| 1182 | // Persistence Serialization Tests | ||
| 1183 | // ======================================================================== | ||
| 1184 | |||
| 1185 | #[tokio::test] | ||
| 1186 | async fn test_save_and_restore_hot_cache_roundtrip() { | ||
| 1187 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1188 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1189 | |||
| 1190 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1191 | let event = create_test_event().await; | ||
| 1192 | let pubkey = event.pubkey; | ||
| 1193 | let identifier = "test-repo".to_string(); | ||
| 1194 | |||
| 1195 | // Add event to hot cache | ||
| 1196 | index.add_announcement( | ||
| 1197 | event.clone(), | ||
| 1198 | pubkey, | ||
| 1199 | identifier.clone(), | ||
| 1200 | RejectionReason::DoesNotListService, | ||
| 1201 | ); | ||
| 1202 | |||
| 1203 | assert_eq!(index.hot_cache_len(), 1); | ||
| 1204 | assert_eq!(index.cold_index_len(), 1); | ||
| 1205 | |||
| 1206 | // Save to disk | ||
| 1207 | index.save_to_disk(&state_path).unwrap(); | ||
| 1208 | assert!(state_path.exists()); | ||
| 1209 | |||
| 1210 | // Create new index and restore | ||
| 1211 | let index2 = | ||
| 1212 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1213 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1214 | |||
| 1215 | // Verify state file was deleted after restore | ||
| 1216 | assert!(!state_path.exists()); | ||
| 1217 | |||
| 1218 | // Verify hot cache restored | ||
| 1219 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1220 | assert!(index2.hot_cache.contains(&event.id)); | ||
| 1221 | |||
| 1222 | // Verify cold index restored | ||
| 1223 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1224 | assert!(index2.cold_index.contains(&event.id)); | ||
| 1225 | |||
| 1226 | // Verify we can retrieve the event | ||
| 1227 | let events = index2 | ||
| 1228 | .hot_cache | ||
| 1229 | .get_maintainer_events(&pubkey, &identifier, None); | ||
| 1230 | assert_eq!(events.len(), 1); | ||
| 1231 | assert_eq!(events[0].id, event.id); | ||
| 1232 | } | ||
| 1233 | |||
| 1234 | #[tokio::test] | ||
| 1235 | async fn test_save_and_restore_cold_index_only() { | ||
| 1236 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1237 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1238 | |||
| 1239 | let index = RejectedEventsIndex::new( | ||
| 1240 | Duration::from_millis(50), // Hot cache expires quickly | ||
| 1241 | Duration::from_secs(604800), // Cold index lasts long | ||
| 1242 | ); | ||
| 1243 | let event = create_test_event().await; | ||
| 1244 | |||
| 1245 | // Add event | ||
| 1246 | index.add_announcement( | ||
| 1247 | event.clone(), | ||
| 1248 | event.pubkey, | ||
| 1249 | "test-repo".to_string(), | ||
| 1250 | RejectionReason::MaintainerNotYetValid, | ||
| 1251 | ); | ||
| 1252 | |||
| 1253 | // Wait for hot cache to expire | ||
| 1254 | std::thread::sleep(Duration::from_millis(60)); | ||
| 1255 | index.cleanup_expired_for_type("announcement"); | ||
| 1256 | |||
| 1257 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1258 | assert_eq!(index.cold_index_len(), 1); | ||
| 1259 | |||
| 1260 | // Save to disk | ||
| 1261 | index.save_to_disk(&state_path).unwrap(); | ||
| 1262 | |||
| 1263 | // Restore into new index | ||
| 1264 | let index2 = | ||
| 1265 | RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800)); | ||
| 1266 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1267 | |||
| 1268 | // Verify only cold index restored (hot cache was empty) | ||
| 1269 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 1270 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1271 | assert!(index2.cold_index.contains(&event.id)); | ||
| 1272 | } | ||
| 1273 | |||
| 1274 | #[tokio::test] | ||
| 1275 | async fn test_save_and_restore_both_hot_and_cold() { | ||
| 1276 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1277 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1278 | |||
| 1279 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1280 | let keys = Keys::generate(); | ||
| 1281 | |||
| 1282 | // Create two events | ||
| 1283 | let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); | ||
| 1284 | let event1 = keys.sign_event(unsigned1).await.unwrap(); | ||
| 1285 | |||
| 1286 | let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); | ||
| 1287 | let event2 = keys.sign_event(unsigned2).await.unwrap(); | ||
| 1288 | |||
| 1289 | // Add both events | ||
| 1290 | index.add_announcement( | ||
| 1291 | event1.clone(), | ||
| 1292 | event1.pubkey, | ||
| 1293 | "repo1".to_string(), | ||
| 1294 | RejectionReason::DoesNotListService, | ||
| 1295 | ); | ||
| 1296 | |||
| 1297 | index.add_state( | ||
| 1298 | event2.clone(), | ||
| 1299 | event2.pubkey, | ||
| 1300 | "repo2".to_string(), | ||
| 1301 | RejectionReason::Other, | ||
| 1302 | ); | ||
| 1303 | |||
| 1304 | assert_eq!(index.hot_cache_len(), 2); | ||
| 1305 | assert_eq!(index.cold_index_len(), 2); | ||
| 1306 | |||
| 1307 | // Save to disk | ||
| 1308 | index.save_to_disk(&state_path).unwrap(); | ||
| 1309 | |||
| 1310 | // Restore into new index | ||
| 1311 | let index2 = | ||
| 1312 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1313 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1314 | |||
| 1315 | // Verify both caches restored | ||
| 1316 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1317 | assert_eq!(index2.cold_index_len(), 2); | ||
| 1318 | assert!(index2.contains(&event1.id)); | ||
| 1319 | assert!(index2.contains(&event2.id)); | ||
| 1320 | } | ||
| 1321 | |||
| 1322 | #[tokio::test] | ||
| 1323 | async fn test_save_and_restore_empty_cache() { | ||
| 1324 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1325 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1326 | |||
| 1327 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1328 | |||
| 1329 | // Save empty cache | ||
| 1330 | index.save_to_disk(&state_path).unwrap(); | ||
| 1331 | assert!(state_path.exists()); | ||
| 1332 | |||
| 1333 | // Restore into new index | ||
| 1334 | let index2 = | ||
| 1335 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1336 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1337 | |||
| 1338 | // Verify empty state restored | ||
| 1339 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 1340 | assert_eq!(index2.cold_index_len(), 0); | ||
| 1341 | } | ||
| 1342 | |||
| 1343 | #[tokio::test] | ||
| 1344 | async fn test_restore_missing_file() { | ||
| 1345 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1346 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 1347 | |||
| 1348 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1349 | |||
| 1350 | // Attempting to restore missing file should return error | ||
| 1351 | let result = index.restore_from_disk(&state_path); | ||
| 1352 | assert!(result.is_err()); | ||
| 1353 | |||
| 1354 | // Index should remain empty | ||
| 1355 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1356 | assert_eq!(index.cold_index_len(), 0); | ||
| 1357 | } | ||
| 1358 | |||
| 1359 | #[tokio::test] | ||
| 1360 | async fn test_restore_corrupted_json() { | ||
| 1361 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1362 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 1363 | |||
| 1364 | // Write corrupted JSON | ||
| 1365 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 1366 | |||
| 1367 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1368 | |||
| 1369 | // Attempting to restore corrupted file should return error | ||
| 1370 | let result = index.restore_from_disk(&state_path); | ||
| 1371 | assert!(result.is_err()); | ||
| 1372 | |||
| 1373 | // Index should remain empty | ||
| 1374 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1375 | assert_eq!(index.cold_index_len(), 0); | ||
| 1376 | } | ||
| 1377 | |||
| 1378 | #[tokio::test] | ||
| 1379 | async fn test_file_cleanup_after_successful_restore() { | ||
| 1380 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1381 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1382 | |||
| 1383 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1384 | let event = create_test_event().await; | ||
| 1385 | |||
| 1386 | index.add_announcement( | ||
| 1387 | event.clone(), | ||
| 1388 | event.pubkey, | ||
| 1389 | "test-repo".to_string(), | ||
| 1390 | RejectionReason::DoesNotListService, | ||
| 1391 | ); | ||
| 1392 | |||
| 1393 | // Save to disk | ||
| 1394 | index.save_to_disk(&state_path).unwrap(); | ||
| 1395 | assert!(state_path.exists()); | ||
| 1396 | |||
| 1397 | // Restore | ||
| 1398 | let index2 = | ||
| 1399 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1400 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1401 | |||
| 1402 | // File should be deleted after successful restore | ||
| 1403 | assert!(!state_path.exists()); | ||
| 1404 | } | ||
| 1405 | |||
| 1406 | #[tokio::test] | ||
| 1407 | async fn test_downtime_calculation_preserves_expiry() { | ||
| 1408 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1409 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1410 | |||
| 1411 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1412 | let event = create_test_event().await; | ||
| 1413 | |||
| 1414 | index.add_announcement( | ||
| 1415 | event.clone(), | ||
| 1416 | event.pubkey, | ||
| 1417 | "test-repo".to_string(), | ||
| 1418 | RejectionReason::DoesNotListService, | ||
| 1419 | ); | ||
| 1420 | |||
| 1421 | // Save to disk | ||
| 1422 | index.save_to_disk(&state_path).unwrap(); | ||
| 1423 | |||
| 1424 | // Simulate downtime by sleeping | ||
| 1425 | std::thread::sleep(Duration::from_millis(100)); | ||
| 1426 | |||
| 1427 | // Restore | ||
| 1428 | let index2 = | ||
| 1429 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1430 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1431 | |||
| 1432 | // Event should still be in both caches (downtime accounted for) | ||
| 1433 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1434 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1435 | assert!(index2.contains(&event.id)); | ||
| 1436 | } | ||
| 1437 | |||
| 1438 | #[tokio::test] | ||
| 1439 | async fn test_entries_expired_during_downtime() { | ||
| 1440 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1441 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1442 | |||
| 1443 | // Create index with very short expiry | ||
| 1444 | let index = RejectedEventsIndex::new( | ||
| 1445 | Duration::from_millis(100), // Hot cache: 100ms | ||
| 1446 | Duration::from_millis(200), // Cold index: 200ms | ||
| 1447 | ); | ||
| 1448 | let event = create_test_event().await; | ||
| 1449 | |||
| 1450 | index.add_announcement( | ||
| 1451 | event.clone(), | ||
| 1452 | event.pubkey, | ||
| 1453 | "test-repo".to_string(), | ||
| 1454 | RejectionReason::DoesNotListService, | ||
| 1455 | ); | ||
| 1456 | |||
| 1457 | // Save to disk | ||
| 1458 | index.save_to_disk(&state_path).unwrap(); | ||
| 1459 | |||
| 1460 | // Simulate downtime longer than hot cache expiry | ||
| 1461 | std::thread::sleep(Duration::from_millis(150)); | ||
| 1462 | |||
| 1463 | // Restore | ||
| 1464 | let index2 = | ||
| 1465 | RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200)); | ||
| 1466 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1467 | |||
| 1468 | // Hot cache entry should have expired during downtime | ||
| 1469 | // Cold index should still have it (200ms expiry) | ||
| 1470 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1471 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1472 | |||
| 1473 | // But when we try to get it, hot cache will see it's expired | ||
| 1474 | let events = index2 | ||
| 1475 | .hot_cache | ||
| 1476 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1477 | assert_eq!(events.len(), 0); // Expired! | ||
| 1478 | |||
| 1479 | // Cleanup should remove it | ||
| 1480 | let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement"); | ||
| 1481 | assert_eq!(hot_expired, 1); | ||
| 1482 | assert_eq!(cold_expired, 0); // Not expired yet | ||
| 1483 | } | ||
| 1484 | |||
| 1485 | #[tokio::test] | ||
| 1486 | async fn test_hot_cache_different_event_types() { | ||
| 1487 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1488 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1489 | |||
| 1490 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1491 | let keys = Keys::generate(); | ||
| 1492 | |||
| 1493 | // Create announcement event | ||
| 1494 | let unsigned_ann = | ||
| 1495 | nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); | ||
| 1496 | let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); | ||
| 1497 | |||
| 1498 | // Create state event | ||
| 1499 | let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); | ||
| 1500 | let event_state = keys.sign_event(unsigned_state).await.unwrap(); | ||
| 1501 | |||
| 1502 | // Add both types | ||
| 1503 | index.add_announcement( | ||
| 1504 | event_ann.clone(), | ||
| 1505 | event_ann.pubkey, | ||
| 1506 | "test-repo".to_string(), | ||
| 1507 | RejectionReason::DoesNotListService, | ||
| 1508 | ); | ||
| 1509 | |||
| 1510 | index.add_state( | ||
| 1511 | event_state.clone(), | ||
| 1512 | event_state.pubkey, | ||
| 1513 | "test-repo".to_string(), | ||
| 1514 | RejectionReason::Other, | ||
| 1515 | ); | ||
| 1516 | |||
| 1517 | // Save and restore | ||
| 1518 | index.save_to_disk(&state_path).unwrap(); | ||
| 1519 | let index2 = | ||
| 1520 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1521 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1522 | |||
| 1523 | // Verify both event types restored | ||
| 1524 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1525 | assert!(index2.contains(&event_ann.id)); | ||
| 1526 | assert!(index2.contains(&event_state.id)); | ||
| 1527 | |||
| 1528 | // Verify we can filter by type | ||
| 1529 | let (removed, events) = index2.invalidate_and_get( | ||
| 1530 | &event_ann.pubkey, | ||
| 1531 | "test-repo", | ||
| 1532 | Some(EventType::Announcement), | ||
| 1533 | ); | ||
| 1534 | assert_eq!(removed, 1); | ||
| 1535 | assert_eq!(events.len(), 1); | ||
| 1536 | assert_eq!(events[0].id, event_ann.id); | ||
| 1537 | } | ||
| 1538 | |||
| 1539 | #[tokio::test] | ||
| 1540 | async fn test_cold_index_different_rejection_reasons() { | ||
| 1541 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1542 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1543 | |||
| 1544 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1545 | let keys = Keys::generate(); | ||
| 1546 | |||
| 1547 | // Create events with different rejection reasons | ||
| 1548 | let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); | ||
| 1549 | let event1 = keys.sign_event(unsigned1).await.unwrap(); | ||
| 1550 | |||
| 1551 | let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); | ||
| 1552 | let event2 = keys.sign_event(unsigned2).await.unwrap(); | ||
| 1553 | |||
| 1554 | let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key()); | ||
| 1555 | let event3 = keys.sign_event(unsigned3).await.unwrap(); | ||
| 1556 | |||
| 1557 | // Add with different rejection reasons | ||
| 1558 | index.add_announcement( | ||
| 1559 | event1.clone(), | ||
| 1560 | event1.pubkey, | ||
| 1561 | "repo1".to_string(), | ||
| 1562 | RejectionReason::DoesNotListService, | ||
| 1563 | ); | ||
| 1564 | |||
| 1565 | index.add_announcement( | ||
| 1566 | event2.clone(), | ||
| 1567 | event2.pubkey, | ||
| 1568 | "repo2".to_string(), | ||
| 1569 | RejectionReason::MaintainerNotYetValid, | ||
| 1570 | ); | ||
| 1571 | |||
| 1572 | index.add_announcement( | ||
| 1573 | event3.clone(), | ||
| 1574 | event3.pubkey, | ||
| 1575 | "repo3".to_string(), | ||
| 1576 | RejectionReason::Other, | ||
| 1577 | ); | ||
| 1578 | |||
| 1579 | // Save and restore | ||
| 1580 | index.save_to_disk(&state_path).unwrap(); | ||
| 1581 | let index2 = | ||
| 1582 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1583 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1584 | |||
| 1585 | // Verify all entries restored with their rejection reasons | ||
| 1586 | assert_eq!(index2.cold_index_len(), 3); | ||
| 1587 | assert!(index2.contains(&event1.id)); | ||
| 1588 | assert!(index2.contains(&event2.id)); | ||
| 1589 | assert!(index2.contains(&event3.id)); | ||
| 1590 | } | ||
| 1591 | |||
| 1592 | #[tokio::test] | ||
| 1593 | async fn test_multiple_save_restore_cycles() { | ||
| 1594 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1595 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1596 | |||
| 1597 | // First cycle | ||
| 1598 | let index1 = | ||
| 1599 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1600 | let event1 = create_test_event().await; | ||
| 1601 | |||
| 1602 | index1.add_announcement( | ||
| 1603 | event1.clone(), | ||
| 1604 | event1.pubkey, | ||
| 1605 | "repo1".to_string(), | ||
| 1606 | RejectionReason::DoesNotListService, | ||
| 1607 | ); | ||
| 1608 | |||
| 1609 | index1.save_to_disk(&state_path).unwrap(); | ||
| 1610 | |||
| 1611 | // Second cycle - restore and add more | ||
| 1612 | let index2 = | ||
| 1613 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1614 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1615 | |||
| 1616 | let event2 = create_test_event().await; | ||
| 1617 | index2.add_announcement( | ||
| 1618 | event2.clone(), | ||
| 1619 | event2.pubkey, | ||
| 1620 | "repo2".to_string(), | ||
| 1621 | RejectionReason::MaintainerNotYetValid, | ||
| 1622 | ); | ||
| 1623 | |||
| 1624 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1625 | index2.save_to_disk(&state_path).unwrap(); | ||
| 1626 | |||
| 1627 | // Third cycle - restore again | ||
| 1628 | let index3 = | ||
| 1629 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1630 | index3.restore_from_disk(&state_path).unwrap(); | ||
| 1631 | |||
| 1632 | // Verify both events survived multiple cycles | ||
| 1633 | assert_eq!(index3.hot_cache_len(), 2); | ||
| 1634 | assert!(index3.contains(&event1.id)); | ||
| 1635 | assert!(index3.contains(&event2.id)); | ||
| 1636 | } | ||
| 1637 | |||
| 1638 | #[tokio::test] | ||
| 1639 | async fn test_restore_preserves_remaining_ttl() { | ||
| 1640 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1641 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1642 | |||
| 1643 | // Create index with 2 second hot cache expiry | ||
| 1644 | let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); | ||
| 1645 | let event = create_test_event().await; | ||
| 1646 | |||
| 1647 | index.add_announcement( | ||
| 1648 | event.clone(), | ||
| 1649 | event.pubkey, | ||
| 1650 | "test-repo".to_string(), | ||
| 1651 | RejectionReason::DoesNotListService, | ||
| 1652 | ); | ||
| 1653 | |||
| 1654 | // Wait 200ms (small fraction of TTL) | ||
| 1655 | std::thread::sleep(Duration::from_millis(200)); | ||
| 1656 | |||
| 1657 | // Save to disk | ||
| 1658 | index.save_to_disk(&state_path).unwrap(); | ||
| 1659 | |||
| 1660 | // Immediately restore (minimal downtime) | ||
| 1661 | let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); | ||
| 1662 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1663 | |||
| 1664 | // Event should still be retrievable (has ~1.8s remaining) | ||
| 1665 | let events = index2 | ||
| 1666 | .hot_cache | ||
| 1667 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1668 | assert_eq!(events.len(), 1); | ||
| 1669 | |||
| 1670 | // Wait 2 seconds (total 2.2s > 2s expiry) | ||
| 1671 | std::thread::sleep(Duration::from_secs(2)); | ||
| 1672 | |||
| 1673 | // Now it should be expired | ||
| 1674 | let events = index2 | ||
| 1675 | .hot_cache | ||
| 1676 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1677 | assert_eq!(events.len(), 0); | ||
| 1678 | } | ||
| 959 | } | 1679 | } |
diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs new file mode 100644 index 0000000..acefb41 --- /dev/null +++ b/tests/purgatory_persistence.rs | |||
| @@ -0,0 +1,755 @@ | |||
| 1 | //! Purgatory Persistence Integration Tests | ||
| 2 | //! | ||
| 3 | //! Tests that verify the full purgatory persistence save/restore cycle: | ||
| 4 | //! - Purgatory save/restore with state events, PR events, and expired events | ||
| 5 | //! - Rejected cache save/restore with hot cache and cold index entries | ||
| 6 | //! - Integration with shutdown/startup hooks | ||
| 7 | //! - Graceful degradation with missing or corrupted files | ||
| 8 | //! - Time adjustment for downtime | ||
| 9 | //! | ||
| 10 | //! # Test Strategy | ||
| 11 | //! | ||
| 12 | //! These tests verify end-to-end persistence functionality: | ||
| 13 | //! 1. Create purgatory/rejected cache instances with various entries | ||
| 14 | //! 2. Save state to disk | ||
| 15 | //! 3. Create new instances and restore from disk | ||
| 16 | //! 4. Verify all data is restored correctly | ||
| 17 | //! 5. Verify system continues to work after restore | ||
| 18 | //! | ||
| 19 | //! # Running Tests | ||
| 20 | //! | ||
| 21 | //! ```bash | ||
| 22 | //! # Run all purgatory persistence tests | ||
| 23 | //! cargo test --test purgatory_persistence | ||
| 24 | //! | ||
| 25 | //! # Run specific test | ||
| 26 | //! cargo test --test purgatory_persistence test_full_purgatory_save_restore_cycle | ||
| 27 | //! | ||
| 28 | //! # With output for debugging | ||
| 29 | //! cargo test --test purgatory_persistence -- --nocapture | ||
| 30 | //! ``` | ||
| 31 | |||
| 32 | mod common; | ||
| 33 | |||
| 34 | use ngit_grasp::purgatory::Purgatory; | ||
| 35 | use ngit_grasp::sync::rejected_index::{EventType, RejectedEventsIndex, RejectionReason}; | ||
| 36 | use nostr_sdk::prelude::*; | ||
| 37 | use std::time::Duration; | ||
| 38 | |||
| 39 | /// Helper to create a test event | ||
| 40 | async fn create_test_event(keys: &Keys, content: &str) -> Event { | ||
| 41 | EventBuilder::text_note(content) | ||
| 42 | .sign_with_keys(keys) | ||
| 43 | .unwrap() | ||
| 44 | } | ||
| 45 | |||
| 46 | /// Helper to create a state event with specific refs | ||
| 47 | fn create_state_event_with_refs( | ||
| 48 | keys: &Keys, | ||
| 49 | identifier: &str, | ||
| 50 | refs: &[(&str, &str)], | ||
| 51 | ) -> Result<Event, Box<dyn std::error::Error>> { | ||
| 52 | let mut tags = vec![Tag::identifier(identifier)]; | ||
| 53 | |||
| 54 | // Add ref tags | ||
| 55 | for (ref_name, commit_hash) in refs { | ||
| 56 | tags.push(Tag::custom( | ||
| 57 | TagKind::custom("ref"), | ||
| 58 | vec![ref_name.to_string(), commit_hash.to_string()], | ||
| 59 | )); | ||
| 60 | } | ||
| 61 | |||
| 62 | let event = EventBuilder::new(Kind::from(30618), "") | ||
| 63 | .tags(tags) | ||
| 64 | .sign_with_keys(keys)?; | ||
| 65 | |||
| 66 | Ok(event) | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Test 1: Full save/restore cycle with state events, PR events, and expired events | ||
| 70 | #[tokio::test] | ||
| 71 | async fn test_full_purgatory_save_restore_cycle() { | ||
| 72 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 73 | let git_data_path = temp_dir.path().join("git"); | ||
| 74 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 75 | |||
| 76 | // Create purgatory instance | ||
| 77 | let purgatory = Purgatory::new(&git_data_path); | ||
| 78 | |||
| 79 | // Create test keys and events | ||
| 80 | let keys1 = Keys::generate(); | ||
| 81 | let keys2 = Keys::generate(); | ||
| 82 | let keys3 = Keys::generate(); | ||
| 83 | |||
| 84 | let state_event1 = | ||
| 85 | create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")]).unwrap(); | ||
| 86 | let state_event2 = | ||
| 87 | create_state_event_with_refs(&keys2, "repo2", &[("main", "def456")]).unwrap(); | ||
| 88 | |||
| 89 | let pr_event1 = create_test_event(&keys3, "PR 1").await; | ||
| 90 | let pr_event2 = create_test_event(&keys3, "PR 2").await; | ||
| 91 | |||
| 92 | // Add state events to purgatory | ||
| 93 | purgatory.add_state( | ||
| 94 | state_event1.clone(), | ||
| 95 | "repo1".to_string(), | ||
| 96 | keys1.public_key(), | ||
| 97 | ); | ||
| 98 | purgatory.add_state( | ||
| 99 | state_event2.clone(), | ||
| 100 | "repo2".to_string(), | ||
| 101 | keys2.public_key(), | ||
| 102 | ); | ||
| 103 | |||
| 104 | // Add PR events to purgatory | ||
| 105 | purgatory.add_pr( | ||
| 106 | pr_event1.clone(), | ||
| 107 | pr_event1.id.to_hex(), | ||
| 108 | "commit-abc".to_string(), | ||
| 109 | ); | ||
| 110 | purgatory.add_pr( | ||
| 111 | pr_event2.clone(), | ||
| 112 | pr_event2.id.to_hex(), | ||
| 113 | "commit-def".to_string(), | ||
| 114 | ); | ||
| 115 | |||
| 116 | // Add a PR placeholder (git-data-first scenario) | ||
| 117 | purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-xyz".to_string()); | ||
| 118 | |||
| 119 | // Note: We can't directly test expired events without accessing private fields, | ||
| 120 | // so we'll focus on testing state and PR events persistence | ||
| 121 | |||
| 122 | // Verify initial counts | ||
| 123 | let (state_count, pr_count) = purgatory.count(); | ||
| 124 | assert_eq!(state_count, 2, "Should have 2 state events"); | ||
| 125 | assert_eq!( | ||
| 126 | pr_count, 3, | ||
| 127 | "Should have 3 PR events (2 events + 1 placeholder)" | ||
| 128 | ); | ||
| 129 | |||
| 130 | // Save to disk | ||
| 131 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 132 | assert!(state_path.exists(), "State file should exist after save"); | ||
| 133 | |||
| 134 | // Create new purgatory instance and restore | ||
| 135 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 136 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 137 | |||
| 138 | // Verify state file was deleted after restore | ||
| 139 | assert!( | ||
| 140 | !state_path.exists(), | ||
| 141 | "State file should be deleted after restore" | ||
| 142 | ); | ||
| 143 | |||
| 144 | // Verify all data was restored | ||
| 145 | let (state_count2, pr_count2) = purgatory2.count(); | ||
| 146 | assert_eq!(state_count2, 2, "Should have 2 state events after restore"); | ||
| 147 | assert_eq!( | ||
| 148 | pr_count2, 3, | ||
| 149 | "Should have 3 PR events after restore (2 events + 1 placeholder)" | ||
| 150 | ); | ||
| 151 | |||
| 152 | // Verify specific state events | ||
| 153 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 154 | assert_eq!(repo1_states.len(), 1); | ||
| 155 | assert_eq!(repo1_states[0].event.id, state_event1.id); | ||
| 156 | |||
| 157 | let repo2_states = purgatory2.find_state("repo2"); | ||
| 158 | assert_eq!(repo2_states.len(), 1); | ||
| 159 | assert_eq!(repo2_states[0].event.id, state_event2.id); | ||
| 160 | |||
| 161 | // Verify PR events | ||
| 162 | let pr1 = purgatory2.find_pr(&pr_event1.id.to_hex()); | ||
| 163 | assert!(pr1.is_some()); | ||
| 164 | assert_eq!(pr1.unwrap().commit, "commit-abc"); | ||
| 165 | |||
| 166 | let pr2 = purgatory2.find_pr(&pr_event2.id.to_hex()); | ||
| 167 | assert!(pr2.is_some()); | ||
| 168 | assert_eq!(pr2.unwrap().commit, "commit-def"); | ||
| 169 | |||
| 170 | // Verify placeholder | ||
| 171 | let placeholder = purgatory2.find_pr_placeholder("placeholder-id"); | ||
| 172 | assert_eq!(placeholder, Some("commit-xyz".to_string())); | ||
| 173 | |||
| 174 | // Verify re-queueing works - get all identifiers | ||
| 175 | let identifiers = purgatory2.get_all_identifiers(); | ||
| 176 | assert_eq!(identifiers.len(), 2); | ||
| 177 | assert!(identifiers.contains(&"repo1".to_string())); | ||
| 178 | assert!(identifiers.contains(&"repo2".to_string())); | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Test 2: Rejected cache integration - save/restore hot cache and cold index | ||
| 182 | #[tokio::test] | ||
| 183 | async fn test_rejected_cache_save_restore_cycle() { | ||
| 184 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 185 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 186 | |||
| 187 | // Create rejected events index | ||
| 188 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 189 | |||
| 190 | // Create test events | ||
| 191 | let keys1 = Keys::generate(); | ||
| 192 | let keys2 = Keys::generate(); | ||
| 193 | |||
| 194 | let event1 = create_test_event(&keys1, "announcement 1").await; | ||
| 195 | let event2 = create_test_event(&keys2, "announcement 2").await; | ||
| 196 | let event3 = create_test_event(&keys1, "state 1").await; | ||
| 197 | |||
| 198 | // Add announcements to rejected cache | ||
| 199 | index.add_announcement( | ||
| 200 | event1.clone(), | ||
| 201 | event1.pubkey, | ||
| 202 | "repo1".to_string(), | ||
| 203 | RejectionReason::DoesNotListService, | ||
| 204 | ); | ||
| 205 | |||
| 206 | index.add_announcement( | ||
| 207 | event2.clone(), | ||
| 208 | event2.pubkey, | ||
| 209 | "repo2".to_string(), | ||
| 210 | RejectionReason::MaintainerNotYetValid, | ||
| 211 | ); | ||
| 212 | |||
| 213 | // Add state event to rejected cache | ||
| 214 | index.add_state( | ||
| 215 | event3.clone(), | ||
| 216 | event3.pubkey, | ||
| 217 | "repo1".to_string(), | ||
| 218 | RejectionReason::Other, | ||
| 219 | ); | ||
| 220 | |||
| 221 | // Verify initial counts | ||
| 222 | assert_eq!(index.hot_cache_len(), 3); | ||
| 223 | assert_eq!(index.cold_index_len(), 3); | ||
| 224 | |||
| 225 | // Save to disk | ||
| 226 | index.save_to_disk(&state_path).unwrap(); | ||
| 227 | assert!(state_path.exists()); | ||
| 228 | |||
| 229 | // Create new index and restore | ||
| 230 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 231 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 232 | |||
| 233 | // Verify state file was deleted | ||
| 234 | assert!(!state_path.exists()); | ||
| 235 | |||
| 236 | // Verify all entries restored | ||
| 237 | assert_eq!(index2.hot_cache_len(), 3); | ||
| 238 | assert_eq!(index2.cold_index_len(), 3); | ||
| 239 | |||
| 240 | // Verify specific entries | ||
| 241 | assert!(index2.contains(&event1.id)); | ||
| 242 | assert!(index2.contains(&event2.id)); | ||
| 243 | assert!(index2.contains(&event3.id)); | ||
| 244 | |||
| 245 | // Verify we can invalidate and get events | ||
| 246 | let (removed, hot_events) = | ||
| 247 | index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement)); | ||
| 248 | assert_eq!(removed, 1); | ||
| 249 | assert_eq!(hot_events.len(), 1); | ||
| 250 | assert_eq!(hot_events[0].id, event1.id); | ||
| 251 | } | ||
| 252 | |||
| 253 | /// Test 3: Simulated downtime - verify expiry times are adjusted correctly | ||
| 254 | #[tokio::test] | ||
| 255 | async fn test_purgatory_downtime_adjustment() { | ||
| 256 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 257 | let git_data_path = temp_dir.path().join("git"); | ||
| 258 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 259 | |||
| 260 | let purgatory = Purgatory::new(&git_data_path); | ||
| 261 | let keys = Keys::generate(); | ||
| 262 | |||
| 263 | let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 264 | .unwrap(); | ||
| 265 | |||
| 266 | purgatory.add_state(state_event.clone(), "repo1".to_string(), keys.public_key()); | ||
| 267 | |||
| 268 | // Save to disk | ||
| 269 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 270 | |||
| 271 | // Simulate downtime | ||
| 272 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 273 | |||
| 274 | // Restore | ||
| 275 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 276 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 277 | |||
| 278 | // Verify event is still there (downtime was accounted for) | ||
| 279 | let (state_count, _) = purgatory2.count(); | ||
| 280 | assert_eq!(state_count, 1); | ||
| 281 | |||
| 282 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 283 | assert_eq!(repo1_states.len(), 1); | ||
| 284 | assert_eq!(repo1_states[0].event.id, state_event.id); | ||
| 285 | |||
| 286 | // Verify the event hasn't expired yet (expiry time was adjusted) | ||
| 287 | // The event should have ~30 minutes minus the downtime | ||
| 288 | let entry = &repo1_states[0]; | ||
| 289 | let remaining = entry | ||
| 290 | .expires_at | ||
| 291 | .saturating_duration_since(std::time::Instant::now()); | ||
| 292 | assert!( | ||
| 293 | remaining > Duration::from_secs(1700), | ||
| 294 | "Event should have most of its 30min expiry remaining" | ||
| 295 | ); | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Test 4: Rejected cache downtime adjustment | ||
| 299 | #[tokio::test] | ||
| 300 | async fn test_rejected_cache_downtime_adjustment() { | ||
| 301 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 302 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 303 | |||
| 304 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 305 | let keys = Keys::generate(); | ||
| 306 | |||
| 307 | let event = create_test_event(&keys, "test").await; | ||
| 308 | |||
| 309 | index.add_announcement( | ||
| 310 | event.clone(), | ||
| 311 | event.pubkey, | ||
| 312 | "repo1".to_string(), | ||
| 313 | RejectionReason::DoesNotListService, | ||
| 314 | ); | ||
| 315 | |||
| 316 | // Save to disk | ||
| 317 | index.save_to_disk(&state_path).unwrap(); | ||
| 318 | |||
| 319 | // Simulate downtime | ||
| 320 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 321 | |||
| 322 | // Restore | ||
| 323 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 324 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 325 | |||
| 326 | // Verify event is still in both caches (downtime was accounted for) | ||
| 327 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 328 | assert_eq!(index2.cold_index_len(), 1); | ||
| 329 | assert!(index2.contains(&event.id)); | ||
| 330 | } | ||
| 331 | |||
| 332 | /// Test 5: File cleanup - verify state files are deleted after successful restore | ||
| 333 | #[tokio::test] | ||
| 334 | async fn test_purgatory_file_cleanup_after_restore() { | ||
| 335 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 336 | let git_data_path = temp_dir.path().join("git"); | ||
| 337 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 338 | |||
| 339 | let purgatory = Purgatory::new(&git_data_path); | ||
| 340 | let keys = Keys::generate(); | ||
| 341 | |||
| 342 | let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 343 | .unwrap(); | ||
| 344 | |||
| 345 | purgatory.add_state(state_event, "repo1".to_string(), keys.public_key()); | ||
| 346 | |||
| 347 | // Save to disk | ||
| 348 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 349 | assert!(state_path.exists(), "State file should exist after save"); | ||
| 350 | |||
| 351 | // Restore | ||
| 352 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 353 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 354 | |||
| 355 | // Verify file was deleted | ||
| 356 | assert!( | ||
| 357 | !state_path.exists(), | ||
| 358 | "State file should be deleted after successful restore" | ||
| 359 | ); | ||
| 360 | } | ||
| 361 | |||
| 362 | /// Test 6: Rejected cache file cleanup | ||
| 363 | #[tokio::test] | ||
| 364 | async fn test_rejected_cache_file_cleanup_after_restore() { | ||
| 365 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 366 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 367 | |||
| 368 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 369 | let keys = Keys::generate(); | ||
| 370 | |||
| 371 | let event = create_test_event(&keys, "test").await; | ||
| 372 | |||
| 373 | index.add_announcement( | ||
| 374 | event, | ||
| 375 | keys.public_key(), | ||
| 376 | "repo1".to_string(), | ||
| 377 | RejectionReason::DoesNotListService, | ||
| 378 | ); | ||
| 379 | |||
| 380 | // Save to disk | ||
| 381 | index.save_to_disk(&state_path).unwrap(); | ||
| 382 | assert!(state_path.exists()); | ||
| 383 | |||
| 384 | // Restore | ||
| 385 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 386 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 387 | |||
| 388 | // Verify file was deleted | ||
| 389 | assert!(!state_path.exists()); | ||
| 390 | } | ||
| 391 | |||
| 392 | /// Test 7: Graceful degradation - missing purgatory file | ||
| 393 | #[tokio::test] | ||
| 394 | async fn test_purgatory_restore_missing_file() { | ||
| 395 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 396 | let git_data_path = temp_dir.path().join("git"); | ||
| 397 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 398 | |||
| 399 | let purgatory = Purgatory::new(&git_data_path); | ||
| 400 | |||
| 401 | // Attempting to restore missing file should return error | ||
| 402 | let result = purgatory.restore_from_disk(&state_path); | ||
| 403 | assert!(result.is_err(), "Should error on missing file"); | ||
| 404 | |||
| 405 | // Purgatory should still be usable (empty state) | ||
| 406 | let (state_count, pr_count) = purgatory.count(); | ||
| 407 | assert_eq!(state_count, 0); | ||
| 408 | assert_eq!(pr_count, 0); | ||
| 409 | |||
| 410 | // Should be able to add events normally | ||
| 411 | let keys = Keys::generate(); | ||
| 412 | let event = create_test_event(&keys, "test").await; | ||
| 413 | purgatory.add_state(event, "repo1".to_string(), keys.public_key()); | ||
| 414 | |||
| 415 | let (state_count, _) = purgatory.count(); | ||
| 416 | assert_eq!(state_count, 1); | ||
| 417 | } | ||
| 418 | |||
| 419 | /// Test 8: Graceful degradation - missing rejected cache file | ||
| 420 | #[tokio::test] | ||
| 421 | async fn test_rejected_cache_restore_missing_file() { | ||
| 422 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 423 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 424 | |||
| 425 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 426 | |||
| 427 | // Attempting to restore missing file should return error | ||
| 428 | let result = index.restore_from_disk(&state_path); | ||
| 429 | assert!(result.is_err()); | ||
| 430 | |||
| 431 | // Index should still be usable (empty state) | ||
| 432 | assert_eq!(index.hot_cache_len(), 0); | ||
| 433 | assert_eq!(index.cold_index_len(), 0); | ||
| 434 | |||
| 435 | // Should be able to add events normally | ||
| 436 | let keys = Keys::generate(); | ||
| 437 | let event = create_test_event(&keys, "test").await; | ||
| 438 | index.add_announcement( | ||
| 439 | event, | ||
| 440 | keys.public_key(), | ||
| 441 | "repo1".to_string(), | ||
| 442 | RejectionReason::DoesNotListService, | ||
| 443 | ); | ||
| 444 | |||
| 445 | assert_eq!(index.hot_cache_len(), 1); | ||
| 446 | assert_eq!(index.cold_index_len(), 1); | ||
| 447 | } | ||
| 448 | |||
| 449 | /// Test 9: Graceful degradation - corrupted purgatory file | ||
| 450 | #[tokio::test] | ||
| 451 | async fn test_purgatory_restore_corrupted_file() { | ||
| 452 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 453 | let git_data_path = temp_dir.path().join("git"); | ||
| 454 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 455 | |||
| 456 | // Write corrupted JSON | ||
| 457 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 458 | |||
| 459 | let purgatory = Purgatory::new(&git_data_path); | ||
| 460 | |||
| 461 | // Attempting to restore corrupted file should return error | ||
| 462 | let result = purgatory.restore_from_disk(&state_path); | ||
| 463 | assert!(result.is_err(), "Should error on corrupted file"); | ||
| 464 | |||
| 465 | // Purgatory should still be usable | ||
| 466 | let (state_count, pr_count) = purgatory.count(); | ||
| 467 | assert_eq!(state_count, 0); | ||
| 468 | assert_eq!(pr_count, 0); | ||
| 469 | } | ||
| 470 | |||
| 471 | /// Test 10: Graceful degradation - corrupted rejected cache file | ||
| 472 | #[tokio::test] | ||
| 473 | async fn test_rejected_cache_restore_corrupted_file() { | ||
| 474 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 475 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 476 | |||
| 477 | // Write corrupted JSON | ||
| 478 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 479 | |||
| 480 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 481 | |||
| 482 | // Attempting to restore corrupted file should return error | ||
| 483 | let result = index.restore_from_disk(&state_path); | ||
| 484 | assert!(result.is_err()); | ||
| 485 | |||
| 486 | // Index should still be usable | ||
| 487 | assert_eq!(index.hot_cache_len(), 0); | ||
| 488 | assert_eq!(index.cold_index_len(), 0); | ||
| 489 | } | ||
| 490 | |||
| 491 | /// Test 11: Empty purgatory save/restore | ||
| 492 | #[tokio::test] | ||
| 493 | async fn test_empty_purgatory_save_restore() { | ||
| 494 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 495 | let git_data_path = temp_dir.path().join("git"); | ||
| 496 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 497 | |||
| 498 | let purgatory = Purgatory::new(&git_data_path); | ||
| 499 | |||
| 500 | // Save empty purgatory | ||
| 501 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 502 | assert!(state_path.exists()); | ||
| 503 | |||
| 504 | // Restore | ||
| 505 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 506 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 507 | |||
| 508 | // Verify empty state | ||
| 509 | let (state_count, pr_count) = purgatory2.count(); | ||
| 510 | assert_eq!(state_count, 0); | ||
| 511 | assert_eq!(pr_count, 0); | ||
| 512 | assert_eq!(purgatory2.expired_count(), 0); | ||
| 513 | } | ||
| 514 | |||
| 515 | /// Test 12: Empty rejected cache save/restore | ||
| 516 | #[tokio::test] | ||
| 517 | async fn test_empty_rejected_cache_save_restore() { | ||
| 518 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 519 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 520 | |||
| 521 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 522 | |||
| 523 | // Save empty cache | ||
| 524 | index.save_to_disk(&state_path).unwrap(); | ||
| 525 | assert!(state_path.exists()); | ||
| 526 | |||
| 527 | // Restore | ||
| 528 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 529 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 530 | |||
| 531 | // Verify empty state | ||
| 532 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 533 | assert_eq!(index2.cold_index_len(), 0); | ||
| 534 | } | ||
| 535 | |||
| 536 | /// Test 13: Multiple state events for same identifier | ||
| 537 | #[tokio::test] | ||
| 538 | async fn test_purgatory_multiple_state_events_same_identifier() { | ||
| 539 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 540 | let git_data_path = temp_dir.path().join("git"); | ||
| 541 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 542 | |||
| 543 | let purgatory = Purgatory::new(&git_data_path); | ||
| 544 | |||
| 545 | // Create multiple state events for same identifier (different maintainers) | ||
| 546 | let keys1 = Keys::generate(); | ||
| 547 | let keys2 = Keys::generate(); | ||
| 548 | |||
| 549 | let event1 = create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")]) | ||
| 550 | .unwrap(); | ||
| 551 | let event2 = create_state_event_with_refs(&keys2, "repo1", &[("main", "def456")]) | ||
| 552 | .unwrap(); | ||
| 553 | |||
| 554 | purgatory.add_state(event1.clone(), "repo1".to_string(), keys1.public_key()); | ||
| 555 | purgatory.add_state(event2.clone(), "repo1".to_string(), keys2.public_key()); | ||
| 556 | |||
| 557 | // Save and restore | ||
| 558 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 559 | |||
| 560 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 561 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 562 | |||
| 563 | // Verify both events restored | ||
| 564 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 565 | assert_eq!(repo1_states.len(), 2); | ||
| 566 | |||
| 567 | let event_ids: Vec<_> = repo1_states.iter().map(|e| e.event.id).collect(); | ||
| 568 | assert!(event_ids.contains(&event1.id)); | ||
| 569 | assert!(event_ids.contains(&event2.id)); | ||
| 570 | } | ||
| 571 | |||
| 572 | /// Test 14: Verify system continues to work after restore | ||
| 573 | #[tokio::test] | ||
| 574 | async fn test_purgatory_continues_working_after_restore() { | ||
| 575 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 576 | let git_data_path = temp_dir.path().join("git"); | ||
| 577 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 578 | |||
| 579 | let purgatory = Purgatory::new(&git_data_path); | ||
| 580 | let keys = Keys::generate(); | ||
| 581 | |||
| 582 | let event1 = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 583 | .unwrap(); | ||
| 584 | |||
| 585 | purgatory.add_state(event1.clone(), "repo1".to_string(), keys.public_key()); | ||
| 586 | |||
| 587 | // Save and restore | ||
| 588 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 589 | |||
| 590 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 591 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 592 | |||
| 593 | // Add new events after restore | ||
| 594 | let event2 = create_state_event_with_refs(&keys, "repo2", &[("main", "xyz789")]) | ||
| 595 | .unwrap(); | ||
| 596 | |||
| 597 | purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key()); | ||
| 598 | |||
| 599 | // Verify both old and new events work | ||
| 600 | let (state_count, _) = purgatory2.count(); | ||
| 601 | assert_eq!(state_count, 2); | ||
| 602 | |||
| 603 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 604 | assert_eq!(repo1_states.len(), 1); | ||
| 605 | assert_eq!(repo1_states[0].event.id, event1.id); | ||
| 606 | |||
| 607 | let repo2_states = purgatory2.find_state("repo2"); | ||
| 608 | assert_eq!(repo2_states.len(), 1); | ||
| 609 | assert_eq!(repo2_states[0].event.id, event2.id); | ||
| 610 | |||
| 611 | // Verify cleanup still works | ||
| 612 | let (state_removed, pr_removed) = purgatory2.cleanup(); | ||
| 613 | // Nothing should be expired yet | ||
| 614 | assert_eq!(state_removed, 0); | ||
| 615 | assert_eq!(pr_removed, 0); | ||
| 616 | } | ||
| 617 | |||
| 618 | /// Test 15: Verify rejected cache continues working after restore | ||
| 619 | #[tokio::test] | ||
| 620 | async fn test_rejected_cache_continues_working_after_restore() { | ||
| 621 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 622 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 623 | |||
| 624 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 625 | let keys = Keys::generate(); | ||
| 626 | |||
| 627 | let event1 = create_test_event(&keys, "event1").await; | ||
| 628 | |||
| 629 | index.add_announcement( | ||
| 630 | event1.clone(), | ||
| 631 | event1.pubkey, | ||
| 632 | "repo1".to_string(), | ||
| 633 | RejectionReason::DoesNotListService, | ||
| 634 | ); | ||
| 635 | |||
| 636 | // Save and restore | ||
| 637 | index.save_to_disk(&state_path).unwrap(); | ||
| 638 | |||
| 639 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 640 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 641 | |||
| 642 | // Add new events after restore | ||
| 643 | let event2 = create_test_event(&keys, "event2").await; | ||
| 644 | |||
| 645 | index2.add_announcement( | ||
| 646 | event2.clone(), | ||
| 647 | event2.pubkey, | ||
| 648 | "repo2".to_string(), | ||
| 649 | RejectionReason::MaintainerNotYetValid, | ||
| 650 | ); | ||
| 651 | |||
| 652 | // Verify both old and new events work | ||
| 653 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 654 | assert_eq!(index2.cold_index_len(), 2); | ||
| 655 | assert!(index2.contains(&event1.id)); | ||
| 656 | assert!(index2.contains(&event2.id)); | ||
| 657 | |||
| 658 | // Verify invalidation still works | ||
| 659 | let (removed, hot_events) = | ||
| 660 | index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement)); | ||
| 661 | assert_eq!(removed, 1); | ||
| 662 | assert_eq!(hot_events.len(), 1); | ||
| 663 | assert_eq!(hot_events[0].id, event1.id); | ||
| 664 | } | ||
| 665 | |||
| 666 | /// Test 16: Entries that expired during downtime are properly handled | ||
| 667 | #[tokio::test] | ||
| 668 | async fn test_purgatory_entries_expired_during_downtime() { | ||
| 669 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 670 | let git_data_path = temp_dir.path().join("git"); | ||
| 671 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 672 | |||
| 673 | let purgatory = Purgatory::new(&git_data_path); | ||
| 674 | let keys = Keys::generate(); | ||
| 675 | |||
| 676 | let event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 677 | .unwrap(); | ||
| 678 | |||
| 679 | purgatory.add_state(event.clone(), "repo1".to_string(), keys.public_key()); | ||
| 680 | |||
| 681 | // Save to disk | ||
| 682 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 683 | |||
| 684 | // Simulate very long downtime (longer than the 30min default expiry) | ||
| 685 | // Note: We can't manually set expiry without accessing private fields, | ||
| 686 | // so this test verifies that the system handles already-expired entries gracefully | ||
| 687 | // In a real scenario, if downtime > 30 minutes, entries would be expired on restore | ||
| 688 | |||
| 689 | // For this test, we'll just verify the restore works and cleanup can be called | ||
| 690 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 691 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 692 | |||
| 693 | // Event should be restored | ||
| 694 | let (state_count, _) = purgatory2.count(); | ||
| 695 | assert_eq!(state_count, 1); | ||
| 696 | |||
| 697 | // Cleanup should work (even if nothing is expired yet) | ||
| 698 | let (state_removed, _) = purgatory2.cleanup(); | ||
| 699 | // Nothing expired yet since we didn't wait 30 minutes | ||
| 700 | assert_eq!(state_removed, 0); | ||
| 701 | |||
| 702 | let (state_count, _) = purgatory2.count(); | ||
| 703 | assert_eq!(state_count, 1); | ||
| 704 | } | ||
| 705 | |||
| 706 | /// Test 17: Rejected cache entries that expired during downtime | ||
| 707 | #[tokio::test] | ||
| 708 | async fn test_rejected_cache_entries_expired_during_downtime() { | ||
| 709 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 710 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 711 | |||
| 712 | // Create index with very short expiry | ||
| 713 | let index = RejectedEventsIndex::new( | ||
| 714 | Duration::from_millis(50), // Hot cache: 50ms | ||
| 715 | Duration::from_millis(100), // Cold index: 100ms | ||
| 716 | ); | ||
| 717 | let keys = Keys::generate(); | ||
| 718 | |||
| 719 | let event = create_test_event(&keys, "test").await; | ||
| 720 | |||
| 721 | index.add_announcement( | ||
| 722 | event.clone(), | ||
| 723 | event.pubkey, | ||
| 724 | "repo1".to_string(), | ||
| 725 | RejectionReason::DoesNotListService, | ||
| 726 | ); | ||
| 727 | |||
| 728 | // Save to disk | ||
| 729 | index.save_to_disk(&state_path).unwrap(); | ||
| 730 | |||
| 731 | // Simulate downtime longer than hot cache expiry | ||
| 732 | tokio::time::sleep(Duration::from_millis(75)).await; | ||
| 733 | |||
| 734 | // Restore | ||
| 735 | let index2 = RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_millis(100)); | ||
| 736 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 737 | |||
| 738 | // Both should be restored initially | ||
| 739 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 740 | assert_eq!(index2.cold_index_len(), 1); | ||
| 741 | |||
| 742 | // Note: We can't directly access hot_cache.get_maintainer_events (private method) | ||
| 743 | // But we can verify the entry is there via contains() and test cleanup | ||
| 744 | |||
| 745 | // Verify entry is still tracked | ||
| 746 | assert!(index2.contains(&event.id)); | ||
| 747 | |||
| 748 | // Cleanup should remove expired hot cache entry | ||
| 749 | let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement"); | ||
| 750 | assert_eq!(hot_expired, 1); | ||
| 751 | assert_eq!(cold_expired, 0); // Cold index still valid | ||
| 752 | |||
| 753 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 754 | assert_eq!(index2.cold_index_len(), 1); | ||
| 755 | } | ||