diff options
| -rw-r--r-- | src/main.rs | 50 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 925 | ||||
| -rw-r--r-- | src/purgatory/persistence.rs | 198 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 20 | ||||
| -rw-r--r-- | src/sync/mod.rs | 71 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 726 | ||||
| -rw-r--r-- | tests/purgatory_persistence.rs | 755 |
7 files changed, 2725 insertions, 20 deletions
diff --git a/src/main.rs b/src/main.rs index a6f1d9d..5e5b83a 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc}; | |||
| 3 | 3 | ||
| 4 | use anyhow::Result; | 4 | use anyhow::Result; |
| 5 | use tokio::signal; | 5 | use tokio::signal; |
| 6 | use tracing::{info, Level}; | 6 | use tracing::{error, info, warn, Level}; |
| 7 | use tracing_subscriber::FmtSubscriber; | 7 | use tracing_subscriber::FmtSubscriber; |
| 8 | 8 | ||
| 9 | use ngit_grasp::{ | 9 | use ngit_grasp::{ |
| @@ -64,6 +64,22 @@ async fn main() -> Result<()> { | |||
| 64 | ))); | 64 | ))); |
| 65 | info!("Purgatory initialized for event coordination"); | 65 | info!("Purgatory initialized for event coordination"); |
| 66 | 66 | ||
| 67 | // Restore purgatory state from disk if available | ||
| 68 | let purgatory_path = | ||
| 69 | PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json"); | ||
| 70 | |||
| 71 | if purgatory_path.exists() { | ||
| 72 | match purgatory.restore_from_disk(&purgatory_path) { | ||
| 73 | Ok(()) => { | ||
| 74 | info!("Restored purgatory state from disk"); | ||
| 75 | // Re-queueing will happen later after sync system is created | ||
| 76 | } | ||
| 77 | Err(e) => { | ||
| 78 | warn!("Failed to restore purgatory state: {}, starting empty", e); | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 67 | // Create Nostr relay with NIP-34 validation | 83 | // Create Nostr relay with NIP-34 validation |
| 68 | // Returns both the relay and database for direct queries in handlers | 84 | // Returns both the relay and database for direct queries in handlers |
| 69 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { | 85 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { |
| @@ -88,6 +104,7 @@ async fn main() -> Result<()> { | |||
| 88 | relay_with_db.write_policy.clone(), | 104 | relay_with_db.write_policy.clone(), |
| 89 | relay_with_db.relay.clone(), | 105 | relay_with_db.relay.clone(), |
| 90 | &config, | 106 | &config, |
| 107 | PathBuf::from(config.effective_git_data_path()), | ||
| 91 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), | 108 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), |
| 92 | ); | 109 | ); |
| 93 | 110 | ||
| @@ -100,6 +117,21 @@ async fn main() -> Result<()> { | |||
| 100 | info!("Proactive sync enabled (will discover relays from stored announcements)"); | 117 | info!("Proactive sync enabled (will discover relays from stored announcements)"); |
| 101 | } | 118 | } |
| 102 | 119 | ||
| 120 | // Re-queue all restored purgatory repos for sync | ||
| 121 | let restored_identifiers = purgatory.get_all_identifiers(); | ||
| 122 | if !restored_identifiers.is_empty() { | ||
| 123 | info!( | ||
| 124 | "Re-queueing {} restored repositories for sync", | ||
| 125 | restored_identifiers.len() | ||
| 126 | ); | ||
| 127 | for identifier in restored_identifiers { | ||
| 128 | purgatory.enqueue_sync_immediate(&identifier); | ||
| 129 | } | ||
| 130 | } | ||
| 131 | |||
| 132 | // Get a reference to the rejected events index for shutdown persistence | ||
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | ||
| 134 | |||
| 103 | tokio::spawn(async move { | 135 | tokio::spawn(async move { |
| 104 | sync_manager.run().await; | 136 | sync_manager.run().await; |
| 105 | }); | 137 | }); |
| @@ -190,6 +222,22 @@ async fn main() -> Result<()> { | |||
| 190 | } | 222 | } |
| 191 | } | 223 | } |
| 192 | 224 | ||
| 225 | // Save purgatory state to disk | ||
| 226 | let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json"); | ||
| 227 | if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) { | ||
| 228 | error!("Failed to save purgatory state: {}", e); | ||
| 229 | } else { | ||
| 230 | info!("Purgatory state saved to disk"); | ||
| 231 | } | ||
| 232 | |||
| 233 | // Save rejected events cache to disk | ||
| 234 | let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json"); | ||
| 235 | if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) { | ||
| 236 | error!("Failed to save rejected events cache: {}", e); | ||
| 237 | } else { | ||
| 238 | info!("Rejected events cache saved to disk"); | ||
| 239 | } | ||
| 240 | |||
| 193 | // Cleanup placeholder refs on shutdown | 241 | // Cleanup placeholder refs on shutdown |
| 194 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); | 242 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); |
| 195 | if !placeholder_ids.is_empty() { | 243 | if !placeholder_ids.is_empty() { |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 20df19b..47798a6 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -12,6 +12,7 @@ | |||
| 12 | //! - **Separate stores**: State events and PR events use different indexing strategies | 12 | //! - **Separate stores**: State events and PR events use different indexing strategies |
| 13 | 13 | ||
| 14 | mod helpers; | 14 | mod helpers; |
| 15 | pub mod persistence; | ||
| 15 | pub mod sync; | 16 | pub mod sync; |
| 16 | mod types; | 17 | mod types; |
| 17 | 18 | ||
| @@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | |||
| 20 | 21 | ||
| 21 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 22 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use serde::{Deserialize, Serialize}; | ||
| 25 | use std::collections::HashMap; | ||
| 23 | use std::collections::HashSet; | 26 | use std::collections::HashSet; |
| 24 | use std::path::PathBuf; | 27 | use std::path::{Path, PathBuf}; |
| 25 | use std::sync::Arc; | 28 | use std::sync::Arc; |
| 26 | use std::time::{Duration, Instant}; | 29 | use std::time::{Duration, Instant, SystemTime}; |
| 27 | 30 | ||
| 28 | pub use sync::SyncQueueEntry; | 31 | pub use sync::SyncQueueEntry; |
| 29 | 32 | ||
| @@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); | |||
| 38 | /// Used for batching burst arrivals during negentropy sync. | 41 | /// Used for batching burst arrivals during negentropy sync. |
| 39 | const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); | 42 | const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); |
| 40 | 43 | ||
| 44 | /// Serializable wrapper for `StatePurgatoryEntry` with time offsets. | ||
| 45 | /// | ||
| 46 | /// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp | ||
| 47 | /// in `PurgatoryState`, allowing state to be persisted and restored across restarts. | ||
| 48 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 49 | struct SerializableStatePurgatoryEntry { | ||
| 50 | /// The nostr state event (kind 30618) awaiting git data | ||
| 51 | event: Event, | ||
| 52 | /// The repository identifier from the event's 'd' tag | ||
| 53 | identifier: String, | ||
| 54 | /// Event author pubkey | ||
| 55 | author: PublicKey, | ||
| 56 | /// Duration offset from saved_at for created_at | ||
| 57 | created_at_offset_secs: u64, | ||
| 58 | /// Duration offset from saved_at for expires_at | ||
| 59 | expires_at_offset_secs: u64, | ||
| 60 | } | ||
| 61 | |||
| 62 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | ||
| 63 | /// | ||
| 64 | /// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp | ||
| 65 | /// in `PurgatoryState`, allowing state to be persisted and restored across restarts. | ||
| 66 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 67 | struct SerializablePrPurgatoryEntry { | ||
| 68 | /// The nostr PR event, if received (None = git data arrived first) | ||
| 69 | event: Option<Event>, | ||
| 70 | /// The expected commit SHA from 'c' tag (if event exists) | ||
| 71 | /// or the actual commit pushed (if git arrived first) | ||
| 72 | commit: String, | ||
| 73 | /// Duration offset from saved_at for created_at | ||
| 74 | created_at_offset_secs: u64, | ||
| 75 | /// Duration offset from saved_at for expires_at | ||
| 76 | expires_at_offset_secs: u64, | ||
| 77 | } | ||
| 78 | |||
| 79 | /// Serializable purgatory state for disk persistence. | ||
| 80 | /// | ||
| 81 | /// Contains all purgatory data needed to restore state across restarts: | ||
| 82 | /// - State events (indexed by identifier) | ||
| 83 | /// - PR events (indexed by event ID) | ||
| 84 | /// - Expired events (to prevent re-sync loops) | ||
| 85 | /// - Version number for future compatibility | ||
| 86 | /// - Saved timestamp for downtime calculation | ||
| 87 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 88 | struct PurgatoryState { | ||
| 89 | /// Version number for state format (currently 1) | ||
| 90 | version: u32, | ||
| 91 | /// When this state was saved to disk | ||
| 92 | saved_at: SystemTime, | ||
| 93 | /// State events indexed by repository identifier | ||
| 94 | state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>, | ||
| 95 | /// PR events indexed by event ID (hex string) | ||
| 96 | pr_events: HashMap<String, SerializablePrPurgatoryEntry>, | ||
| 97 | /// Expired event IDs with their expiry timestamps | ||
| 98 | expired_events: HashMap<String, SystemTime>, | ||
| 99 | } | ||
| 100 | |||
| 41 | /// Main purgatory structure holding events awaiting git data. | 101 | /// Main purgatory structure holding events awaiting git data. |
| 42 | /// | 102 | /// |
| 43 | /// Provides thread-safe concurrent access to two separate stores: | 103 | /// Provides thread-safe concurrent access to two separate stores: |
| @@ -667,6 +727,260 @@ impl Purgatory { | |||
| 667 | pub fn sync_queue_size(&self) -> usize { | 727 | pub fn sync_queue_size(&self) -> usize { |
| 668 | self.sync_queue.len() | 728 | self.sync_queue.len() |
| 669 | } | 729 | } |
| 730 | |||
| 731 | /// Get all repository identifiers currently in purgatory. | ||
| 732 | /// | ||
| 733 | /// Returns a list of all unique repository identifiers that have state events | ||
| 734 | /// in purgatory. This is useful for re-queueing repositories after restore. | ||
| 735 | /// | ||
| 736 | /// # Returns | ||
| 737 | /// Vector of repository identifiers (e.g., "owner/repo") | ||
| 738 | /// | ||
| 739 | /// # Example | ||
| 740 | /// ```no_run | ||
| 741 | /// use ngit_grasp::purgatory::Purgatory; | ||
| 742 | /// use std::path::PathBuf; | ||
| 743 | /// | ||
| 744 | /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); | ||
| 745 | /// let identifiers = purgatory.get_all_identifiers(); | ||
| 746 | /// for id in identifiers { | ||
| 747 | /// println!("Repository in purgatory: {}", id); | ||
| 748 | /// } | ||
| 749 | /// ``` | ||
| 750 | pub fn get_all_identifiers(&self) -> Vec<String> { | ||
| 751 | self.state_events | ||
| 752 | .iter() | ||
| 753 | .map(|entry| entry.key().clone()) | ||
| 754 | .collect() | ||
| 755 | } | ||
| 756 | |||
| 757 | /// Save purgatory state to disk. | ||
| 758 | /// | ||
| 759 | /// Serializes the current purgatory state (state_events, pr_events, expired_events) | ||
| 760 | /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are | ||
| 761 | /// converted to duration offsets from the current `SystemTime` for persistence. | ||
| 762 | /// | ||
| 763 | /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are | ||
| 764 | /// restored from disk. | ||
| 765 | /// | ||
| 766 | /// # Arguments | ||
| 767 | /// * `path` - Path to save the state file | ||
| 768 | /// | ||
| 769 | /// # Returns | ||
| 770 | /// Ok(()) on success, Err on failure | ||
| 771 | /// | ||
| 772 | /// # Example | ||
| 773 | /// ```no_run | ||
| 774 | /// use ngit_grasp::purgatory::Purgatory; | ||
| 775 | /// use std::path::PathBuf; | ||
| 776 | /// | ||
| 777 | /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); | ||
| 778 | /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap(); | ||
| 779 | /// ``` | ||
| 780 | pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 781 | let saved_at = SystemTime::now(); | ||
| 782 | let now_instant = Instant::now(); | ||
| 783 | |||
| 784 | // Convert state_events to serializable format | ||
| 785 | let mut state_events = HashMap::new(); | ||
| 786 | for entry in self.state_events.iter() { | ||
| 787 | let identifier = entry.key().clone(); | ||
| 788 | let entries: Vec<SerializableStatePurgatoryEntry> = entry | ||
| 789 | .value() | ||
| 790 | .iter() | ||
| 791 | .map(|e| { | ||
| 792 | let created_offset = | ||
| 793 | persistence::instant_to_offset(e.created_at, saved_at, now_instant); | ||
| 794 | let expires_offset = | ||
| 795 | persistence::instant_to_offset(e.expires_at, saved_at, now_instant); | ||
| 796 | |||
| 797 | SerializableStatePurgatoryEntry { | ||
| 798 | event: e.event.clone(), | ||
| 799 | identifier: e.identifier.clone(), | ||
| 800 | author: e.author, | ||
| 801 | created_at_offset_secs: created_offset.as_secs(), | ||
| 802 | expires_at_offset_secs: expires_offset.as_secs(), | ||
| 803 | } | ||
| 804 | }) | ||
| 805 | .collect(); | ||
| 806 | state_events.insert(identifier, entries); | ||
| 807 | } | ||
| 808 | |||
| 809 | // Convert pr_events to serializable format | ||
| 810 | let mut pr_events = HashMap::new(); | ||
| 811 | for entry in self.pr_events.iter() { | ||
| 812 | let event_id = entry.key().clone(); | ||
| 813 | let e = entry.value(); | ||
| 814 | |||
| 815 | let created_offset = | ||
| 816 | persistence::instant_to_offset(e.created_at, saved_at, now_instant); | ||
| 817 | let expires_offset = | ||
| 818 | persistence::instant_to_offset(e.expires_at, saved_at, now_instant); | ||
| 819 | |||
| 820 | let serializable = SerializablePrPurgatoryEntry { | ||
| 821 | event: e.event.clone(), | ||
| 822 | commit: e.commit.clone(), | ||
| 823 | created_at_offset_secs: created_offset.as_secs(), | ||
| 824 | expires_at_offset_secs: expires_offset.as_secs(), | ||
| 825 | }; | ||
| 826 | pr_events.insert(event_id, serializable); | ||
| 827 | } | ||
| 828 | |||
| 829 | // Convert expired_events to serializable format | ||
| 830 | // We use SystemTime instead of Instant offsets for expired events since | ||
| 831 | // we don't need high precision for cleanup timing | ||
| 832 | let mut expired_events = HashMap::new(); | ||
| 833 | for entry in self.expired_events.iter() { | ||
| 834 | let event_id = entry.key().to_hex(); | ||
| 835 | // Convert Instant to SystemTime (approximate) | ||
| 836 | let expired_at_instant = *entry.value(); | ||
| 837 | let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant); | ||
| 838 | let expired_at_system = saved_at - elapsed_since_expire; | ||
| 839 | expired_events.insert(event_id, expired_at_system); | ||
| 840 | } | ||
| 841 | |||
| 842 | // Create state structure | ||
| 843 | let state = PurgatoryState { | ||
| 844 | version: 1, | ||
| 845 | saved_at, | ||
| 846 | state_events, | ||
| 847 | pr_events, | ||
| 848 | expired_events, | ||
| 849 | }; | ||
| 850 | |||
| 851 | // Serialize to JSON and write to file | ||
| 852 | let json = serde_json::to_string_pretty(&state)?; | ||
| 853 | std::fs::write(path, json)?; | ||
| 854 | |||
| 855 | tracing::info!( | ||
| 856 | path = %path.display(), | ||
| 857 | state_events = state.state_events.len(), | ||
| 858 | pr_events = state.pr_events.len(), | ||
| 859 | expired_events = state.expired_events.len(), | ||
| 860 | "Saved purgatory state to disk" | ||
| 861 | ); | ||
| 862 | |||
| 863 | Ok(()) | ||
| 864 | } | ||
| 865 | |||
| 866 | /// Restore purgatory state from disk. | ||
| 867 | /// | ||
| 868 | /// Loads a previously saved purgatory state from the specified path and populates | ||
| 869 | /// the current purgatory instance. Adjusts time-based fields to account for downtime | ||
| 870 | /// between save and restore. | ||
| 871 | /// | ||
| 872 | /// After successful restore, the state file is deleted to prevent accidental | ||
| 873 | /// double-restore. | ||
| 874 | /// | ||
| 875 | /// # Arguments | ||
| 876 | /// * `path` - Path to the saved state file | ||
| 877 | /// | ||
| 878 | /// # Returns | ||
| 879 | /// Ok(()) on success, Err if file doesn't exist or is corrupted | ||
| 880 | /// | ||
| 881 | /// # Example | ||
| 882 | /// ```no_run | ||
| 883 | /// use ngit_grasp::purgatory::Purgatory; | ||
| 884 | /// use std::path::PathBuf; | ||
| 885 | /// | ||
| 886 | /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); | ||
| 887 | /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) { | ||
| 888 | /// Ok(()) => println!("State restored successfully"), | ||
| 889 | /// Err(e) => eprintln!("Failed to restore state: {}", e), | ||
| 890 | /// } | ||
| 891 | /// ``` | ||
| 892 | pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 893 | // Read and parse state file | ||
| 894 | let json = std::fs::read_to_string(path)?; | ||
| 895 | let state: PurgatoryState = serde_json::from_str(&json)?; | ||
| 896 | |||
| 897 | // Verify version | ||
| 898 | if state.version != 1 { | ||
| 899 | return Err(format!("Unsupported state version: {}", state.version).into()); | ||
| 900 | } | ||
| 901 | |||
| 902 | let now_instant = Instant::now(); | ||
| 903 | |||
| 904 | // Restore state_events | ||
| 905 | for (identifier, entries) in state.state_events { | ||
| 906 | let restored_entries: Vec<StatePurgatoryEntry> = entries | ||
| 907 | .into_iter() | ||
| 908 | .map(|e| { | ||
| 909 | let created_at = persistence::offset_to_instant( | ||
| 910 | Duration::from_secs(e.created_at_offset_secs), | ||
| 911 | state.saved_at, | ||
| 912 | now_instant, | ||
| 913 | ); | ||
| 914 | let expires_at = persistence::offset_to_instant( | ||
| 915 | Duration::from_secs(e.expires_at_offset_secs), | ||
| 916 | state.saved_at, | ||
| 917 | now_instant, | ||
| 918 | ); | ||
| 919 | |||
| 920 | StatePurgatoryEntry { | ||
| 921 | event: e.event, | ||
| 922 | identifier: e.identifier, | ||
| 923 | author: e.author, | ||
| 924 | created_at, | ||
| 925 | expires_at, | ||
| 926 | } | ||
| 927 | }) | ||
| 928 | .collect(); | ||
| 929 | |||
| 930 | self.state_events.insert(identifier, restored_entries); | ||
| 931 | } | ||
| 932 | |||
| 933 | // Restore pr_events | ||
| 934 | for (event_id, e) in state.pr_events { | ||
| 935 | let created_at = persistence::offset_to_instant( | ||
| 936 | Duration::from_secs(e.created_at_offset_secs), | ||
| 937 | state.saved_at, | ||
| 938 | now_instant, | ||
| 939 | ); | ||
| 940 | let expires_at = persistence::offset_to_instant( | ||
| 941 | Duration::from_secs(e.expires_at_offset_secs), | ||
| 942 | state.saved_at, | ||
| 943 | now_instant, | ||
| 944 | ); | ||
| 945 | |||
| 946 | let entry = PrPurgatoryEntry { | ||
| 947 | event: e.event, | ||
| 948 | commit: e.commit, | ||
| 949 | created_at, | ||
| 950 | expires_at, | ||
| 951 | }; | ||
| 952 | |||
| 953 | self.pr_events.insert(event_id, entry); | ||
| 954 | } | ||
| 955 | |||
| 956 | // Restore expired_events | ||
| 957 | for (event_id_hex, expired_at_system) in state.expired_events { | ||
| 958 | if let Ok(event_id) = EventId::from_hex(&event_id_hex) { | ||
| 959 | // Convert SystemTime back to Instant (approximate) | ||
| 960 | let elapsed_since_expire = SystemTime::now() | ||
| 961 | .duration_since(expired_at_system) | ||
| 962 | .unwrap_or(Duration::ZERO); | ||
| 963 | let expired_at_instant = now_instant - elapsed_since_expire; | ||
| 964 | |||
| 965 | self.expired_events.insert(event_id, expired_at_instant); | ||
| 966 | } | ||
| 967 | } | ||
| 968 | |||
| 969 | tracing::info!( | ||
| 970 | path = %path.display(), | ||
| 971 | state_events = self.state_events.len(), | ||
| 972 | pr_events = self.pr_events.len(), | ||
| 973 | expired_events = self.expired_events.len(), | ||
| 974 | saved_at = ?state.saved_at, | ||
| 975 | "Restored purgatory state from disk" | ||
| 976 | ); | ||
| 977 | |||
| 978 | // Delete state file after successful restore | ||
| 979 | std::fs::remove_file(path)?; | ||
| 980 | tracing::debug!(path = %path.display(), "Deleted state file after restore"); | ||
| 981 | |||
| 982 | Ok(()) | ||
| 983 | } | ||
| 670 | } | 984 | } |
| 671 | 985 | ||
| 672 | #[cfg(test)] | 986 | #[cfg(test)] |
| @@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1249 | // - Skip the expired check for user-submitted events | 1563 | // - Skip the expired check for user-submitted events |
| 1250 | // - Allow the event to be re-added to purgatory or accepted if git data now exists | 1564 | // - Allow the event to be re-added to purgatory or accepted if git data now exists |
| 1251 | } | 1565 | } |
| 1566 | |||
| 1567 | // ============================================================================ | ||
| 1568 | // Persistence Serialization Tests | ||
| 1569 | // ============================================================================ | ||
| 1570 | |||
| 1571 | #[tokio::test] | ||
| 1572 | async fn test_save_and_restore_state_events() { | ||
| 1573 | use tempfile::tempdir; | ||
| 1574 | |||
| 1575 | let temp_dir = tempdir().unwrap(); | ||
| 1576 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1577 | |||
| 1578 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1579 | let keys = Keys::generate(); | ||
| 1580 | |||
| 1581 | // Add multiple state events for the same identifier | ||
| 1582 | let event1 = EventBuilder::text_note("state event 1") | ||
| 1583 | .sign_with_keys(&keys) | ||
| 1584 | .unwrap(); | ||
| 1585 | let event2 = EventBuilder::text_note("state event 2") | ||
| 1586 | .sign_with_keys(&keys) | ||
| 1587 | .unwrap(); | ||
| 1588 | |||
| 1589 | let event1_id = event1.id; | ||
| 1590 | let event2_id = event2.id; | ||
| 1591 | |||
| 1592 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | ||
| 1593 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | ||
| 1594 | |||
| 1595 | // Save to disk | ||
| 1596 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1597 | |||
| 1598 | // Verify file exists | ||
| 1599 | assert!(state_file.exists()); | ||
| 1600 | |||
| 1601 | // Create new purgatory and restore | ||
| 1602 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1603 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1604 | |||
| 1605 | // Verify file was deleted after restore | ||
| 1606 | assert!(!state_file.exists()); | ||
| 1607 | |||
| 1608 | // Verify state events were restored | ||
| 1609 | let (state_count, _) = purgatory2.count(); | ||
| 1610 | assert_eq!(state_count, 2); | ||
| 1611 | |||
| 1612 | let restored_entries = purgatory2.find_state("test-repo"); | ||
| 1613 | assert_eq!(restored_entries.len(), 2); | ||
| 1614 | |||
| 1615 | // Verify event IDs match | ||
| 1616 | let restored_ids: Vec<EventId> = restored_entries.iter().map(|e| e.event.id).collect(); | ||
| 1617 | assert!(restored_ids.contains(&event1_id)); | ||
| 1618 | assert!(restored_ids.contains(&event2_id)); | ||
| 1619 | |||
| 1620 | // Verify identifiers and authors match | ||
| 1621 | for entry in &restored_entries { | ||
| 1622 | assert_eq!(entry.identifier, "test-repo"); | ||
| 1623 | assert_eq!(entry.author, keys.public_key()); | ||
| 1624 | } | ||
| 1625 | } | ||
| 1626 | |||
| 1627 | #[tokio::test] | ||
| 1628 | async fn test_save_and_restore_pr_events() { | ||
| 1629 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 1630 | use tempfile::tempdir; | ||
| 1631 | |||
| 1632 | let temp_dir = tempdir().unwrap(); | ||
| 1633 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1634 | |||
| 1635 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1636 | let keys = Keys::generate(); | ||
| 1637 | |||
| 1638 | // Add PR event with actual event | ||
| 1639 | let tags = vec![Tag::custom( | ||
| 1640 | TagKind::Custom("a".into()), | ||
| 1641 | vec!["30617:abc123:test-repo".to_string()], | ||
| 1642 | )]; | ||
| 1643 | |||
| 1644 | let pr_event = EventBuilder::new(Kind::from(1618), "PR content") | ||
| 1645 | .tags(tags) | ||
| 1646 | .sign_with_keys(&keys) | ||
| 1647 | .unwrap(); | ||
| 1648 | |||
| 1649 | let pr_event_id = pr_event.id; | ||
| 1650 | |||
| 1651 | purgatory.add_pr( | ||
| 1652 | pr_event.clone(), | ||
| 1653 | "pr-event-id".to_string(), | ||
| 1654 | "commit-abc".to_string(), | ||
| 1655 | ); | ||
| 1656 | |||
| 1657 | // Save to disk | ||
| 1658 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1659 | |||
| 1660 | // Create new purgatory and restore | ||
| 1661 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1662 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1663 | |||
| 1664 | // Verify PR event was restored | ||
| 1665 | let (_, pr_count) = purgatory2.count(); | ||
| 1666 | assert_eq!(pr_count, 1); | ||
| 1667 | |||
| 1668 | let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); | ||
| 1669 | assert!(restored_entry.event.is_some()); | ||
| 1670 | assert_eq!(restored_entry.event.unwrap().id, pr_event_id); | ||
| 1671 | assert_eq!(restored_entry.commit, "commit-abc"); | ||
| 1672 | } | ||
| 1673 | |||
| 1674 | #[tokio::test] | ||
| 1675 | async fn test_save_and_restore_pr_placeholders() { | ||
| 1676 | use tempfile::tempdir; | ||
| 1677 | |||
| 1678 | let temp_dir = tempdir().unwrap(); | ||
| 1679 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1680 | |||
| 1681 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1682 | |||
| 1683 | // Add PR placeholder (git data arrived first) | ||
| 1684 | purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string()); | ||
| 1685 | |||
| 1686 | // Save to disk | ||
| 1687 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1688 | |||
| 1689 | // Create new purgatory and restore | ||
| 1690 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1691 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1692 | |||
| 1693 | // Verify placeholder was restored | ||
| 1694 | let (_, pr_count) = purgatory2.count(); | ||
| 1695 | assert_eq!(pr_count, 1); | ||
| 1696 | |||
| 1697 | let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); | ||
| 1698 | assert!(restored_entry.event.is_none()); // Still a placeholder | ||
| 1699 | assert_eq!(restored_entry.commit, "commit-def"); | ||
| 1700 | |||
| 1701 | // Verify it's findable as a placeholder | ||
| 1702 | assert_eq!( | ||
| 1703 | purgatory2.find_pr_placeholder("placeholder-id"), | ||
| 1704 | Some("commit-def".to_string()) | ||
| 1705 | ); | ||
| 1706 | } | ||
| 1707 | |||
| 1708 | #[tokio::test] | ||
| 1709 | async fn test_save_and_restore_expired_events() { | ||
| 1710 | use tempfile::tempdir; | ||
| 1711 | |||
| 1712 | let temp_dir = tempdir().unwrap(); | ||
| 1713 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1714 | |||
| 1715 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1716 | let keys = Keys::generate(); | ||
| 1717 | |||
| 1718 | let event = EventBuilder::text_note("test") | ||
| 1719 | .sign_with_keys(&keys) | ||
| 1720 | .unwrap(); | ||
| 1721 | let event_id = event.id; | ||
| 1722 | |||
| 1723 | // Add and expire event | ||
| 1724 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | ||
| 1725 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | ||
| 1726 | for entry in entries.iter_mut() { | ||
| 1727 | entry.expires_at = Instant::now() - Duration::from_secs(1); | ||
| 1728 | } | ||
| 1729 | } | ||
| 1730 | purgatory.cleanup(); | ||
| 1731 | |||
| 1732 | // Verify event is marked as expired | ||
| 1733 | assert!(purgatory.is_expired(&event_id)); | ||
| 1734 | assert_eq!(purgatory.expired_count(), 1); | ||
| 1735 | |||
| 1736 | // Save to disk | ||
| 1737 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1738 | |||
| 1739 | // Create new purgatory and restore | ||
| 1740 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1741 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1742 | |||
| 1743 | // Verify expired event was restored | ||
| 1744 | assert!(purgatory2.is_expired(&event_id)); | ||
| 1745 | assert_eq!(purgatory2.expired_count(), 1); | ||
| 1746 | |||
| 1747 | // Verify it's included in event_ids() | ||
| 1748 | let ids = purgatory2.event_ids(); | ||
| 1749 | assert!(ids.contains(&event_id)); | ||
| 1750 | } | ||
| 1751 | |||
| 1752 | #[tokio::test] | ||
| 1753 | async fn test_save_and_restore_empty_purgatory() { | ||
| 1754 | use tempfile::tempdir; | ||
| 1755 | |||
| 1756 | let temp_dir = tempdir().unwrap(); | ||
| 1757 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1758 | |||
| 1759 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1760 | |||
| 1761 | // Save empty purgatory | ||
| 1762 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1763 | |||
| 1764 | // Verify file exists | ||
| 1765 | assert!(state_file.exists()); | ||
| 1766 | |||
| 1767 | // Create new purgatory and restore | ||
| 1768 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1769 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1770 | |||
| 1771 | // Verify purgatory is still empty | ||
| 1772 | let (state_count, pr_count) = purgatory2.count(); | ||
| 1773 | assert_eq!(state_count, 0); | ||
| 1774 | assert_eq!(pr_count, 0); | ||
| 1775 | assert_eq!(purgatory2.expired_count(), 0); | ||
| 1776 | } | ||
| 1777 | |||
| 1778 | #[tokio::test] | ||
| 1779 | async fn test_restore_missing_file() { | ||
| 1780 | use tempfile::tempdir; | ||
| 1781 | |||
| 1782 | let temp_dir = tempdir().unwrap(); | ||
| 1783 | let state_file = temp_dir.path().join("nonexistent.json"); | ||
| 1784 | |||
| 1785 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1786 | |||
| 1787 | // Attempting to restore from missing file should error | ||
| 1788 | let result = purgatory.restore_from_disk(&state_file); | ||
| 1789 | assert!(result.is_err()); | ||
| 1790 | |||
| 1791 | // Purgatory should remain empty | ||
| 1792 | let (state_count, pr_count) = purgatory.count(); | ||
| 1793 | assert_eq!(state_count, 0); | ||
| 1794 | assert_eq!(pr_count, 0); | ||
| 1795 | } | ||
| 1796 | |||
| 1797 | #[tokio::test] | ||
| 1798 | async fn test_restore_corrupted_json() { | ||
| 1799 | use tempfile::tempdir; | ||
| 1800 | |||
| 1801 | let temp_dir = tempdir().unwrap(); | ||
| 1802 | let state_file = temp_dir.path().join("corrupted.json"); | ||
| 1803 | |||
| 1804 | // Write invalid JSON | ||
| 1805 | std::fs::write(&state_file, "{ this is not valid json }").unwrap(); | ||
| 1806 | |||
| 1807 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1808 | |||
| 1809 | // Attempting to restore corrupted file should error | ||
| 1810 | let result = purgatory.restore_from_disk(&state_file); | ||
| 1811 | assert!(result.is_err()); | ||
| 1812 | |||
| 1813 | // Purgatory should remain empty | ||
| 1814 | let (state_count, pr_count) = purgatory.count(); | ||
| 1815 | assert_eq!(state_count, 0); | ||
| 1816 | assert_eq!(pr_count, 0); | ||
| 1817 | } | ||
| 1818 | |||
| 1819 | #[tokio::test] | ||
| 1820 | async fn test_restore_unsupported_version() { | ||
| 1821 | use tempfile::tempdir; | ||
| 1822 | |||
| 1823 | let temp_dir = tempdir().unwrap(); | ||
| 1824 | let state_file = temp_dir.path().join("wrong_version.json"); | ||
| 1825 | |||
| 1826 | // Write state with unsupported version | ||
| 1827 | let state = r#"{ | ||
| 1828 | "version": 999, | ||
| 1829 | "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0}, | ||
| 1830 | "state_events": {}, | ||
| 1831 | "pr_events": {}, | ||
| 1832 | "expired_events": {} | ||
| 1833 | }"#; | ||
| 1834 | std::fs::write(&state_file, state).unwrap(); | ||
| 1835 | |||
| 1836 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1837 | |||
| 1838 | // Attempting to restore unsupported version should error | ||
| 1839 | let result = purgatory.restore_from_disk(&state_file); | ||
| 1840 | assert!(result.is_err()); | ||
| 1841 | assert!(result | ||
| 1842 | .unwrap_err() | ||
| 1843 | .to_string() | ||
| 1844 | .contains("Unsupported state version")); | ||
| 1845 | } | ||
| 1846 | |||
| 1847 | #[tokio::test] | ||
| 1848 | async fn test_downtime_calculation() { | ||
| 1849 | use tempfile::tempdir; | ||
| 1850 | use tokio::time::sleep; | ||
| 1851 | |||
| 1852 | let temp_dir = tempdir().unwrap(); | ||
| 1853 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1854 | |||
| 1855 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1856 | let keys = Keys::generate(); | ||
| 1857 | |||
| 1858 | // Add state event | ||
| 1859 | let event = EventBuilder::text_note("test") | ||
| 1860 | .sign_with_keys(&keys) | ||
| 1861 | .unwrap(); | ||
| 1862 | |||
| 1863 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | ||
| 1864 | |||
| 1865 | // Get original expiry time | ||
| 1866 | let original_entries = purgatory.find_state("repo"); | ||
| 1867 | let original_entry = &original_entries[0]; | ||
| 1868 | let original_expires_at = original_entry.expires_at; | ||
| 1869 | let original_remaining = original_expires_at.saturating_duration_since(Instant::now()); | ||
| 1870 | |||
| 1871 | // Save to disk | ||
| 1872 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1873 | |||
| 1874 | // Simulate downtime (100ms) | ||
| 1875 | sleep(Duration::from_millis(100)).await; | ||
| 1876 | |||
| 1877 | // Create new purgatory and restore | ||
| 1878 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1879 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1880 | |||
| 1881 | // Get restored expiry time | ||
| 1882 | let restored_entries = purgatory2.find_state("repo"); | ||
| 1883 | let restored_entry = &restored_entries[0]; | ||
| 1884 | let restored_expires_at = restored_entry.expires_at; | ||
| 1885 | let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now()); | ||
| 1886 | |||
| 1887 | // Remaining time should be approximately the same (accounting for downtime) | ||
| 1888 | // Allow 2000ms tolerance for test execution time and sleep duration | ||
| 1889 | let diff = if restored_remaining > original_remaining { | ||
| 1890 | restored_remaining.as_millis() - original_remaining.as_millis() | ||
| 1891 | } else { | ||
| 1892 | original_remaining.as_millis() - restored_remaining.as_millis() | ||
| 1893 | }; | ||
| 1894 | |||
| 1895 | assert!( | ||
| 1896 | diff < 2000, | ||
| 1897 | "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms", | ||
| 1898 | original_remaining.as_millis(), | ||
| 1899 | restored_remaining.as_millis(), | ||
| 1900 | diff | ||
| 1901 | ); | ||
| 1902 | } | ||
| 1903 | |||
| 1904 | #[tokio::test] | ||
| 1905 | async fn test_expiry_times_preserved() { | ||
| 1906 | use tempfile::tempdir; | ||
| 1907 | |||
| 1908 | let temp_dir = tempdir().unwrap(); | ||
| 1909 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1910 | |||
| 1911 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1912 | let keys = Keys::generate(); | ||
| 1913 | |||
| 1914 | // Add state event | ||
| 1915 | let event = EventBuilder::text_note("test") | ||
| 1916 | .sign_with_keys(&keys) | ||
| 1917 | .unwrap(); | ||
| 1918 | |||
| 1919 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | ||
| 1920 | |||
| 1921 | // Manually set expiry to a specific time in the future | ||
| 1922 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | ||
| 1923 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | ||
| 1924 | for entry in entries.iter_mut() { | ||
| 1925 | entry.expires_at = custom_expiry; | ||
| 1926 | } | ||
| 1927 | } | ||
| 1928 | |||
| 1929 | // Save to disk | ||
| 1930 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1931 | |||
| 1932 | // Create new purgatory and restore | ||
| 1933 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1934 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1935 | |||
| 1936 | // Get restored expiry time | ||
| 1937 | let restored_entries = purgatory2.find_state("repo"); | ||
| 1938 | let restored_entry = &restored_entries[0]; | ||
| 1939 | let restored_remaining = restored_entry | ||
| 1940 | .expires_at | ||
| 1941 | .saturating_duration_since(Instant::now()); | ||
| 1942 | |||
| 1943 | // Should be approximately 600 seconds (allow 3 second tolerance for test execution) | ||
| 1944 | assert!( | ||
| 1945 | restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603, | ||
| 1946 | "Expected ~600s remaining, got {}s", | ||
| 1947 | restored_remaining.as_secs() | ||
| 1948 | ); | ||
| 1949 | } | ||
| 1950 | |||
| 1951 | #[tokio::test] | ||
| 1952 | async fn test_multiple_state_events_same_identifier() { | ||
| 1953 | use tempfile::tempdir; | ||
| 1954 | |||
| 1955 | let temp_dir = tempdir().unwrap(); | ||
| 1956 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 1957 | |||
| 1958 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1959 | let keys1 = Keys::generate(); | ||
| 1960 | let keys2 = Keys::generate(); | ||
| 1961 | let keys3 = Keys::generate(); | ||
| 1962 | |||
| 1963 | // Add multiple state events for the same identifier from different authors | ||
| 1964 | let event1 = EventBuilder::text_note("maintainer 1") | ||
| 1965 | .sign_with_keys(&keys1) | ||
| 1966 | .unwrap(); | ||
| 1967 | let event2 = EventBuilder::text_note("maintainer 2") | ||
| 1968 | .sign_with_keys(&keys2) | ||
| 1969 | .unwrap(); | ||
| 1970 | let event3 = EventBuilder::text_note("maintainer 3") | ||
| 1971 | .sign_with_keys(&keys3) | ||
| 1972 | .unwrap(); | ||
| 1973 | |||
| 1974 | purgatory.add_state( | ||
| 1975 | event1.clone(), | ||
| 1976 | "shared-repo".to_string(), | ||
| 1977 | keys1.public_key(), | ||
| 1978 | ); | ||
| 1979 | purgatory.add_state( | ||
| 1980 | event2.clone(), | ||
| 1981 | "shared-repo".to_string(), | ||
| 1982 | keys2.public_key(), | ||
| 1983 | ); | ||
| 1984 | purgatory.add_state( | ||
| 1985 | event3.clone(), | ||
| 1986 | "shared-repo".to_string(), | ||
| 1987 | keys3.public_key(), | ||
| 1988 | ); | ||
| 1989 | |||
| 1990 | // Save to disk | ||
| 1991 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 1992 | |||
| 1993 | // Create new purgatory and restore | ||
| 1994 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 1995 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 1996 | |||
| 1997 | // Verify all three events were restored | ||
| 1998 | let restored_entries = purgatory2.find_state("shared-repo"); | ||
| 1999 | assert_eq!(restored_entries.len(), 3); | ||
| 2000 | |||
| 2001 | // Verify all authors are present | ||
| 2002 | let authors: Vec<PublicKey> = restored_entries.iter().map(|e| e.author).collect(); | ||
| 2003 | assert!(authors.contains(&keys1.public_key())); | ||
| 2004 | assert!(authors.contains(&keys2.public_key())); | ||
| 2005 | assert!(authors.contains(&keys3.public_key())); | ||
| 2006 | } | ||
| 2007 | |||
| 2008 | #[tokio::test] | ||
| 2009 | async fn test_mixed_pr_events_and_placeholders() { | ||
| 2010 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 2011 | use tempfile::tempdir; | ||
| 2012 | |||
| 2013 | let temp_dir = tempdir().unwrap(); | ||
| 2014 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2015 | |||
| 2016 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2017 | let keys = Keys::generate(); | ||
| 2018 | |||
| 2019 | // Add PR event with actual event | ||
| 2020 | let tags = vec![Tag::custom( | ||
| 2021 | TagKind::Custom("a".into()), | ||
| 2022 | vec!["30617:abc123:test-repo".to_string()], | ||
| 2023 | )]; | ||
| 2024 | |||
| 2025 | let pr_event = EventBuilder::new(Kind::from(1618), "PR content") | ||
| 2026 | .tags(tags) | ||
| 2027 | .sign_with_keys(&keys) | ||
| 2028 | .unwrap(); | ||
| 2029 | |||
| 2030 | purgatory.add_pr( | ||
| 2031 | pr_event.clone(), | ||
| 2032 | "pr-with-event".to_string(), | ||
| 2033 | "commit-abc".to_string(), | ||
| 2034 | ); | ||
| 2035 | |||
| 2036 | // Add PR placeholder | ||
| 2037 | purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string()); | ||
| 2038 | |||
| 2039 | // Save to disk | ||
| 2040 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2041 | |||
| 2042 | // Create new purgatory and restore | ||
| 2043 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2044 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2045 | |||
| 2046 | // Verify both were restored correctly | ||
| 2047 | let (_, pr_count) = purgatory2.count(); | ||
| 2048 | assert_eq!(pr_count, 2); | ||
| 2049 | |||
| 2050 | // Verify PR event | ||
| 2051 | let pr_entry = purgatory2.find_pr("pr-with-event").unwrap(); | ||
| 2052 | assert!(pr_entry.event.is_some()); | ||
| 2053 | assert_eq!(pr_entry.commit, "commit-abc"); | ||
| 2054 | |||
| 2055 | // Verify placeholder | ||
| 2056 | let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap(); | ||
| 2057 | assert!(placeholder_entry.event.is_none()); | ||
| 2058 | assert_eq!(placeholder_entry.commit, "commit-def"); | ||
| 2059 | assert_eq!( | ||
| 2060 | purgatory2.find_pr_placeholder("pr-placeholder"), | ||
| 2061 | Some("commit-def".to_string()) | ||
| 2062 | ); | ||
| 2063 | } | ||
| 2064 | |||
| 2065 | #[tokio::test] | ||
| 2066 | async fn test_file_cleanup_after_successful_restore() { | ||
| 2067 | use tempfile::tempdir; | ||
| 2068 | |||
| 2069 | let temp_dir = tempdir().unwrap(); | ||
| 2070 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2071 | |||
| 2072 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2073 | let keys = Keys::generate(); | ||
| 2074 | |||
| 2075 | // Add some data | ||
| 2076 | let event = EventBuilder::text_note("test") | ||
| 2077 | .sign_with_keys(&keys) | ||
| 2078 | .unwrap(); | ||
| 2079 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | ||
| 2080 | |||
| 2081 | // Save to disk | ||
| 2082 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2083 | assert!(state_file.exists()); | ||
| 2084 | |||
| 2085 | // Restore | ||
| 2086 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2087 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2088 | |||
| 2089 | // File should be deleted after successful restore | ||
| 2090 | assert!(!state_file.exists()); | ||
| 2091 | } | ||
| 2092 | |||
| 2093 | #[tokio::test] | ||
| 2094 | async fn test_comprehensive_roundtrip() { | ||
| 2095 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 2096 | use tempfile::tempdir; | ||
| 2097 | |||
| 2098 | let temp_dir = tempdir().unwrap(); | ||
| 2099 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2100 | |||
| 2101 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2102 | let keys1 = Keys::generate(); | ||
| 2103 | let keys2 = Keys::generate(); | ||
| 2104 | |||
| 2105 | // Add multiple state events | ||
| 2106 | let state1 = EventBuilder::text_note("state 1") | ||
| 2107 | .sign_with_keys(&keys1) | ||
| 2108 | .unwrap(); | ||
| 2109 | let state2 = EventBuilder::text_note("state 2") | ||
| 2110 | .sign_with_keys(&keys2) | ||
| 2111 | .unwrap(); | ||
| 2112 | |||
| 2113 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | ||
| 2114 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | ||
| 2115 | |||
| 2116 | // Add PR event | ||
| 2117 | let tags = vec![Tag::custom( | ||
| 2118 | TagKind::Custom("a".into()), | ||
| 2119 | vec!["30617:abc123:repo1".to_string()], | ||
| 2120 | )]; | ||
| 2121 | let pr_event = EventBuilder::new(Kind::from(1618), "PR") | ||
| 2122 | .tags(tags) | ||
| 2123 | .sign_with_keys(&keys1) | ||
| 2124 | .unwrap(); | ||
| 2125 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | ||
| 2126 | |||
| 2127 | // Add PR placeholder | ||
| 2128 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | ||
| 2129 | |||
| 2130 | // Add and expire an event | ||
| 2131 | let expired_event = EventBuilder::text_note("expired") | ||
| 2132 | .sign_with_keys(&keys1) | ||
| 2133 | .unwrap(); | ||
| 2134 | let expired_id = expired_event.id; | ||
| 2135 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | ||
| 2136 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | ||
| 2137 | for entry in entries.iter_mut() { | ||
| 2138 | entry.expires_at = Instant::now() - Duration::from_secs(1); | ||
| 2139 | } | ||
| 2140 | } | ||
| 2141 | purgatory.cleanup(); | ||
| 2142 | |||
| 2143 | // Verify initial state | ||
| 2144 | let (state_count, pr_count) = purgatory.count(); | ||
| 2145 | assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) | ||
| 2146 | assert_eq!(pr_count, 2); // pr-1, pr-2 | ||
| 2147 | assert_eq!(purgatory.expired_count(), 1); // expired_event | ||
| 2148 | |||
| 2149 | // Save to disk | ||
| 2150 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2151 | |||
| 2152 | // Create new purgatory and restore | ||
| 2153 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2154 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2155 | |||
| 2156 | // Verify all data was restored correctly | ||
| 2157 | let (state_count2, pr_count2) = purgatory2.count(); | ||
| 2158 | assert_eq!(state_count2, 2); | ||
| 2159 | assert_eq!(pr_count2, 2); | ||
| 2160 | assert_eq!(purgatory2.expired_count(), 1); | ||
| 2161 | |||
| 2162 | // Verify state events | ||
| 2163 | assert_eq!(purgatory2.find_state("repo1").len(), 1); | ||
| 2164 | assert_eq!(purgatory2.find_state("repo2").len(), 1); | ||
| 2165 | |||
| 2166 | // Verify PR events | ||
| 2167 | assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some()); | ||
| 2168 | assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none()); | ||
| 2169 | |||
| 2170 | // Verify expired event | ||
| 2171 | assert!(purgatory2.is_expired(&expired_id)); | ||
| 2172 | } | ||
diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs new file mode 100644 index 0000000..7fca2cf --- /dev/null +++ b/src/purgatory/persistence.rs | |||
| @@ -0,0 +1,198 @@ | |||
| 1 | //! Persistence utilities for purgatory state. | ||
| 2 | //! | ||
| 3 | //! This module provides conversion functions between `Instant` (which cannot be | ||
| 4 | //! serialized) and `Duration` offsets from a reference `SystemTime`. This allows | ||
| 5 | //! purgatory state to be persisted to disk and restored across restarts. | ||
| 6 | //! | ||
| 7 | //! ## Time Handling | ||
| 8 | //! | ||
| 9 | //! - `Instant` is monotonic but cannot be serialized | ||
| 10 | //! - `SystemTime` can be serialized but may go backwards (NTP, user changes) | ||
| 11 | //! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime | ||
| 12 | //! - Downtime is accounted for when restoring state (elapsed time is preserved) | ||
| 13 | |||
| 14 | use std::time::{Duration, Instant, SystemTime}; | ||
| 15 | |||
| 16 | /// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`. | ||
| 17 | /// | ||
| 18 | /// This allows storing an `Instant` as a serializable offset that can be | ||
| 19 | /// restored later, accounting for system downtime. | ||
| 20 | /// | ||
| 21 | /// # Arguments | ||
| 22 | /// * `instant` - The `Instant` to convert | ||
| 23 | /// * `reference_time` - The reference `SystemTime` (typically SystemTime::now()) | ||
| 24 | /// * `reference_instant` - The corresponding `Instant` (typically Instant::now()) | ||
| 25 | /// | ||
| 26 | /// # Returns | ||
| 27 | /// Duration offset from the reference time | ||
| 28 | /// | ||
| 29 | /// # Example | ||
| 30 | /// ``` | ||
| 31 | /// use std::time::{Duration, Instant, SystemTime}; | ||
| 32 | /// use ngit_grasp::purgatory::persistence::instant_to_offset; | ||
| 33 | /// | ||
| 34 | /// let now_system = SystemTime::now(); | ||
| 35 | /// let now_instant = Instant::now(); | ||
| 36 | /// let future = now_instant + Duration::from_secs(60); | ||
| 37 | /// | ||
| 38 | /// let offset = instant_to_offset(future, now_system, now_instant); | ||
| 39 | /// assert!(offset.as_secs() >= 60); | ||
| 40 | /// ``` | ||
| 41 | pub fn instant_to_offset( | ||
| 42 | instant: Instant, | ||
| 43 | _reference_time: SystemTime, | ||
| 44 | reference_instant: Instant, | ||
| 45 | ) -> Duration { | ||
| 46 | if instant >= reference_instant { | ||
| 47 | // Future instant - return positive offset | ||
| 48 | instant.duration_since(reference_instant) | ||
| 49 | } else { | ||
| 50 | // Past instant - this shouldn't happen in normal operation, | ||
| 51 | // but we handle it by returning zero duration | ||
| 52 | Duration::ZERO | ||
| 53 | } | ||
| 54 | } | ||
| 55 | |||
| 56 | /// Convert a `Duration` offset back to an `Instant`, accounting for downtime. | ||
| 57 | /// | ||
| 58 | /// This restores an `Instant` from a serialized offset, adjusting for the time | ||
| 59 | /// that has elapsed since the state was saved. | ||
| 60 | /// | ||
| 61 | /// # Arguments | ||
| 62 | /// * `offset` - The duration offset from the saved reference time | ||
| 63 | /// * `saved_at` - The `SystemTime` when the state was saved | ||
| 64 | /// * `reference_instant` - The current `Instant` (typically Instant::now()) | ||
| 65 | /// | ||
| 66 | /// # Returns | ||
| 67 | /// The restored `Instant`, adjusted for downtime | ||
| 68 | /// | ||
| 69 | /// # Example | ||
| 70 | /// ``` | ||
| 71 | /// use std::time::{Duration, Instant, SystemTime}; | ||
| 72 | /// use ngit_grasp::purgatory::persistence::offset_to_instant; | ||
| 73 | /// | ||
| 74 | /// let saved_at = SystemTime::now(); | ||
| 75 | /// let offset = Duration::from_secs(60); | ||
| 76 | /// let now_instant = Instant::now(); | ||
| 77 | /// | ||
| 78 | /// let restored = offset_to_instant(offset, saved_at, now_instant); | ||
| 79 | /// // restored will be approximately now_instant + 60 seconds | ||
| 80 | /// ``` | ||
| 81 | pub fn offset_to_instant( | ||
| 82 | offset: Duration, | ||
| 83 | saved_at: SystemTime, | ||
| 84 | reference_instant: Instant, | ||
| 85 | ) -> Instant { | ||
| 86 | // Calculate how much time has elapsed since the state was saved | ||
| 87 | let now_system = SystemTime::now(); | ||
| 88 | let elapsed_since_save = now_system | ||
| 89 | .duration_since(saved_at) | ||
| 90 | .unwrap_or(Duration::ZERO); | ||
| 91 | |||
| 92 | // The original deadline was: saved_at + offset | ||
| 93 | // Time remaining = (saved_at + offset) - now_system | ||
| 94 | // = offset - elapsed_since_save | ||
| 95 | |||
| 96 | if offset > elapsed_since_save { | ||
| 97 | // Deadline is still in the future | ||
| 98 | let remaining = offset - elapsed_since_save; | ||
| 99 | reference_instant + remaining | ||
| 100 | } else { | ||
| 101 | // Deadline has already passed or is right now | ||
| 102 | reference_instant | ||
| 103 | } | ||
| 104 | } | ||
| 105 | |||
| 106 | #[cfg(test)] | ||
| 107 | mod tests { | ||
| 108 | use super::*; | ||
| 109 | use std::thread; | ||
| 110 | use std::time::Duration; | ||
| 111 | |||
| 112 | #[test] | ||
| 113 | fn test_instant_to_offset_future() { | ||
| 114 | let now_system = SystemTime::now(); | ||
| 115 | let now_instant = Instant::now(); | ||
| 116 | let future = now_instant + Duration::from_secs(60); | ||
| 117 | |||
| 118 | let offset = instant_to_offset(future, now_system, now_instant); | ||
| 119 | |||
| 120 | // Should be approximately 60 seconds (within tolerance) | ||
| 121 | assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61); | ||
| 122 | } | ||
| 123 | |||
| 124 | #[test] | ||
| 125 | fn test_instant_to_offset_past() { | ||
| 126 | let now_system = SystemTime::now(); | ||
| 127 | let past_instant = Instant::now(); | ||
| 128 | // Simulate some time passing | ||
| 129 | thread::sleep(Duration::from_millis(10)); | ||
| 130 | let now_instant = Instant::now(); | ||
| 131 | |||
| 132 | let offset = instant_to_offset(past_instant, now_system, now_instant); | ||
| 133 | |||
| 134 | // Past instants return zero duration | ||
| 135 | assert_eq!(offset, Duration::ZERO); | ||
| 136 | } | ||
| 137 | |||
| 138 | #[test] | ||
| 139 | fn test_offset_to_instant_with_time_remaining() { | ||
| 140 | let saved_at = SystemTime::now(); | ||
| 141 | let offset = Duration::from_secs(60); | ||
| 142 | |||
| 143 | // Simulate a very short downtime (< 10ms) | ||
| 144 | thread::sleep(Duration::from_millis(5)); | ||
| 145 | |||
| 146 | let now_instant = Instant::now(); | ||
| 147 | let restored = offset_to_instant(offset, saved_at, now_instant); | ||
| 148 | |||
| 149 | // Should be approximately 60 seconds in the future | ||
| 150 | let remaining = restored.duration_since(now_instant); | ||
| 151 | assert!( | ||
| 152 | remaining.as_secs() >= 59 && remaining.as_secs() <= 61, | ||
| 153 | "Expected ~60s, got {}s", | ||
| 154 | remaining.as_secs() | ||
| 155 | ); | ||
| 156 | } | ||
| 157 | |||
| 158 | #[test] | ||
| 159 | fn test_offset_to_instant_deadline_passed() { | ||
| 160 | // Simulate state saved 70 seconds ago with 60 second offset | ||
| 161 | let saved_at = SystemTime::now() - Duration::from_secs(70); | ||
| 162 | let offset = Duration::from_secs(60); | ||
| 163 | |||
| 164 | let now_instant = Instant::now(); | ||
| 165 | let restored = offset_to_instant(offset, saved_at, now_instant); | ||
| 166 | |||
| 167 | // Deadline has passed, should be now or in the past | ||
| 168 | let remaining = restored.saturating_duration_since(now_instant); | ||
| 169 | assert_eq!(remaining, Duration::ZERO); | ||
| 170 | } | ||
| 171 | |||
| 172 | #[test] | ||
| 173 | fn test_round_trip_conversion() { | ||
| 174 | let now_system = SystemTime::now(); | ||
| 175 | let now_instant = Instant::now(); | ||
| 176 | let future = now_instant + Duration::from_secs(120); | ||
| 177 | |||
| 178 | // Convert to offset | ||
| 179 | let offset = instant_to_offset(future, now_system, now_instant); | ||
| 180 | |||
| 181 | // Immediately convert back (minimal downtime) | ||
| 182 | let restored = offset_to_instant(offset, now_system, now_instant); | ||
| 183 | |||
| 184 | // Should be very close to the original future instant | ||
| 185 | let diff = if restored > future { | ||
| 186 | restored.duration_since(future) | ||
| 187 | } else { | ||
| 188 | future.duration_since(restored) | ||
| 189 | }; | ||
| 190 | |||
| 191 | // Allow for small timing differences (< 100ms) | ||
| 192 | assert!( | ||
| 193 | diff < Duration::from_millis(100), | ||
| 194 | "Round trip should preserve instant within 100ms, got {}ms", | ||
| 195 | diff.as_millis() | ||
| 196 | ); | ||
| 197 | } | ||
| 198 | } | ||
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 9c47616..919504b 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -5,8 +5,14 @@ | |||
| 5 | //! problem where either the nostr event or git push can arrive first. | 5 | //! problem where either the nostr event or git push can arrive first. |
| 6 | 6 | ||
| 7 | use nostr_sdk::prelude::*; | 7 | use nostr_sdk::prelude::*; |
| 8 | use serde::{Deserialize, Serialize}; | ||
| 8 | use std::time::Instant; | 9 | use std::time::Instant; |
| 9 | 10 | ||
| 11 | /// Default value for Instant fields during deserialization | ||
| 12 | fn instant_now() -> Instant { | ||
| 13 | Instant::now() | ||
| 14 | } | ||
| 15 | |||
| 10 | /// A reference name and its target object. | 16 | /// A reference name and its target object. |
| 11 | /// | 17 | /// |
| 12 | /// Used to identify specific git refs (branches, tags) that a state event | 18 | /// Used to identify specific git refs (branches, tags) that a state event |
| @@ -59,7 +65,10 @@ impl RefUpdate { | |||
| 59 | /// State events declare the current state of a repository but may arrive | 65 | /// State events declare the current state of a repository but may arrive |
| 60 | /// before the corresponding git data has been pushed. This entry holds | 66 | /// before the corresponding git data has been pushed. This entry holds |
| 61 | /// the event and associated metadata until the git data arrives. | 67 | /// the event and associated metadata until the git data arrives. |
| 62 | #[derive(Debug, Clone)] | 68 | /// |
| 69 | /// Note: `Instant` fields cannot be serialized directly. Use the `persistence` | ||
| 70 | /// module to convert to/from serializable wrapper types. | ||
| 71 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 63 | pub struct StatePurgatoryEntry { | 72 | pub struct StatePurgatoryEntry { |
| 64 | /// The nostr state event (kind 30618) awaiting git data | 73 | /// The nostr state event (kind 30618) awaiting git data |
| 65 | pub event: Event, | 74 | pub event: Event, |
| @@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry { | |||
| 71 | pub author: PublicKey, | 80 | pub author: PublicKey, |
| 72 | 81 | ||
| 73 | /// When this entry was added to purgatory | 82 | /// When this entry was added to purgatory |
| 83 | #[serde(skip, default = "instant_now")] | ||
| 74 | pub created_at: Instant, | 84 | pub created_at: Instant, |
| 75 | 85 | ||
| 76 | /// Expiry deadline (30 min from creation, may be extended) | 86 | /// Expiry deadline (30 min from creation, may be extended) |
| 87 | #[serde(skip, default = "instant_now")] | ||
| 77 | pub expires_at: Instant, | 88 | pub expires_at: Instant, |
| 78 | } | 89 | } |
| 79 | 90 | ||
| @@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry { | |||
| 82 | /// PR events reference specific commits but may arrive before the git push | 93 | /// PR events reference specific commits but may arrive before the git push |
| 83 | /// containing those commits. Alternatively, a git push may arrive first, | 94 | /// containing those commits. Alternatively, a git push may arrive first, |
| 84 | /// creating a placeholder entry waiting for the corresponding PR event. | 95 | /// creating a placeholder entry waiting for the corresponding PR event. |
| 85 | #[derive(Debug, Clone)] | 96 | /// |
| 97 | /// Note: `Instant` fields cannot be serialized directly. Use the `persistence` | ||
| 98 | /// module to convert to/from serializable wrapper types. | ||
| 99 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 86 | pub struct PrPurgatoryEntry { | 100 | pub struct PrPurgatoryEntry { |
| 87 | /// The nostr PR event, if received (None = git data arrived first) | 101 | /// The nostr PR event, if received (None = git data arrived first) |
| 88 | pub event: Option<Event>, | 102 | pub event: Option<Event>, |
| @@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry { | |||
| 92 | pub commit: String, | 106 | pub commit: String, |
| 93 | 107 | ||
| 94 | /// When this entry was added to purgatory | 108 | /// When this entry was added to purgatory |
| 109 | #[serde(skip, default = "instant_now")] | ||
| 95 | pub created_at: Instant, | 110 | pub created_at: Instant, |
| 96 | 111 | ||
| 97 | /// Expiry deadline (30 min from creation, may be extended) | 112 | /// Expiry deadline (30 min from creation, may be extended) |
| 113 | #[serde(skip, default = "instant_now")] | ||
| 98 | pub expires_at: Instant, | 114 | pub expires_at: Instant, |
| 99 | } | 115 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; | |||
| 43 | use tokio::time::sleep; | 43 | use tokio::time::sleep; |
| 44 | 44 | ||
| 45 | use std::collections::{HashMap, HashSet}; | 45 | use std::collections::{HashMap, HashSet}; |
| 46 | use std::path::{Path, PathBuf}; | ||
| 46 | use std::sync::Arc; | 47 | use std::sync::Arc; |
| 47 | use std::time::Duration; | 48 | use std::time::Duration; |
| 48 | 49 | ||
| @@ -581,6 +582,7 @@ impl SyncManager { | |||
| 581 | /// * `write_policy` - Policy for validating events before storage | 582 | /// * `write_policy` - Policy for validating events before storage |
| 582 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) | 583 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) |
| 583 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | ||
| 584 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 585 | pub fn new( | 587 | pub fn new( |
| 586 | bootstrap_relay_url: Option<String>, | 588 | bootstrap_relay_url: Option<String>, |
| @@ -589,11 +591,42 @@ impl SyncManager { | |||
| 589 | write_policy: Nip34WritePolicy, | 591 | write_policy: Nip34WritePolicy, |
| 590 | local_relay: LocalRelay, | 592 | local_relay: LocalRelay, |
| 591 | config: &Config, | 593 | config: &Config, |
| 594 | data_path: PathBuf, | ||
| 592 | sync_metrics: Option<SyncMetrics>, | 595 | sync_metrics: Option<SyncMetrics>, |
| 593 | ) -> Self { | 596 | ) -> Self { |
| 594 | // Extract purgatory from write_policy for read-only access | 597 | // Extract purgatory from write_policy for read-only access |
| 595 | let purgatory = write_policy.purgatory().clone(); | 598 | let purgatory = write_policy.purgatory().clone(); |
| 596 | 599 | ||
| 600 | // Create rejected events index | ||
| 601 | let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 602 | RejectedEventsIndex::with_metrics( | ||
| 603 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 604 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 605 | metrics.clone(), | ||
| 606 | ) | ||
| 607 | } else { | ||
| 608 | RejectedEventsIndex::new( | ||
| 609 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 610 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 611 | ) | ||
| 612 | }); | ||
| 613 | |||
| 614 | // Attempt to restore rejected events index from disk | ||
| 615 | let rejected_index_path = data_path.join("rejected-events-cache.json"); | ||
| 616 | if rejected_index_path.exists() { | ||
| 617 | match rejected_events_index.restore_from_disk(&rejected_index_path) { | ||
| 618 | Ok(()) => { | ||
| 619 | tracing::info!("Restored rejected events index from disk"); | ||
| 620 | } | ||
| 621 | Err(e) => { | ||
| 622 | tracing::warn!( | ||
| 623 | "Failed to restore rejected events index: {}, starting empty", | ||
| 624 | e | ||
| 625 | ); | ||
| 626 | } | ||
| 627 | } | ||
| 628 | } | ||
| 629 | |||
| 597 | Self { | 630 | Self { |
| 598 | bootstrap_relay_url, | 631 | bootstrap_relay_url, |
| 599 | service_domain, | 632 | service_domain, |
| @@ -605,18 +638,7 @@ impl SyncManager { | |||
| 605 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 638 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 606 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 639 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 607 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 640 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 608 | rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { | 641 | rejected_events_index, |
| 609 | RejectedEventsIndex::with_metrics( | ||
| 610 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 611 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 612 | metrics.clone(), | ||
| 613 | ) | ||
| 614 | } else { | ||
| 615 | RejectedEventsIndex::new( | ||
| 616 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 617 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 618 | ) | ||
| 619 | }), | ||
| 620 | connections: HashMap::new(), | 642 | connections: HashMap::new(), |
| 621 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 643 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 622 | next_batch_id: 0, | 644 | next_batch_id: 0, |
| @@ -637,6 +659,31 @@ impl SyncManager { | |||
| 637 | self.next_batch_id | 659 | self.next_batch_id |
| 638 | } | 660 | } |
| 639 | 661 | ||
| 662 | /// Get a clone of the rejected events index Arc. | ||
| 663 | /// | ||
| 664 | /// This allows access to the rejected events index for persistence | ||
| 665 | /// even after the SyncManager has been moved into a task. | ||
| 666 | /// | ||
| 667 | /// # Returns | ||
| 668 | /// Arc clone of the rejected events index | ||
| 669 | pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> { | ||
| 670 | self.rejected_events_index.clone() | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Save rejected events index to disk. | ||
| 674 | /// | ||
| 675 | /// This is called during shutdown to persist the rejected events cache, | ||
| 676 | /// allowing us to avoid re-downloading rejected events after restart. | ||
| 677 | /// | ||
| 678 | /// # Arguments | ||
| 679 | /// * `path` - Path to save the rejected index file | ||
| 680 | /// | ||
| 681 | /// # Returns | ||
| 682 | /// Ok(()) on success, Err if save fails | ||
| 683 | pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 684 | self.rejected_events_index.save_to_disk(path) | ||
| 685 | } | ||
| 686 | |||
| 640 | /// Handle EOSE (End Of Stored Events) for a subscription | 687 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 641 | /// | 688 | /// |
| 642 | /// This method: | 689 | /// This method: |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4d31901..f25f22a 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -86,12 +86,14 @@ | |||
| 86 | //! ``` | 86 | //! ``` |
| 87 | 87 | ||
| 88 | use nostr_sdk::{Event, EventId, PublicKey}; | 88 | use nostr_sdk::{Event, EventId, PublicKey}; |
| 89 | use serde::{Deserialize, Serialize}; | ||
| 89 | use std::collections::{HashMap, HashSet}; | 90 | use std::collections::{HashMap, HashSet}; |
| 91 | use std::path::Path; | ||
| 90 | use std::sync::{Arc, RwLock}; | 92 | use std::sync::{Arc, RwLock}; |
| 91 | use std::time::{Duration, Instant}; | 93 | use std::time::{Duration, Instant, SystemTime}; |
| 92 | 94 | ||
| 93 | /// Type of event stored in the rejected events index | 95 | /// Type of event stored in the rejected events index |
| 94 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 96 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| 95 | pub enum EventType { | 97 | pub enum EventType { |
| 96 | /// Repository announcement (kind 30617) | 98 | /// Repository announcement (kind 30617) |
| 97 | Announcement, | 99 | Announcement, |
| @@ -109,7 +111,7 @@ impl std::fmt::Display for EventType { | |||
| 109 | } | 111 | } |
| 110 | 112 | ||
| 111 | /// Reason why a repository announcement was rejected | 113 | /// Reason why a repository announcement was rejected |
| 112 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 114 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| 113 | pub enum RejectionReason { | 115 | pub enum RejectionReason { |
| 114 | /// Announcement doesn't list this service in clone/web URLs | 116 | /// Announcement doesn't list this service in clone/web URLs |
| 115 | DoesNotListService, | 117 | DoesNotListService, |
| @@ -141,6 +143,20 @@ struct HotCacheEntry { | |||
| 141 | cached_at: Instant, | 143 | cached_at: Instant, |
| 142 | } | 144 | } |
| 143 | 145 | ||
| 146 | /// Serializable version of HotCacheEntry for persistence | ||
| 147 | /// | ||
| 148 | /// Converts Instant to Duration offset from saved_at time | ||
| 149 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 150 | struct SerializableHotCacheEntry { | ||
| 151 | event: Event, | ||
| 152 | pubkey: PublicKey, | ||
| 153 | identifier: String, | ||
| 154 | event_type: EventType, | ||
| 155 | reason: RejectionReason, | ||
| 156 | /// Duration since saved_at when this entry was cached | ||
| 157 | cached_at_offset_secs: u64, | ||
| 158 | } | ||
| 159 | |||
| 144 | /// Entry in the cold index (metadata only) | 160 | /// Entry in the cold index (metadata only) |
| 145 | /// | 161 | /// |
| 146 | /// Note: event_id is stored as the HashMap key, not in this struct | 162 | /// Note: event_id is stored as the HashMap key, not in this struct |
| @@ -154,6 +170,49 @@ struct ColdIndexEntry { | |||
| 154 | rejected_at: Instant, | 170 | rejected_at: Instant, |
| 155 | } | 171 | } |
| 156 | 172 | ||
| 173 | /// Serializable version of ColdIndexEntry for persistence | ||
| 174 | /// | ||
| 175 | /// Converts Instant to Duration offset from saved_at time | ||
| 176 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 177 | struct SerializableColdIndexEntry { | ||
| 178 | pubkey: PublicKey, | ||
| 179 | identifier: String, | ||
| 180 | event_type: EventType, | ||
| 181 | reason: RejectionReason, | ||
| 182 | /// Duration since saved_at when this entry was rejected | ||
| 183 | rejected_at_offset_secs: u64, | ||
| 184 | } | ||
| 185 | |||
| 186 | /// Serializable state for hot cache | ||
| 187 | #[derive(Debug, Serialize, Deserialize)] | ||
| 188 | struct SerializableHotCache { | ||
| 189 | expiry_duration_secs: u64, | ||
| 190 | entries: HashMap<EventId, SerializableHotCacheEntry>, | ||
| 191 | } | ||
| 192 | |||
| 193 | /// Serializable state for cold index | ||
| 194 | #[derive(Debug, Serialize, Deserialize)] | ||
| 195 | struct SerializableColdIndex { | ||
| 196 | expiry_duration_secs: u64, | ||
| 197 | entries: HashMap<EventId, SerializableColdIndexEntry>, | ||
| 198 | } | ||
| 199 | |||
| 200 | /// Complete rejected cache state for persistence | ||
| 201 | /// | ||
| 202 | /// Stores both hot cache and cold index with version and timestamp information. | ||
| 203 | /// All Instant fields are converted to Duration offsets from saved_at. | ||
| 204 | #[derive(Debug, Serialize, Deserialize)] | ||
| 205 | struct RejectedCacheState { | ||
| 206 | /// Version for future compatibility | ||
| 207 | version: u32, | ||
| 208 | /// When this state was saved | ||
| 209 | saved_at: SystemTime, | ||
| 210 | /// Hot cache entries with full events | ||
| 211 | hot_cache: SerializableHotCache, | ||
| 212 | /// Cold index entries with metadata only | ||
| 213 | cold_index: SerializableColdIndex, | ||
| 214 | } | ||
| 215 | |||
| 157 | /// Hot cache: Stores full events for immediate re-processing | 216 | /// Hot cache: Stores full events for immediate re-processing |
| 158 | /// | 217 | /// |
| 159 | /// Events are stored for a short duration (default: 2 minutes) to enable | 218 | /// Events are stored for a short duration (default: 2 minutes) to enable |
| @@ -603,6 +662,168 @@ impl RejectedEventsIndex { | |||
| 603 | 662 | ||
| 604 | ids | 663 | ids |
| 605 | } | 664 | } |
| 665 | |||
| 666 | /// Save rejected events cache to disk | ||
| 667 | /// | ||
| 668 | /// Serializes both hot cache and cold index to JSON, converting Instant timestamps | ||
| 669 | /// to Duration offsets from the save time. This allows timestamps to be adjusted | ||
| 670 | /// for downtime when restored. | ||
| 671 | /// | ||
| 672 | /// # Arguments | ||
| 673 | /// | ||
| 674 | /// * `path` - File path to write the serialized state to | ||
| 675 | /// | ||
| 676 | /// # Returns | ||
| 677 | /// | ||
| 678 | /// Ok(()) on success, or an error if serialization or file write fails | ||
| 679 | pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 680 | let saved_at = SystemTime::now(); | ||
| 681 | let now = Instant::now(); | ||
| 682 | |||
| 683 | // Lock both caches for consistent snapshot | ||
| 684 | let hot_entries = self.hot_cache.entries.read().unwrap(); | ||
| 685 | let cold_entries = self.cold_index.entries.read().unwrap(); | ||
| 686 | |||
| 687 | // Convert hot cache entries to serializable format | ||
| 688 | let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries | ||
| 689 | .iter() | ||
| 690 | .map(|(event_id, entry)| { | ||
| 691 | let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs(); | ||
| 692 | |||
| 693 | let serializable_entry = SerializableHotCacheEntry { | ||
| 694 | event: entry.event.clone(), | ||
| 695 | pubkey: entry.pubkey, | ||
| 696 | identifier: entry.identifier.clone(), | ||
| 697 | event_type: entry.event_type, | ||
| 698 | reason: entry.reason, | ||
| 699 | cached_at_offset_secs, | ||
| 700 | }; | ||
| 701 | |||
| 702 | (*event_id, serializable_entry) | ||
| 703 | }) | ||
| 704 | .collect(); | ||
| 705 | |||
| 706 | // Convert cold index entries to serializable format | ||
| 707 | let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries | ||
| 708 | .iter() | ||
| 709 | .map(|(event_id, entry)| { | ||
| 710 | let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs(); | ||
| 711 | |||
| 712 | let serializable_entry = SerializableColdIndexEntry { | ||
| 713 | pubkey: entry.pubkey, | ||
| 714 | identifier: entry.identifier.clone(), | ||
| 715 | event_type: entry.event_type, | ||
| 716 | reason: entry.reason, | ||
| 717 | rejected_at_offset_secs, | ||
| 718 | }; | ||
| 719 | |||
| 720 | (*event_id, serializable_entry) | ||
| 721 | }) | ||
| 722 | .collect(); | ||
| 723 | |||
| 724 | // Create complete state | ||
| 725 | let state = RejectedCacheState { | ||
| 726 | version: 1, | ||
| 727 | saved_at, | ||
| 728 | hot_cache: SerializableHotCache { | ||
| 729 | expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(), | ||
| 730 | entries: serializable_hot_entries, | ||
| 731 | }, | ||
| 732 | cold_index: SerializableColdIndex { | ||
| 733 | expiry_duration_secs: self.cold_index.expiry_duration.as_secs(), | ||
| 734 | entries: serializable_cold_entries, | ||
| 735 | }, | ||
| 736 | }; | ||
| 737 | |||
| 738 | // Serialize to JSON and write to file | ||
| 739 | let json = serde_json::to_string_pretty(&state)?; | ||
| 740 | std::fs::write(path, json)?; | ||
| 741 | |||
| 742 | Ok(()) | ||
| 743 | } | ||
| 744 | |||
| 745 | /// Restore rejected events cache from disk | ||
| 746 | /// | ||
| 747 | /// Loads the serialized state from disk and populates both hot cache and cold index. | ||
| 748 | /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain | ||
| 749 | /// correct expiry behavior. Deletes the state file after successful restore. | ||
| 750 | /// | ||
| 751 | /// # Arguments | ||
| 752 | /// | ||
| 753 | /// * `path` - File path to read the serialized state from | ||
| 754 | /// | ||
| 755 | /// # Returns | ||
| 756 | /// | ||
| 757 | /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails | ||
| 758 | pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 759 | // Load and parse JSON | ||
| 760 | let json = std::fs::read_to_string(path)?; | ||
| 761 | let state: RejectedCacheState = serde_json::from_str(&json)?; | ||
| 762 | |||
| 763 | // Calculate downtime (how long the relay was offline) | ||
| 764 | let now_system = SystemTime::now(); | ||
| 765 | let downtime = now_system | ||
| 766 | .duration_since(state.saved_at) | ||
| 767 | .unwrap_or(Duration::ZERO); | ||
| 768 | |||
| 769 | let now_instant = Instant::now(); | ||
| 770 | |||
| 771 | // Lock both caches for restoration | ||
| 772 | let mut hot_entries = self.hot_cache.entries.write().unwrap(); | ||
| 773 | let mut cold_entries = self.cold_index.entries.write().unwrap(); | ||
| 774 | |||
| 775 | // Restore hot cache entries | ||
| 776 | for (event_id, serializable_entry) in state.hot_cache.entries { | ||
| 777 | // Reconstruct cached_at by extending the offset by downtime | ||
| 778 | // Original offset (how long ago it was cached when saved) | ||
| 779 | let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs); | ||
| 780 | // Total offset including downtime | ||
| 781 | let total_offset = original_offset + downtime; | ||
| 782 | |||
| 783 | // cached_at = now - total_offset | ||
| 784 | let cached_at = now_instant - total_offset; | ||
| 785 | |||
| 786 | let entry = HotCacheEntry { | ||
| 787 | event: serializable_entry.event, | ||
| 788 | pubkey: serializable_entry.pubkey, | ||
| 789 | identifier: serializable_entry.identifier, | ||
| 790 | event_type: serializable_entry.event_type, | ||
| 791 | reason: serializable_entry.reason, | ||
| 792 | cached_at, | ||
| 793 | }; | ||
| 794 | |||
| 795 | hot_entries.insert(event_id, entry); | ||
| 796 | } | ||
| 797 | |||
| 798 | // Restore cold index entries | ||
| 799 | for (event_id, serializable_entry) in state.cold_index.entries { | ||
| 800 | // Reconstruct rejected_at by extending the offset by downtime | ||
| 801 | let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs); | ||
| 802 | let total_offset = original_offset + downtime; | ||
| 803 | |||
| 804 | // rejected_at = now - total_offset | ||
| 805 | let rejected_at = now_instant - total_offset; | ||
| 806 | |||
| 807 | let entry = ColdIndexEntry { | ||
| 808 | pubkey: serializable_entry.pubkey, | ||
| 809 | identifier: serializable_entry.identifier, | ||
| 810 | event_type: serializable_entry.event_type, | ||
| 811 | reason: serializable_entry.reason, | ||
| 812 | rejected_at, | ||
| 813 | }; | ||
| 814 | |||
| 815 | cold_entries.insert(event_id, entry); | ||
| 816 | } | ||
| 817 | |||
| 818 | // Release locks before deleting file | ||
| 819 | drop(hot_entries); | ||
| 820 | drop(cold_entries); | ||
| 821 | |||
| 822 | // Delete the state file after successful restore | ||
| 823 | std::fs::remove_file(path)?; | ||
| 824 | |||
| 825 | Ok(()) | ||
| 826 | } | ||
| 606 | } | 827 | } |
| 607 | 828 | ||
| 608 | #[cfg(test)] | 829 | #[cfg(test)] |
| @@ -956,4 +1177,503 @@ mod tests { | |||
| 956 | // Cold index now empty | 1177 | // Cold index now empty |
| 957 | assert_eq!(index.cold_index_len(), 0); | 1178 | assert_eq!(index.cold_index_len(), 0); |
| 958 | } | 1179 | } |
| 1180 | |||
| 1181 | // ======================================================================== | ||
| 1182 | // Persistence Serialization Tests | ||
| 1183 | // ======================================================================== | ||
| 1184 | |||
| 1185 | #[tokio::test] | ||
| 1186 | async fn test_save_and_restore_hot_cache_roundtrip() { | ||
| 1187 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1188 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1189 | |||
| 1190 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1191 | let event = create_test_event().await; | ||
| 1192 | let pubkey = event.pubkey; | ||
| 1193 | let identifier = "test-repo".to_string(); | ||
| 1194 | |||
| 1195 | // Add event to hot cache | ||
| 1196 | index.add_announcement( | ||
| 1197 | event.clone(), | ||
| 1198 | pubkey, | ||
| 1199 | identifier.clone(), | ||
| 1200 | RejectionReason::DoesNotListService, | ||
| 1201 | ); | ||
| 1202 | |||
| 1203 | assert_eq!(index.hot_cache_len(), 1); | ||
| 1204 | assert_eq!(index.cold_index_len(), 1); | ||
| 1205 | |||
| 1206 | // Save to disk | ||
| 1207 | index.save_to_disk(&state_path).unwrap(); | ||
| 1208 | assert!(state_path.exists()); | ||
| 1209 | |||
| 1210 | // Create new index and restore | ||
| 1211 | let index2 = | ||
| 1212 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1213 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1214 | |||
| 1215 | // Verify state file was deleted after restore | ||
| 1216 | assert!(!state_path.exists()); | ||
| 1217 | |||
| 1218 | // Verify hot cache restored | ||
| 1219 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1220 | assert!(index2.hot_cache.contains(&event.id)); | ||
| 1221 | |||
| 1222 | // Verify cold index restored | ||
| 1223 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1224 | assert!(index2.cold_index.contains(&event.id)); | ||
| 1225 | |||
| 1226 | // Verify we can retrieve the event | ||
| 1227 | let events = index2 | ||
| 1228 | .hot_cache | ||
| 1229 | .get_maintainer_events(&pubkey, &identifier, None); | ||
| 1230 | assert_eq!(events.len(), 1); | ||
| 1231 | assert_eq!(events[0].id, event.id); | ||
| 1232 | } | ||
| 1233 | |||
| 1234 | #[tokio::test] | ||
| 1235 | async fn test_save_and_restore_cold_index_only() { | ||
| 1236 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1237 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1238 | |||
| 1239 | let index = RejectedEventsIndex::new( | ||
| 1240 | Duration::from_millis(50), // Hot cache expires quickly | ||
| 1241 | Duration::from_secs(604800), // Cold index lasts long | ||
| 1242 | ); | ||
| 1243 | let event = create_test_event().await; | ||
| 1244 | |||
| 1245 | // Add event | ||
| 1246 | index.add_announcement( | ||
| 1247 | event.clone(), | ||
| 1248 | event.pubkey, | ||
| 1249 | "test-repo".to_string(), | ||
| 1250 | RejectionReason::MaintainerNotYetValid, | ||
| 1251 | ); | ||
| 1252 | |||
| 1253 | // Wait for hot cache to expire | ||
| 1254 | std::thread::sleep(Duration::from_millis(60)); | ||
| 1255 | index.cleanup_expired_for_type("announcement"); | ||
| 1256 | |||
| 1257 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1258 | assert_eq!(index.cold_index_len(), 1); | ||
| 1259 | |||
| 1260 | // Save to disk | ||
| 1261 | index.save_to_disk(&state_path).unwrap(); | ||
| 1262 | |||
| 1263 | // Restore into new index | ||
| 1264 | let index2 = | ||
| 1265 | RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800)); | ||
| 1266 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1267 | |||
| 1268 | // Verify only cold index restored (hot cache was empty) | ||
| 1269 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 1270 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1271 | assert!(index2.cold_index.contains(&event.id)); | ||
| 1272 | } | ||
| 1273 | |||
| 1274 | #[tokio::test] | ||
| 1275 | async fn test_save_and_restore_both_hot_and_cold() { | ||
| 1276 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1277 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1278 | |||
| 1279 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1280 | let keys = Keys::generate(); | ||
| 1281 | |||
| 1282 | // Create two events | ||
| 1283 | let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); | ||
| 1284 | let event1 = keys.sign_event(unsigned1).await.unwrap(); | ||
| 1285 | |||
| 1286 | let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); | ||
| 1287 | let event2 = keys.sign_event(unsigned2).await.unwrap(); | ||
| 1288 | |||
| 1289 | // Add both events | ||
| 1290 | index.add_announcement( | ||
| 1291 | event1.clone(), | ||
| 1292 | event1.pubkey, | ||
| 1293 | "repo1".to_string(), | ||
| 1294 | RejectionReason::DoesNotListService, | ||
| 1295 | ); | ||
| 1296 | |||
| 1297 | index.add_state( | ||
| 1298 | event2.clone(), | ||
| 1299 | event2.pubkey, | ||
| 1300 | "repo2".to_string(), | ||
| 1301 | RejectionReason::Other, | ||
| 1302 | ); | ||
| 1303 | |||
| 1304 | assert_eq!(index.hot_cache_len(), 2); | ||
| 1305 | assert_eq!(index.cold_index_len(), 2); | ||
| 1306 | |||
| 1307 | // Save to disk | ||
| 1308 | index.save_to_disk(&state_path).unwrap(); | ||
| 1309 | |||
| 1310 | // Restore into new index | ||
| 1311 | let index2 = | ||
| 1312 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1313 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1314 | |||
| 1315 | // Verify both caches restored | ||
| 1316 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1317 | assert_eq!(index2.cold_index_len(), 2); | ||
| 1318 | assert!(index2.contains(&event1.id)); | ||
| 1319 | assert!(index2.contains(&event2.id)); | ||
| 1320 | } | ||
| 1321 | |||
| 1322 | #[tokio::test] | ||
| 1323 | async fn test_save_and_restore_empty_cache() { | ||
| 1324 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1325 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1326 | |||
| 1327 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1328 | |||
| 1329 | // Save empty cache | ||
| 1330 | index.save_to_disk(&state_path).unwrap(); | ||
| 1331 | assert!(state_path.exists()); | ||
| 1332 | |||
| 1333 | // Restore into new index | ||
| 1334 | let index2 = | ||
| 1335 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1336 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1337 | |||
| 1338 | // Verify empty state restored | ||
| 1339 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 1340 | assert_eq!(index2.cold_index_len(), 0); | ||
| 1341 | } | ||
| 1342 | |||
| 1343 | #[tokio::test] | ||
| 1344 | async fn test_restore_missing_file() { | ||
| 1345 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1346 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 1347 | |||
| 1348 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1349 | |||
| 1350 | // Attempting to restore missing file should return error | ||
| 1351 | let result = index.restore_from_disk(&state_path); | ||
| 1352 | assert!(result.is_err()); | ||
| 1353 | |||
| 1354 | // Index should remain empty | ||
| 1355 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1356 | assert_eq!(index.cold_index_len(), 0); | ||
| 1357 | } | ||
| 1358 | |||
| 1359 | #[tokio::test] | ||
| 1360 | async fn test_restore_corrupted_json() { | ||
| 1361 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1362 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 1363 | |||
| 1364 | // Write corrupted JSON | ||
| 1365 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 1366 | |||
| 1367 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1368 | |||
| 1369 | // Attempting to restore corrupted file should return error | ||
| 1370 | let result = index.restore_from_disk(&state_path); | ||
| 1371 | assert!(result.is_err()); | ||
| 1372 | |||
| 1373 | // Index should remain empty | ||
| 1374 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1375 | assert_eq!(index.cold_index_len(), 0); | ||
| 1376 | } | ||
| 1377 | |||
| 1378 | #[tokio::test] | ||
| 1379 | async fn test_file_cleanup_after_successful_restore() { | ||
| 1380 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1381 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1382 | |||
| 1383 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1384 | let event = create_test_event().await; | ||
| 1385 | |||
| 1386 | index.add_announcement( | ||
| 1387 | event.clone(), | ||
| 1388 | event.pubkey, | ||
| 1389 | "test-repo".to_string(), | ||
| 1390 | RejectionReason::DoesNotListService, | ||
| 1391 | ); | ||
| 1392 | |||
| 1393 | // Save to disk | ||
| 1394 | index.save_to_disk(&state_path).unwrap(); | ||
| 1395 | assert!(state_path.exists()); | ||
| 1396 | |||
| 1397 | // Restore | ||
| 1398 | let index2 = | ||
| 1399 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1400 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1401 | |||
| 1402 | // File should be deleted after successful restore | ||
| 1403 | assert!(!state_path.exists()); | ||
| 1404 | } | ||
| 1405 | |||
| 1406 | #[tokio::test] | ||
| 1407 | async fn test_downtime_calculation_preserves_expiry() { | ||
| 1408 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1409 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1410 | |||
| 1411 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1412 | let event = create_test_event().await; | ||
| 1413 | |||
| 1414 | index.add_announcement( | ||
| 1415 | event.clone(), | ||
| 1416 | event.pubkey, | ||
| 1417 | "test-repo".to_string(), | ||
| 1418 | RejectionReason::DoesNotListService, | ||
| 1419 | ); | ||
| 1420 | |||
| 1421 | // Save to disk | ||
| 1422 | index.save_to_disk(&state_path).unwrap(); | ||
| 1423 | |||
| 1424 | // Simulate downtime by sleeping | ||
| 1425 | std::thread::sleep(Duration::from_millis(100)); | ||
| 1426 | |||
| 1427 | // Restore | ||
| 1428 | let index2 = | ||
| 1429 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1430 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1431 | |||
| 1432 | // Event should still be in both caches (downtime accounted for) | ||
| 1433 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1434 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1435 | assert!(index2.contains(&event.id)); | ||
| 1436 | } | ||
| 1437 | |||
| 1438 | #[tokio::test] | ||
| 1439 | async fn test_entries_expired_during_downtime() { | ||
| 1440 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1441 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1442 | |||
| 1443 | // Create index with very short expiry | ||
| 1444 | let index = RejectedEventsIndex::new( | ||
| 1445 | Duration::from_millis(100), // Hot cache: 100ms | ||
| 1446 | Duration::from_millis(200), // Cold index: 200ms | ||
| 1447 | ); | ||
| 1448 | let event = create_test_event().await; | ||
| 1449 | |||
| 1450 | index.add_announcement( | ||
| 1451 | event.clone(), | ||
| 1452 | event.pubkey, | ||
| 1453 | "test-repo".to_string(), | ||
| 1454 | RejectionReason::DoesNotListService, | ||
| 1455 | ); | ||
| 1456 | |||
| 1457 | // Save to disk | ||
| 1458 | index.save_to_disk(&state_path).unwrap(); | ||
| 1459 | |||
| 1460 | // Simulate downtime longer than hot cache expiry | ||
| 1461 | std::thread::sleep(Duration::from_millis(150)); | ||
| 1462 | |||
| 1463 | // Restore | ||
| 1464 | let index2 = | ||
| 1465 | RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200)); | ||
| 1466 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1467 | |||
| 1468 | // Hot cache entry should have expired during downtime | ||
| 1469 | // Cold index should still have it (200ms expiry) | ||
| 1470 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1471 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1472 | |||
| 1473 | // But when we try to get it, hot cache will see it's expired | ||
| 1474 | let events = index2 | ||
| 1475 | .hot_cache | ||
| 1476 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1477 | assert_eq!(events.len(), 0); // Expired! | ||
| 1478 | |||
| 1479 | // Cleanup should remove it | ||
| 1480 | let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement"); | ||
| 1481 | assert_eq!(hot_expired, 1); | ||
| 1482 | assert_eq!(cold_expired, 0); // Not expired yet | ||
| 1483 | } | ||
| 1484 | |||
| 1485 | #[tokio::test] | ||
| 1486 | async fn test_hot_cache_different_event_types() { | ||
| 1487 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1488 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1489 | |||
| 1490 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1491 | let keys = Keys::generate(); | ||
| 1492 | |||
| 1493 | // Create announcement event | ||
| 1494 | let unsigned_ann = | ||
| 1495 | nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); | ||
| 1496 | let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); | ||
| 1497 | |||
| 1498 | // Create state event | ||
| 1499 | let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); | ||
| 1500 | let event_state = keys.sign_event(unsigned_state).await.unwrap(); | ||
| 1501 | |||
| 1502 | // Add both types | ||
| 1503 | index.add_announcement( | ||
| 1504 | event_ann.clone(), | ||
| 1505 | event_ann.pubkey, | ||
| 1506 | "test-repo".to_string(), | ||
| 1507 | RejectionReason::DoesNotListService, | ||
| 1508 | ); | ||
| 1509 | |||
| 1510 | index.add_state( | ||
| 1511 | event_state.clone(), | ||
| 1512 | event_state.pubkey, | ||
| 1513 | "test-repo".to_string(), | ||
| 1514 | RejectionReason::Other, | ||
| 1515 | ); | ||
| 1516 | |||
| 1517 | // Save and restore | ||
| 1518 | index.save_to_disk(&state_path).unwrap(); | ||
| 1519 | let index2 = | ||
| 1520 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1521 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1522 | |||
| 1523 | // Verify both event types restored | ||
| 1524 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1525 | assert!(index2.contains(&event_ann.id)); | ||
| 1526 | assert!(index2.contains(&event_state.id)); | ||
| 1527 | |||
| 1528 | // Verify we can filter by type | ||
| 1529 | let (removed, events) = index2.invalidate_and_get( | ||
| 1530 | &event_ann.pubkey, | ||
| 1531 | "test-repo", | ||
| 1532 | Some(EventType::Announcement), | ||
| 1533 | ); | ||
| 1534 | assert_eq!(removed, 1); | ||
| 1535 | assert_eq!(events.len(), 1); | ||
| 1536 | assert_eq!(events[0].id, event_ann.id); | ||
| 1537 | } | ||
| 1538 | |||
| 1539 | #[tokio::test] | ||
| 1540 | async fn test_cold_index_different_rejection_reasons() { | ||
| 1541 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1542 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1543 | |||
| 1544 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1545 | let keys = Keys::generate(); | ||
| 1546 | |||
| 1547 | // Create events with different rejection reasons | ||
| 1548 | let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); | ||
| 1549 | let event1 = keys.sign_event(unsigned1).await.unwrap(); | ||
| 1550 | |||
| 1551 | let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); | ||
| 1552 | let event2 = keys.sign_event(unsigned2).await.unwrap(); | ||
| 1553 | |||
| 1554 | let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key()); | ||
| 1555 | let event3 = keys.sign_event(unsigned3).await.unwrap(); | ||
| 1556 | |||
| 1557 | // Add with different rejection reasons | ||
| 1558 | index.add_announcement( | ||
| 1559 | event1.clone(), | ||
| 1560 | event1.pubkey, | ||
| 1561 | "repo1".to_string(), | ||
| 1562 | RejectionReason::DoesNotListService, | ||
| 1563 | ); | ||
| 1564 | |||
| 1565 | index.add_announcement( | ||
| 1566 | event2.clone(), | ||
| 1567 | event2.pubkey, | ||
| 1568 | "repo2".to_string(), | ||
| 1569 | RejectionReason::MaintainerNotYetValid, | ||
| 1570 | ); | ||
| 1571 | |||
| 1572 | index.add_announcement( | ||
| 1573 | event3.clone(), | ||
| 1574 | event3.pubkey, | ||
| 1575 | "repo3".to_string(), | ||
| 1576 | RejectionReason::Other, | ||
| 1577 | ); | ||
| 1578 | |||
| 1579 | // Save and restore | ||
| 1580 | index.save_to_disk(&state_path).unwrap(); | ||
| 1581 | let index2 = | ||
| 1582 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1583 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1584 | |||
| 1585 | // Verify all entries restored with their rejection reasons | ||
| 1586 | assert_eq!(index2.cold_index_len(), 3); | ||
| 1587 | assert!(index2.contains(&event1.id)); | ||
| 1588 | assert!(index2.contains(&event2.id)); | ||
| 1589 | assert!(index2.contains(&event3.id)); | ||
| 1590 | } | ||
| 1591 | |||
| 1592 | #[tokio::test] | ||
| 1593 | async fn test_multiple_save_restore_cycles() { | ||
| 1594 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1595 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1596 | |||
| 1597 | // First cycle | ||
| 1598 | let index1 = | ||
| 1599 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1600 | let event1 = create_test_event().await; | ||
| 1601 | |||
| 1602 | index1.add_announcement( | ||
| 1603 | event1.clone(), | ||
| 1604 | event1.pubkey, | ||
| 1605 | "repo1".to_string(), | ||
| 1606 | RejectionReason::DoesNotListService, | ||
| 1607 | ); | ||
| 1608 | |||
| 1609 | index1.save_to_disk(&state_path).unwrap(); | ||
| 1610 | |||
| 1611 | // Second cycle - restore and add more | ||
| 1612 | let index2 = | ||
| 1613 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1614 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1615 | |||
| 1616 | let event2 = create_test_event().await; | ||
| 1617 | index2.add_announcement( | ||
| 1618 | event2.clone(), | ||
| 1619 | event2.pubkey, | ||
| 1620 | "repo2".to_string(), | ||
| 1621 | RejectionReason::MaintainerNotYetValid, | ||
| 1622 | ); | ||
| 1623 | |||
| 1624 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1625 | index2.save_to_disk(&state_path).unwrap(); | ||
| 1626 | |||
| 1627 | // Third cycle - restore again | ||
| 1628 | let index3 = | ||
| 1629 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1630 | index3.restore_from_disk(&state_path).unwrap(); | ||
| 1631 | |||
| 1632 | // Verify both events survived multiple cycles | ||
| 1633 | assert_eq!(index3.hot_cache_len(), 2); | ||
| 1634 | assert!(index3.contains(&event1.id)); | ||
| 1635 | assert!(index3.contains(&event2.id)); | ||
| 1636 | } | ||
| 1637 | |||
| 1638 | #[tokio::test] | ||
| 1639 | async fn test_restore_preserves_remaining_ttl() { | ||
| 1640 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1641 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1642 | |||
| 1643 | // Create index with 2 second hot cache expiry | ||
| 1644 | let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); | ||
| 1645 | let event = create_test_event().await; | ||
| 1646 | |||
| 1647 | index.add_announcement( | ||
| 1648 | event.clone(), | ||
| 1649 | event.pubkey, | ||
| 1650 | "test-repo".to_string(), | ||
| 1651 | RejectionReason::DoesNotListService, | ||
| 1652 | ); | ||
| 1653 | |||
| 1654 | // Wait 200ms (small fraction of TTL) | ||
| 1655 | std::thread::sleep(Duration::from_millis(200)); | ||
| 1656 | |||
| 1657 | // Save to disk | ||
| 1658 | index.save_to_disk(&state_path).unwrap(); | ||
| 1659 | |||
| 1660 | // Immediately restore (minimal downtime) | ||
| 1661 | let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); | ||
| 1662 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1663 | |||
| 1664 | // Event should still be retrievable (has ~1.8s remaining) | ||
| 1665 | let events = index2 | ||
| 1666 | .hot_cache | ||
| 1667 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1668 | assert_eq!(events.len(), 1); | ||
| 1669 | |||
| 1670 | // Wait 2 seconds (total 2.2s > 2s expiry) | ||
| 1671 | std::thread::sleep(Duration::from_secs(2)); | ||
| 1672 | |||
| 1673 | // Now it should be expired | ||
| 1674 | let events = index2 | ||
| 1675 | .hot_cache | ||
| 1676 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1677 | assert_eq!(events.len(), 0); | ||
| 1678 | } | ||
| 959 | } | 1679 | } |
diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs new file mode 100644 index 0000000..acefb41 --- /dev/null +++ b/tests/purgatory_persistence.rs | |||
| @@ -0,0 +1,755 @@ | |||
| 1 | //! Purgatory Persistence Integration Tests | ||
| 2 | //! | ||
| 3 | //! Tests that verify the full purgatory persistence save/restore cycle: | ||
| 4 | //! - Purgatory save/restore with state events, PR events, and expired events | ||
| 5 | //! - Rejected cache save/restore with hot cache and cold index entries | ||
| 6 | //! - Integration with shutdown/startup hooks | ||
| 7 | //! - Graceful degradation with missing or corrupted files | ||
| 8 | //! - Time adjustment for downtime | ||
| 9 | //! | ||
| 10 | //! # Test Strategy | ||
| 11 | //! | ||
| 12 | //! These tests verify end-to-end persistence functionality: | ||
| 13 | //! 1. Create purgatory/rejected cache instances with various entries | ||
| 14 | //! 2. Save state to disk | ||
| 15 | //! 3. Create new instances and restore from disk | ||
| 16 | //! 4. Verify all data is restored correctly | ||
| 17 | //! 5. Verify system continues to work after restore | ||
| 18 | //! | ||
| 19 | //! # Running Tests | ||
| 20 | //! | ||
| 21 | //! ```bash | ||
| 22 | //! # Run all purgatory persistence tests | ||
| 23 | //! cargo test --test purgatory_persistence | ||
| 24 | //! | ||
| 25 | //! # Run specific test | ||
| 26 | //! cargo test --test purgatory_persistence test_full_purgatory_save_restore_cycle | ||
| 27 | //! | ||
| 28 | //! # With output for debugging | ||
| 29 | //! cargo test --test purgatory_persistence -- --nocapture | ||
| 30 | //! ``` | ||
| 31 | |||
| 32 | mod common; | ||
| 33 | |||
| 34 | use ngit_grasp::purgatory::Purgatory; | ||
| 35 | use ngit_grasp::sync::rejected_index::{EventType, RejectedEventsIndex, RejectionReason}; | ||
| 36 | use nostr_sdk::prelude::*; | ||
| 37 | use std::time::Duration; | ||
| 38 | |||
| 39 | /// Helper to create a test event | ||
| 40 | async fn create_test_event(keys: &Keys, content: &str) -> Event { | ||
| 41 | EventBuilder::text_note(content) | ||
| 42 | .sign_with_keys(keys) | ||
| 43 | .unwrap() | ||
| 44 | } | ||
| 45 | |||
| 46 | /// Helper to create a state event with specific refs | ||
| 47 | fn create_state_event_with_refs( | ||
| 48 | keys: &Keys, | ||
| 49 | identifier: &str, | ||
| 50 | refs: &[(&str, &str)], | ||
| 51 | ) -> Result<Event, Box<dyn std::error::Error>> { | ||
| 52 | let mut tags = vec![Tag::identifier(identifier)]; | ||
| 53 | |||
| 54 | // Add ref tags | ||
| 55 | for (ref_name, commit_hash) in refs { | ||
| 56 | tags.push(Tag::custom( | ||
| 57 | TagKind::custom("ref"), | ||
| 58 | vec![ref_name.to_string(), commit_hash.to_string()], | ||
| 59 | )); | ||
| 60 | } | ||
| 61 | |||
| 62 | let event = EventBuilder::new(Kind::from(30618), "") | ||
| 63 | .tags(tags) | ||
| 64 | .sign_with_keys(keys)?; | ||
| 65 | |||
| 66 | Ok(event) | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Test 1: Full save/restore cycle with state events, PR events, and expired events | ||
| 70 | #[tokio::test] | ||
| 71 | async fn test_full_purgatory_save_restore_cycle() { | ||
| 72 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 73 | let git_data_path = temp_dir.path().join("git"); | ||
| 74 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 75 | |||
| 76 | // Create purgatory instance | ||
| 77 | let purgatory = Purgatory::new(&git_data_path); | ||
| 78 | |||
| 79 | // Create test keys and events | ||
| 80 | let keys1 = Keys::generate(); | ||
| 81 | let keys2 = Keys::generate(); | ||
| 82 | let keys3 = Keys::generate(); | ||
| 83 | |||
| 84 | let state_event1 = | ||
| 85 | create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")]).unwrap(); | ||
| 86 | let state_event2 = | ||
| 87 | create_state_event_with_refs(&keys2, "repo2", &[("main", "def456")]).unwrap(); | ||
| 88 | |||
| 89 | let pr_event1 = create_test_event(&keys3, "PR 1").await; | ||
| 90 | let pr_event2 = create_test_event(&keys3, "PR 2").await; | ||
| 91 | |||
| 92 | // Add state events to purgatory | ||
| 93 | purgatory.add_state( | ||
| 94 | state_event1.clone(), | ||
| 95 | "repo1".to_string(), | ||
| 96 | keys1.public_key(), | ||
| 97 | ); | ||
| 98 | purgatory.add_state( | ||
| 99 | state_event2.clone(), | ||
| 100 | "repo2".to_string(), | ||
| 101 | keys2.public_key(), | ||
| 102 | ); | ||
| 103 | |||
| 104 | // Add PR events to purgatory | ||
| 105 | purgatory.add_pr( | ||
| 106 | pr_event1.clone(), | ||
| 107 | pr_event1.id.to_hex(), | ||
| 108 | "commit-abc".to_string(), | ||
| 109 | ); | ||
| 110 | purgatory.add_pr( | ||
| 111 | pr_event2.clone(), | ||
| 112 | pr_event2.id.to_hex(), | ||
| 113 | "commit-def".to_string(), | ||
| 114 | ); | ||
| 115 | |||
| 116 | // Add a PR placeholder (git-data-first scenario) | ||
| 117 | purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-xyz".to_string()); | ||
| 118 | |||
| 119 | // Note: We can't directly test expired events without accessing private fields, | ||
| 120 | // so we'll focus on testing state and PR events persistence | ||
| 121 | |||
| 122 | // Verify initial counts | ||
| 123 | let (state_count, pr_count) = purgatory.count(); | ||
| 124 | assert_eq!(state_count, 2, "Should have 2 state events"); | ||
| 125 | assert_eq!( | ||
| 126 | pr_count, 3, | ||
| 127 | "Should have 3 PR events (2 events + 1 placeholder)" | ||
| 128 | ); | ||
| 129 | |||
| 130 | // Save to disk | ||
| 131 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 132 | assert!(state_path.exists(), "State file should exist after save"); | ||
| 133 | |||
| 134 | // Create new purgatory instance and restore | ||
| 135 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 136 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 137 | |||
| 138 | // Verify state file was deleted after restore | ||
| 139 | assert!( | ||
| 140 | !state_path.exists(), | ||
| 141 | "State file should be deleted after restore" | ||
| 142 | ); | ||
| 143 | |||
| 144 | // Verify all data was restored | ||
| 145 | let (state_count2, pr_count2) = purgatory2.count(); | ||
| 146 | assert_eq!(state_count2, 2, "Should have 2 state events after restore"); | ||
| 147 | assert_eq!( | ||
| 148 | pr_count2, 3, | ||
| 149 | "Should have 3 PR events after restore (2 events + 1 placeholder)" | ||
| 150 | ); | ||
| 151 | |||
| 152 | // Verify specific state events | ||
| 153 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 154 | assert_eq!(repo1_states.len(), 1); | ||
| 155 | assert_eq!(repo1_states[0].event.id, state_event1.id); | ||
| 156 | |||
| 157 | let repo2_states = purgatory2.find_state("repo2"); | ||
| 158 | assert_eq!(repo2_states.len(), 1); | ||
| 159 | assert_eq!(repo2_states[0].event.id, state_event2.id); | ||
| 160 | |||
| 161 | // Verify PR events | ||
| 162 | let pr1 = purgatory2.find_pr(&pr_event1.id.to_hex()); | ||
| 163 | assert!(pr1.is_some()); | ||
| 164 | assert_eq!(pr1.unwrap().commit, "commit-abc"); | ||
| 165 | |||
| 166 | let pr2 = purgatory2.find_pr(&pr_event2.id.to_hex()); | ||
| 167 | assert!(pr2.is_some()); | ||
| 168 | assert_eq!(pr2.unwrap().commit, "commit-def"); | ||
| 169 | |||
| 170 | // Verify placeholder | ||
| 171 | let placeholder = purgatory2.find_pr_placeholder("placeholder-id"); | ||
| 172 | assert_eq!(placeholder, Some("commit-xyz".to_string())); | ||
| 173 | |||
| 174 | // Verify re-queueing works - get all identifiers | ||
| 175 | let identifiers = purgatory2.get_all_identifiers(); | ||
| 176 | assert_eq!(identifiers.len(), 2); | ||
| 177 | assert!(identifiers.contains(&"repo1".to_string())); | ||
| 178 | assert!(identifiers.contains(&"repo2".to_string())); | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Test 2: Rejected cache integration - save/restore hot cache and cold index | ||
| 182 | #[tokio::test] | ||
| 183 | async fn test_rejected_cache_save_restore_cycle() { | ||
| 184 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 185 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 186 | |||
| 187 | // Create rejected events index | ||
| 188 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 189 | |||
| 190 | // Create test events | ||
| 191 | let keys1 = Keys::generate(); | ||
| 192 | let keys2 = Keys::generate(); | ||
| 193 | |||
| 194 | let event1 = create_test_event(&keys1, "announcement 1").await; | ||
| 195 | let event2 = create_test_event(&keys2, "announcement 2").await; | ||
| 196 | let event3 = create_test_event(&keys1, "state 1").await; | ||
| 197 | |||
| 198 | // Add announcements to rejected cache | ||
| 199 | index.add_announcement( | ||
| 200 | event1.clone(), | ||
| 201 | event1.pubkey, | ||
| 202 | "repo1".to_string(), | ||
| 203 | RejectionReason::DoesNotListService, | ||
| 204 | ); | ||
| 205 | |||
| 206 | index.add_announcement( | ||
| 207 | event2.clone(), | ||
| 208 | event2.pubkey, | ||
| 209 | "repo2".to_string(), | ||
| 210 | RejectionReason::MaintainerNotYetValid, | ||
| 211 | ); | ||
| 212 | |||
| 213 | // Add state event to rejected cache | ||
| 214 | index.add_state( | ||
| 215 | event3.clone(), | ||
| 216 | event3.pubkey, | ||
| 217 | "repo1".to_string(), | ||
| 218 | RejectionReason::Other, | ||
| 219 | ); | ||
| 220 | |||
| 221 | // Verify initial counts | ||
| 222 | assert_eq!(index.hot_cache_len(), 3); | ||
| 223 | assert_eq!(index.cold_index_len(), 3); | ||
| 224 | |||
| 225 | // Save to disk | ||
| 226 | index.save_to_disk(&state_path).unwrap(); | ||
| 227 | assert!(state_path.exists()); | ||
| 228 | |||
| 229 | // Create new index and restore | ||
| 230 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 231 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 232 | |||
| 233 | // Verify state file was deleted | ||
| 234 | assert!(!state_path.exists()); | ||
| 235 | |||
| 236 | // Verify all entries restored | ||
| 237 | assert_eq!(index2.hot_cache_len(), 3); | ||
| 238 | assert_eq!(index2.cold_index_len(), 3); | ||
| 239 | |||
| 240 | // Verify specific entries | ||
| 241 | assert!(index2.contains(&event1.id)); | ||
| 242 | assert!(index2.contains(&event2.id)); | ||
| 243 | assert!(index2.contains(&event3.id)); | ||
| 244 | |||
| 245 | // Verify we can invalidate and get events | ||
| 246 | let (removed, hot_events) = | ||
| 247 | index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement)); | ||
| 248 | assert_eq!(removed, 1); | ||
| 249 | assert_eq!(hot_events.len(), 1); | ||
| 250 | assert_eq!(hot_events[0].id, event1.id); | ||
| 251 | } | ||
| 252 | |||
| 253 | /// Test 3: Simulated downtime - verify expiry times are adjusted correctly | ||
| 254 | #[tokio::test] | ||
| 255 | async fn test_purgatory_downtime_adjustment() { | ||
| 256 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 257 | let git_data_path = temp_dir.path().join("git"); | ||
| 258 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 259 | |||
| 260 | let purgatory = Purgatory::new(&git_data_path); | ||
| 261 | let keys = Keys::generate(); | ||
| 262 | |||
| 263 | let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 264 | .unwrap(); | ||
| 265 | |||
| 266 | purgatory.add_state(state_event.clone(), "repo1".to_string(), keys.public_key()); | ||
| 267 | |||
| 268 | // Save to disk | ||
| 269 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 270 | |||
| 271 | // Simulate downtime | ||
| 272 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 273 | |||
| 274 | // Restore | ||
| 275 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 276 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 277 | |||
| 278 | // Verify event is still there (downtime was accounted for) | ||
| 279 | let (state_count, _) = purgatory2.count(); | ||
| 280 | assert_eq!(state_count, 1); | ||
| 281 | |||
| 282 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 283 | assert_eq!(repo1_states.len(), 1); | ||
| 284 | assert_eq!(repo1_states[0].event.id, state_event.id); | ||
| 285 | |||
| 286 | // Verify the event hasn't expired yet (expiry time was adjusted) | ||
| 287 | // The event should have ~30 minutes minus the downtime | ||
| 288 | let entry = &repo1_states[0]; | ||
| 289 | let remaining = entry | ||
| 290 | .expires_at | ||
| 291 | .saturating_duration_since(std::time::Instant::now()); | ||
| 292 | assert!( | ||
| 293 | remaining > Duration::from_secs(1700), | ||
| 294 | "Event should have most of its 30min expiry remaining" | ||
| 295 | ); | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Test 4: Rejected cache downtime adjustment | ||
| 299 | #[tokio::test] | ||
| 300 | async fn test_rejected_cache_downtime_adjustment() { | ||
| 301 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 302 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 303 | |||
| 304 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 305 | let keys = Keys::generate(); | ||
| 306 | |||
| 307 | let event = create_test_event(&keys, "test").await; | ||
| 308 | |||
| 309 | index.add_announcement( | ||
| 310 | event.clone(), | ||
| 311 | event.pubkey, | ||
| 312 | "repo1".to_string(), | ||
| 313 | RejectionReason::DoesNotListService, | ||
| 314 | ); | ||
| 315 | |||
| 316 | // Save to disk | ||
| 317 | index.save_to_disk(&state_path).unwrap(); | ||
| 318 | |||
| 319 | // Simulate downtime | ||
| 320 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 321 | |||
| 322 | // Restore | ||
| 323 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 324 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 325 | |||
| 326 | // Verify event is still in both caches (downtime was accounted for) | ||
| 327 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 328 | assert_eq!(index2.cold_index_len(), 1); | ||
| 329 | assert!(index2.contains(&event.id)); | ||
| 330 | } | ||
| 331 | |||
| 332 | /// Test 5: File cleanup - verify state files are deleted after successful restore | ||
| 333 | #[tokio::test] | ||
| 334 | async fn test_purgatory_file_cleanup_after_restore() { | ||
| 335 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 336 | let git_data_path = temp_dir.path().join("git"); | ||
| 337 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 338 | |||
| 339 | let purgatory = Purgatory::new(&git_data_path); | ||
| 340 | let keys = Keys::generate(); | ||
| 341 | |||
| 342 | let state_event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 343 | .unwrap(); | ||
| 344 | |||
| 345 | purgatory.add_state(state_event, "repo1".to_string(), keys.public_key()); | ||
| 346 | |||
| 347 | // Save to disk | ||
| 348 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 349 | assert!(state_path.exists(), "State file should exist after save"); | ||
| 350 | |||
| 351 | // Restore | ||
| 352 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 353 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 354 | |||
| 355 | // Verify file was deleted | ||
| 356 | assert!( | ||
| 357 | !state_path.exists(), | ||
| 358 | "State file should be deleted after successful restore" | ||
| 359 | ); | ||
| 360 | } | ||
| 361 | |||
| 362 | /// Test 6: Rejected cache file cleanup | ||
| 363 | #[tokio::test] | ||
| 364 | async fn test_rejected_cache_file_cleanup_after_restore() { | ||
| 365 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 366 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 367 | |||
| 368 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 369 | let keys = Keys::generate(); | ||
| 370 | |||
| 371 | let event = create_test_event(&keys, "test").await; | ||
| 372 | |||
| 373 | index.add_announcement( | ||
| 374 | event, | ||
| 375 | keys.public_key(), | ||
| 376 | "repo1".to_string(), | ||
| 377 | RejectionReason::DoesNotListService, | ||
| 378 | ); | ||
| 379 | |||
| 380 | // Save to disk | ||
| 381 | index.save_to_disk(&state_path).unwrap(); | ||
| 382 | assert!(state_path.exists()); | ||
| 383 | |||
| 384 | // Restore | ||
| 385 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 386 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 387 | |||
| 388 | // Verify file was deleted | ||
| 389 | assert!(!state_path.exists()); | ||
| 390 | } | ||
| 391 | |||
| 392 | /// Test 7: Graceful degradation - missing purgatory file | ||
| 393 | #[tokio::test] | ||
| 394 | async fn test_purgatory_restore_missing_file() { | ||
| 395 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 396 | let git_data_path = temp_dir.path().join("git"); | ||
| 397 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 398 | |||
| 399 | let purgatory = Purgatory::new(&git_data_path); | ||
| 400 | |||
| 401 | // Attempting to restore missing file should return error | ||
| 402 | let result = purgatory.restore_from_disk(&state_path); | ||
| 403 | assert!(result.is_err(), "Should error on missing file"); | ||
| 404 | |||
| 405 | // Purgatory should still be usable (empty state) | ||
| 406 | let (state_count, pr_count) = purgatory.count(); | ||
| 407 | assert_eq!(state_count, 0); | ||
| 408 | assert_eq!(pr_count, 0); | ||
| 409 | |||
| 410 | // Should be able to add events normally | ||
| 411 | let keys = Keys::generate(); | ||
| 412 | let event = create_test_event(&keys, "test").await; | ||
| 413 | purgatory.add_state(event, "repo1".to_string(), keys.public_key()); | ||
| 414 | |||
| 415 | let (state_count, _) = purgatory.count(); | ||
| 416 | assert_eq!(state_count, 1); | ||
| 417 | } | ||
| 418 | |||
| 419 | /// Test 8: Graceful degradation - missing rejected cache file | ||
| 420 | #[tokio::test] | ||
| 421 | async fn test_rejected_cache_restore_missing_file() { | ||
| 422 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 423 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 424 | |||
| 425 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 426 | |||
| 427 | // Attempting to restore missing file should return error | ||
| 428 | let result = index.restore_from_disk(&state_path); | ||
| 429 | assert!(result.is_err()); | ||
| 430 | |||
| 431 | // Index should still be usable (empty state) | ||
| 432 | assert_eq!(index.hot_cache_len(), 0); | ||
| 433 | assert_eq!(index.cold_index_len(), 0); | ||
| 434 | |||
| 435 | // Should be able to add events normally | ||
| 436 | let keys = Keys::generate(); | ||
| 437 | let event = create_test_event(&keys, "test").await; | ||
| 438 | index.add_announcement( | ||
| 439 | event, | ||
| 440 | keys.public_key(), | ||
| 441 | "repo1".to_string(), | ||
| 442 | RejectionReason::DoesNotListService, | ||
| 443 | ); | ||
| 444 | |||
| 445 | assert_eq!(index.hot_cache_len(), 1); | ||
| 446 | assert_eq!(index.cold_index_len(), 1); | ||
| 447 | } | ||
| 448 | |||
| 449 | /// Test 9: Graceful degradation - corrupted purgatory file | ||
| 450 | #[tokio::test] | ||
| 451 | async fn test_purgatory_restore_corrupted_file() { | ||
| 452 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 453 | let git_data_path = temp_dir.path().join("git"); | ||
| 454 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 455 | |||
| 456 | // Write corrupted JSON | ||
| 457 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 458 | |||
| 459 | let purgatory = Purgatory::new(&git_data_path); | ||
| 460 | |||
| 461 | // Attempting to restore corrupted file should return error | ||
| 462 | let result = purgatory.restore_from_disk(&state_path); | ||
| 463 | assert!(result.is_err(), "Should error on corrupted file"); | ||
| 464 | |||
| 465 | // Purgatory should still be usable | ||
| 466 | let (state_count, pr_count) = purgatory.count(); | ||
| 467 | assert_eq!(state_count, 0); | ||
| 468 | assert_eq!(pr_count, 0); | ||
| 469 | } | ||
| 470 | |||
| 471 | /// Test 10: Graceful degradation - corrupted rejected cache file | ||
| 472 | #[tokio::test] | ||
| 473 | async fn test_rejected_cache_restore_corrupted_file() { | ||
| 474 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 475 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 476 | |||
| 477 | // Write corrupted JSON | ||
| 478 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 479 | |||
| 480 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 481 | |||
| 482 | // Attempting to restore corrupted file should return error | ||
| 483 | let result = index.restore_from_disk(&state_path); | ||
| 484 | assert!(result.is_err()); | ||
| 485 | |||
| 486 | // Index should still be usable | ||
| 487 | assert_eq!(index.hot_cache_len(), 0); | ||
| 488 | assert_eq!(index.cold_index_len(), 0); | ||
| 489 | } | ||
| 490 | |||
| 491 | /// Test 11: Empty purgatory save/restore | ||
| 492 | #[tokio::test] | ||
| 493 | async fn test_empty_purgatory_save_restore() { | ||
| 494 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 495 | let git_data_path = temp_dir.path().join("git"); | ||
| 496 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 497 | |||
| 498 | let purgatory = Purgatory::new(&git_data_path); | ||
| 499 | |||
| 500 | // Save empty purgatory | ||
| 501 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 502 | assert!(state_path.exists()); | ||
| 503 | |||
| 504 | // Restore | ||
| 505 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 506 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 507 | |||
| 508 | // Verify empty state | ||
| 509 | let (state_count, pr_count) = purgatory2.count(); | ||
| 510 | assert_eq!(state_count, 0); | ||
| 511 | assert_eq!(pr_count, 0); | ||
| 512 | assert_eq!(purgatory2.expired_count(), 0); | ||
| 513 | } | ||
| 514 | |||
| 515 | /// Test 12: Empty rejected cache save/restore | ||
| 516 | #[tokio::test] | ||
| 517 | async fn test_empty_rejected_cache_save_restore() { | ||
| 518 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 519 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 520 | |||
| 521 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 522 | |||
| 523 | // Save empty cache | ||
| 524 | index.save_to_disk(&state_path).unwrap(); | ||
| 525 | assert!(state_path.exists()); | ||
| 526 | |||
| 527 | // Restore | ||
| 528 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 529 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 530 | |||
| 531 | // Verify empty state | ||
| 532 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 533 | assert_eq!(index2.cold_index_len(), 0); | ||
| 534 | } | ||
| 535 | |||
| 536 | /// Test 13: Multiple state events for same identifier | ||
| 537 | #[tokio::test] | ||
| 538 | async fn test_purgatory_multiple_state_events_same_identifier() { | ||
| 539 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 540 | let git_data_path = temp_dir.path().join("git"); | ||
| 541 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 542 | |||
| 543 | let purgatory = Purgatory::new(&git_data_path); | ||
| 544 | |||
| 545 | // Create multiple state events for same identifier (different maintainers) | ||
| 546 | let keys1 = Keys::generate(); | ||
| 547 | let keys2 = Keys::generate(); | ||
| 548 | |||
| 549 | let event1 = create_state_event_with_refs(&keys1, "repo1", &[("main", "abc123")]) | ||
| 550 | .unwrap(); | ||
| 551 | let event2 = create_state_event_with_refs(&keys2, "repo1", &[("main", "def456")]) | ||
| 552 | .unwrap(); | ||
| 553 | |||
| 554 | purgatory.add_state(event1.clone(), "repo1".to_string(), keys1.public_key()); | ||
| 555 | purgatory.add_state(event2.clone(), "repo1".to_string(), keys2.public_key()); | ||
| 556 | |||
| 557 | // Save and restore | ||
| 558 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 559 | |||
| 560 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 561 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 562 | |||
| 563 | // Verify both events restored | ||
| 564 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 565 | assert_eq!(repo1_states.len(), 2); | ||
| 566 | |||
| 567 | let event_ids: Vec<_> = repo1_states.iter().map(|e| e.event.id).collect(); | ||
| 568 | assert!(event_ids.contains(&event1.id)); | ||
| 569 | assert!(event_ids.contains(&event2.id)); | ||
| 570 | } | ||
| 571 | |||
| 572 | /// Test 14: Verify system continues to work after restore | ||
| 573 | #[tokio::test] | ||
| 574 | async fn test_purgatory_continues_working_after_restore() { | ||
| 575 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 576 | let git_data_path = temp_dir.path().join("git"); | ||
| 577 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 578 | |||
| 579 | let purgatory = Purgatory::new(&git_data_path); | ||
| 580 | let keys = Keys::generate(); | ||
| 581 | |||
| 582 | let event1 = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 583 | .unwrap(); | ||
| 584 | |||
| 585 | purgatory.add_state(event1.clone(), "repo1".to_string(), keys.public_key()); | ||
| 586 | |||
| 587 | // Save and restore | ||
| 588 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 589 | |||
| 590 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 591 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 592 | |||
| 593 | // Add new events after restore | ||
| 594 | let event2 = create_state_event_with_refs(&keys, "repo2", &[("main", "xyz789")]) | ||
| 595 | .unwrap(); | ||
| 596 | |||
| 597 | purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key()); | ||
| 598 | |||
| 599 | // Verify both old and new events work | ||
| 600 | let (state_count, _) = purgatory2.count(); | ||
| 601 | assert_eq!(state_count, 2); | ||
| 602 | |||
| 603 | let repo1_states = purgatory2.find_state("repo1"); | ||
| 604 | assert_eq!(repo1_states.len(), 1); | ||
| 605 | assert_eq!(repo1_states[0].event.id, event1.id); | ||
| 606 | |||
| 607 | let repo2_states = purgatory2.find_state("repo2"); | ||
| 608 | assert_eq!(repo2_states.len(), 1); | ||
| 609 | assert_eq!(repo2_states[0].event.id, event2.id); | ||
| 610 | |||
| 611 | // Verify cleanup still works | ||
| 612 | let (state_removed, pr_removed) = purgatory2.cleanup(); | ||
| 613 | // Nothing should be expired yet | ||
| 614 | assert_eq!(state_removed, 0); | ||
| 615 | assert_eq!(pr_removed, 0); | ||
| 616 | } | ||
| 617 | |||
| 618 | /// Test 15: Verify rejected cache continues working after restore | ||
| 619 | #[tokio::test] | ||
| 620 | async fn test_rejected_cache_continues_working_after_restore() { | ||
| 621 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 622 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 623 | |||
| 624 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 625 | let keys = Keys::generate(); | ||
| 626 | |||
| 627 | let event1 = create_test_event(&keys, "event1").await; | ||
| 628 | |||
| 629 | index.add_announcement( | ||
| 630 | event1.clone(), | ||
| 631 | event1.pubkey, | ||
| 632 | "repo1".to_string(), | ||
| 633 | RejectionReason::DoesNotListService, | ||
| 634 | ); | ||
| 635 | |||
| 636 | // Save and restore | ||
| 637 | index.save_to_disk(&state_path).unwrap(); | ||
| 638 | |||
| 639 | let index2 = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 640 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 641 | |||
| 642 | // Add new events after restore | ||
| 643 | let event2 = create_test_event(&keys, "event2").await; | ||
| 644 | |||
| 645 | index2.add_announcement( | ||
| 646 | event2.clone(), | ||
| 647 | event2.pubkey, | ||
| 648 | "repo2".to_string(), | ||
| 649 | RejectionReason::MaintainerNotYetValid, | ||
| 650 | ); | ||
| 651 | |||
| 652 | // Verify both old and new events work | ||
| 653 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 654 | assert_eq!(index2.cold_index_len(), 2); | ||
| 655 | assert!(index2.contains(&event1.id)); | ||
| 656 | assert!(index2.contains(&event2.id)); | ||
| 657 | |||
| 658 | // Verify invalidation still works | ||
| 659 | let (removed, hot_events) = | ||
| 660 | index2.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement)); | ||
| 661 | assert_eq!(removed, 1); | ||
| 662 | assert_eq!(hot_events.len(), 1); | ||
| 663 | assert_eq!(hot_events[0].id, event1.id); | ||
| 664 | } | ||
| 665 | |||
| 666 | /// Test 16: Entries that expired during downtime are properly handled | ||
| 667 | #[tokio::test] | ||
| 668 | async fn test_purgatory_entries_expired_during_downtime() { | ||
| 669 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 670 | let git_data_path = temp_dir.path().join("git"); | ||
| 671 | let state_path = temp_dir.path().join("purgatory.json"); | ||
| 672 | |||
| 673 | let purgatory = Purgatory::new(&git_data_path); | ||
| 674 | let keys = Keys::generate(); | ||
| 675 | |||
| 676 | let event = create_state_event_with_refs(&keys, "repo1", &[("main", "abc123")]) | ||
| 677 | .unwrap(); | ||
| 678 | |||
| 679 | purgatory.add_state(event.clone(), "repo1".to_string(), keys.public_key()); | ||
| 680 | |||
| 681 | // Save to disk | ||
| 682 | purgatory.save_to_disk(&state_path).unwrap(); | ||
| 683 | |||
| 684 | // Simulate very long downtime (longer than the 30min default expiry) | ||
| 685 | // Note: We can't manually set expiry without accessing private fields, | ||
| 686 | // so this test verifies that the system handles already-expired entries gracefully | ||
| 687 | // In a real scenario, if downtime > 30 minutes, entries would be expired on restore | ||
| 688 | |||
| 689 | // For this test, we'll just verify the restore works and cleanup can be called | ||
| 690 | let purgatory2 = Purgatory::new(&git_data_path); | ||
| 691 | purgatory2.restore_from_disk(&state_path).unwrap(); | ||
| 692 | |||
| 693 | // Event should be restored | ||
| 694 | let (state_count, _) = purgatory2.count(); | ||
| 695 | assert_eq!(state_count, 1); | ||
| 696 | |||
| 697 | // Cleanup should work (even if nothing is expired yet) | ||
| 698 | let (state_removed, _) = purgatory2.cleanup(); | ||
| 699 | // Nothing expired yet since we didn't wait 30 minutes | ||
| 700 | assert_eq!(state_removed, 0); | ||
| 701 | |||
| 702 | let (state_count, _) = purgatory2.count(); | ||
| 703 | assert_eq!(state_count, 1); | ||
| 704 | } | ||
| 705 | |||
| 706 | /// Test 17: Rejected cache entries that expired during downtime | ||
| 707 | #[tokio::test] | ||
| 708 | async fn test_rejected_cache_entries_expired_during_downtime() { | ||
| 709 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 710 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 711 | |||
| 712 | // Create index with very short expiry | ||
| 713 | let index = RejectedEventsIndex::new( | ||
| 714 | Duration::from_millis(50), // Hot cache: 50ms | ||
| 715 | Duration::from_millis(100), // Cold index: 100ms | ||
| 716 | ); | ||
| 717 | let keys = Keys::generate(); | ||
| 718 | |||
| 719 | let event = create_test_event(&keys, "test").await; | ||
| 720 | |||
| 721 | index.add_announcement( | ||
| 722 | event.clone(), | ||
| 723 | event.pubkey, | ||
| 724 | "repo1".to_string(), | ||
| 725 | RejectionReason::DoesNotListService, | ||
| 726 | ); | ||
| 727 | |||
| 728 | // Save to disk | ||
| 729 | index.save_to_disk(&state_path).unwrap(); | ||
| 730 | |||
| 731 | // Simulate downtime longer than hot cache expiry | ||
| 732 | tokio::time::sleep(Duration::from_millis(75)).await; | ||
| 733 | |||
| 734 | // Restore | ||
| 735 | let index2 = RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_millis(100)); | ||
| 736 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 737 | |||
| 738 | // Both should be restored initially | ||
| 739 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 740 | assert_eq!(index2.cold_index_len(), 1); | ||
| 741 | |||
| 742 | // Note: We can't directly access hot_cache.get_maintainer_events (private method) | ||
| 743 | // But we can verify the entry is there via contains() and test cleanup | ||
| 744 | |||
| 745 | // Verify entry is still tracked | ||
| 746 | assert!(index2.contains(&event.id)); | ||
| 747 | |||
| 748 | // Cleanup should remove expired hot cache entry | ||
| 749 | let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement"); | ||
| 750 | assert_eq!(hot_expired, 1); | ||
| 751 | assert_eq!(cold_expired, 0); // Cold index still valid | ||
| 752 | |||
| 753 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 754 | assert_eq!(index2.cold_index_len(), 1); | ||
| 755 | } | ||