diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
| commit | 4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch) | |
| tree | d42869f89e4916bb8dc36fd26c9ac5f888e042ac /src/sync | |
| parent | 7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff) | |
| parent | b101afa00bc28e1b55286145cb81e32a5b3decc9 (diff) | |
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and
rejected events cache. Events are now saved to disk on graceful
shutdown and restored on startup, preventing data loss during
relay restarts.
Key features:
- Purgatory state persisted to JSON (state events, PR events, expired events)
- Rejected events cache persisted (hot cache + cold index)
- Downtime adjustment preserves remaining TTL
- Graceful degradation on missing/corrupted files
- Automatic re-queueing of restored repositories
- Comprehensive test coverage (45 tests)
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 71 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 726 |
2 files changed, 782 insertions, 15 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; | |||
| 43 | use tokio::time::sleep; | 43 | use tokio::time::sleep; |
| 44 | 44 | ||
| 45 | use std::collections::{HashMap, HashSet}; | 45 | use std::collections::{HashMap, HashSet}; |
| 46 | use std::path::{Path, PathBuf}; | ||
| 46 | use std::sync::Arc; | 47 | use std::sync::Arc; |
| 47 | use std::time::Duration; | 48 | use std::time::Duration; |
| 48 | 49 | ||
| @@ -581,6 +582,7 @@ impl SyncManager { | |||
| 581 | /// * `write_policy` - Policy for validating events before storage | 582 | /// * `write_policy` - Policy for validating events before storage |
| 582 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) | 583 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) |
| 583 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | ||
| 584 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 585 | pub fn new( | 587 | pub fn new( |
| 586 | bootstrap_relay_url: Option<String>, | 588 | bootstrap_relay_url: Option<String>, |
| @@ -589,11 +591,42 @@ impl SyncManager { | |||
| 589 | write_policy: Nip34WritePolicy, | 591 | write_policy: Nip34WritePolicy, |
| 590 | local_relay: LocalRelay, | 592 | local_relay: LocalRelay, |
| 591 | config: &Config, | 593 | config: &Config, |
| 594 | data_path: PathBuf, | ||
| 592 | sync_metrics: Option<SyncMetrics>, | 595 | sync_metrics: Option<SyncMetrics>, |
| 593 | ) -> Self { | 596 | ) -> Self { |
| 594 | // Extract purgatory from write_policy for read-only access | 597 | // Extract purgatory from write_policy for read-only access |
| 595 | let purgatory = write_policy.purgatory().clone(); | 598 | let purgatory = write_policy.purgatory().clone(); |
| 596 | 599 | ||
| 600 | // Create rejected events index | ||
| 601 | let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 602 | RejectedEventsIndex::with_metrics( | ||
| 603 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 604 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 605 | metrics.clone(), | ||
| 606 | ) | ||
| 607 | } else { | ||
| 608 | RejectedEventsIndex::new( | ||
| 609 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 610 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 611 | ) | ||
| 612 | }); | ||
| 613 | |||
| 614 | // Attempt to restore rejected events index from disk | ||
| 615 | let rejected_index_path = data_path.join("rejected-events-cache.json"); | ||
| 616 | if rejected_index_path.exists() { | ||
| 617 | match rejected_events_index.restore_from_disk(&rejected_index_path) { | ||
| 618 | Ok(()) => { | ||
| 619 | tracing::info!("Restored rejected events index from disk"); | ||
| 620 | } | ||
| 621 | Err(e) => { | ||
| 622 | tracing::warn!( | ||
| 623 | "Failed to restore rejected events index: {}, starting empty", | ||
| 624 | e | ||
| 625 | ); | ||
| 626 | } | ||
| 627 | } | ||
| 628 | } | ||
| 629 | |||
| 597 | Self { | 630 | Self { |
| 598 | bootstrap_relay_url, | 631 | bootstrap_relay_url, |
| 599 | service_domain, | 632 | service_domain, |
| @@ -605,18 +638,7 @@ impl SyncManager { | |||
| 605 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 638 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 606 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 639 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 607 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 640 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 608 | rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { | 641 | rejected_events_index, |
| 609 | RejectedEventsIndex::with_metrics( | ||
| 610 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 611 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 612 | metrics.clone(), | ||
| 613 | ) | ||
| 614 | } else { | ||
| 615 | RejectedEventsIndex::new( | ||
| 616 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 617 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 618 | ) | ||
| 619 | }), | ||
| 620 | connections: HashMap::new(), | 642 | connections: HashMap::new(), |
| 621 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 643 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 622 | next_batch_id: 0, | 644 | next_batch_id: 0, |
| @@ -637,6 +659,31 @@ impl SyncManager { | |||
| 637 | self.next_batch_id | 659 | self.next_batch_id |
| 638 | } | 660 | } |
| 639 | 661 | ||
| 662 | /// Get a clone of the rejected events index Arc. | ||
| 663 | /// | ||
| 664 | /// This allows access to the rejected events index for persistence | ||
| 665 | /// even after the SyncManager has been moved into a task. | ||
| 666 | /// | ||
| 667 | /// # Returns | ||
| 668 | /// Arc clone of the rejected events index | ||
| 669 | pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> { | ||
| 670 | self.rejected_events_index.clone() | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Save rejected events index to disk. | ||
| 674 | /// | ||
| 675 | /// This is called during shutdown to persist the rejected events cache, | ||
| 676 | /// allowing us to avoid re-downloading rejected events after restart. | ||
| 677 | /// | ||
| 678 | /// # Arguments | ||
| 679 | /// * `path` - Path to save the rejected index file | ||
| 680 | /// | ||
| 681 | /// # Returns | ||
| 682 | /// Ok(()) on success, Err if save fails | ||
| 683 | pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 684 | self.rejected_events_index.save_to_disk(path) | ||
| 685 | } | ||
| 686 | |||
| 640 | /// Handle EOSE (End Of Stored Events) for a subscription | 687 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 641 | /// | 688 | /// |
| 642 | /// This method: | 689 | /// This method: |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4d31901..f25f22a 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -86,12 +86,14 @@ | |||
| 86 | //! ``` | 86 | //! ``` |
| 87 | 87 | ||
| 88 | use nostr_sdk::{Event, EventId, PublicKey}; | 88 | use nostr_sdk::{Event, EventId, PublicKey}; |
| 89 | use serde::{Deserialize, Serialize}; | ||
| 89 | use std::collections::{HashMap, HashSet}; | 90 | use std::collections::{HashMap, HashSet}; |
| 91 | use std::path::Path; | ||
| 90 | use std::sync::{Arc, RwLock}; | 92 | use std::sync::{Arc, RwLock}; |
| 91 | use std::time::{Duration, Instant}; | 93 | use std::time::{Duration, Instant, SystemTime}; |
| 92 | 94 | ||
| 93 | /// Type of event stored in the rejected events index | 95 | /// Type of event stored in the rejected events index |
| 94 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 96 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| 95 | pub enum EventType { | 97 | pub enum EventType { |
| 96 | /// Repository announcement (kind 30617) | 98 | /// Repository announcement (kind 30617) |
| 97 | Announcement, | 99 | Announcement, |
| @@ -109,7 +111,7 @@ impl std::fmt::Display for EventType { | |||
| 109 | } | 111 | } |
| 110 | 112 | ||
| 111 | /// Reason why a repository announcement was rejected | 113 | /// Reason why a repository announcement was rejected |
| 112 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 114 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| 113 | pub enum RejectionReason { | 115 | pub enum RejectionReason { |
| 114 | /// Announcement doesn't list this service in clone/web URLs | 116 | /// Announcement doesn't list this service in clone/web URLs |
| 115 | DoesNotListService, | 117 | DoesNotListService, |
| @@ -141,6 +143,20 @@ struct HotCacheEntry { | |||
| 141 | cached_at: Instant, | 143 | cached_at: Instant, |
| 142 | } | 144 | } |
| 143 | 145 | ||
| 146 | /// Serializable version of HotCacheEntry for persistence | ||
| 147 | /// | ||
| 148 | /// Converts Instant to Duration offset from saved_at time | ||
| 149 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 150 | struct SerializableHotCacheEntry { | ||
| 151 | event: Event, | ||
| 152 | pubkey: PublicKey, | ||
| 153 | identifier: String, | ||
| 154 | event_type: EventType, | ||
| 155 | reason: RejectionReason, | ||
| 156 | /// Duration since saved_at when this entry was cached | ||
| 157 | cached_at_offset_secs: u64, | ||
| 158 | } | ||
| 159 | |||
| 144 | /// Entry in the cold index (metadata only) | 160 | /// Entry in the cold index (metadata only) |
| 145 | /// | 161 | /// |
| 146 | /// Note: event_id is stored as the HashMap key, not in this struct | 162 | /// Note: event_id is stored as the HashMap key, not in this struct |
| @@ -154,6 +170,49 @@ struct ColdIndexEntry { | |||
| 154 | rejected_at: Instant, | 170 | rejected_at: Instant, |
| 155 | } | 171 | } |
| 156 | 172 | ||
| 173 | /// Serializable version of ColdIndexEntry for persistence | ||
| 174 | /// | ||
| 175 | /// Converts Instant to Duration offset from saved_at time | ||
| 176 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 177 | struct SerializableColdIndexEntry { | ||
| 178 | pubkey: PublicKey, | ||
| 179 | identifier: String, | ||
| 180 | event_type: EventType, | ||
| 181 | reason: RejectionReason, | ||
| 182 | /// Duration since saved_at when this entry was rejected | ||
| 183 | rejected_at_offset_secs: u64, | ||
| 184 | } | ||
| 185 | |||
| 186 | /// Serializable state for hot cache | ||
| 187 | #[derive(Debug, Serialize, Deserialize)] | ||
| 188 | struct SerializableHotCache { | ||
| 189 | expiry_duration_secs: u64, | ||
| 190 | entries: HashMap<EventId, SerializableHotCacheEntry>, | ||
| 191 | } | ||
| 192 | |||
| 193 | /// Serializable state for cold index | ||
| 194 | #[derive(Debug, Serialize, Deserialize)] | ||
| 195 | struct SerializableColdIndex { | ||
| 196 | expiry_duration_secs: u64, | ||
| 197 | entries: HashMap<EventId, SerializableColdIndexEntry>, | ||
| 198 | } | ||
| 199 | |||
| 200 | /// Complete rejected cache state for persistence | ||
| 201 | /// | ||
| 202 | /// Stores both hot cache and cold index with version and timestamp information. | ||
| 203 | /// All Instant fields are converted to Duration offsets from saved_at. | ||
| 204 | #[derive(Debug, Serialize, Deserialize)] | ||
| 205 | struct RejectedCacheState { | ||
| 206 | /// Version for future compatibility | ||
| 207 | version: u32, | ||
| 208 | /// When this state was saved | ||
| 209 | saved_at: SystemTime, | ||
| 210 | /// Hot cache entries with full events | ||
| 211 | hot_cache: SerializableHotCache, | ||
| 212 | /// Cold index entries with metadata only | ||
| 213 | cold_index: SerializableColdIndex, | ||
| 214 | } | ||
| 215 | |||
| 157 | /// Hot cache: Stores full events for immediate re-processing | 216 | /// Hot cache: Stores full events for immediate re-processing |
| 158 | /// | 217 | /// |
| 159 | /// Events are stored for a short duration (default: 2 minutes) to enable | 218 | /// Events are stored for a short duration (default: 2 minutes) to enable |
| @@ -603,6 +662,168 @@ impl RejectedEventsIndex { | |||
| 603 | 662 | ||
| 604 | ids | 663 | ids |
| 605 | } | 664 | } |
| 665 | |||
| 666 | /// Save rejected events cache to disk | ||
| 667 | /// | ||
| 668 | /// Serializes both hot cache and cold index to JSON, converting Instant timestamps | ||
| 669 | /// to Duration offsets from the save time. This allows timestamps to be adjusted | ||
| 670 | /// for downtime when restored. | ||
| 671 | /// | ||
| 672 | /// # Arguments | ||
| 673 | /// | ||
| 674 | /// * `path` - File path to write the serialized state to | ||
| 675 | /// | ||
| 676 | /// # Returns | ||
| 677 | /// | ||
| 678 | /// Ok(()) on success, or an error if serialization or file write fails | ||
| 679 | pub fn save_to_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 680 | let saved_at = SystemTime::now(); | ||
| 681 | let now = Instant::now(); | ||
| 682 | |||
| 683 | // Lock both caches for consistent snapshot | ||
| 684 | let hot_entries = self.hot_cache.entries.read().unwrap(); | ||
| 685 | let cold_entries = self.cold_index.entries.read().unwrap(); | ||
| 686 | |||
| 687 | // Convert hot cache entries to serializable format | ||
| 688 | let serializable_hot_entries: HashMap<EventId, SerializableHotCacheEntry> = hot_entries | ||
| 689 | .iter() | ||
| 690 | .map(|(event_id, entry)| { | ||
| 691 | let cached_at_offset_secs = now.duration_since(entry.cached_at).as_secs(); | ||
| 692 | |||
| 693 | let serializable_entry = SerializableHotCacheEntry { | ||
| 694 | event: entry.event.clone(), | ||
| 695 | pubkey: entry.pubkey, | ||
| 696 | identifier: entry.identifier.clone(), | ||
| 697 | event_type: entry.event_type, | ||
| 698 | reason: entry.reason, | ||
| 699 | cached_at_offset_secs, | ||
| 700 | }; | ||
| 701 | |||
| 702 | (*event_id, serializable_entry) | ||
| 703 | }) | ||
| 704 | .collect(); | ||
| 705 | |||
| 706 | // Convert cold index entries to serializable format | ||
| 707 | let serializable_cold_entries: HashMap<EventId, SerializableColdIndexEntry> = cold_entries | ||
| 708 | .iter() | ||
| 709 | .map(|(event_id, entry)| { | ||
| 710 | let rejected_at_offset_secs = now.duration_since(entry.rejected_at).as_secs(); | ||
| 711 | |||
| 712 | let serializable_entry = SerializableColdIndexEntry { | ||
| 713 | pubkey: entry.pubkey, | ||
| 714 | identifier: entry.identifier.clone(), | ||
| 715 | event_type: entry.event_type, | ||
| 716 | reason: entry.reason, | ||
| 717 | rejected_at_offset_secs, | ||
| 718 | }; | ||
| 719 | |||
| 720 | (*event_id, serializable_entry) | ||
| 721 | }) | ||
| 722 | .collect(); | ||
| 723 | |||
| 724 | // Create complete state | ||
| 725 | let state = RejectedCacheState { | ||
| 726 | version: 1, | ||
| 727 | saved_at, | ||
| 728 | hot_cache: SerializableHotCache { | ||
| 729 | expiry_duration_secs: self.hot_cache.expiry_duration.as_secs(), | ||
| 730 | entries: serializable_hot_entries, | ||
| 731 | }, | ||
| 732 | cold_index: SerializableColdIndex { | ||
| 733 | expiry_duration_secs: self.cold_index.expiry_duration.as_secs(), | ||
| 734 | entries: serializable_cold_entries, | ||
| 735 | }, | ||
| 736 | }; | ||
| 737 | |||
| 738 | // Serialize to JSON and write to file | ||
| 739 | let json = serde_json::to_string_pretty(&state)?; | ||
| 740 | std::fs::write(path, json)?; | ||
| 741 | |||
| 742 | Ok(()) | ||
| 743 | } | ||
| 744 | |||
| 745 | /// Restore rejected events cache from disk | ||
| 746 | /// | ||
| 747 | /// Loads the serialized state from disk and populates both hot cache and cold index. | ||
| 748 | /// Adjusts all timestamps by adding the downtime duration (time since save) to maintain | ||
| 749 | /// correct expiry behavior. Deletes the state file after successful restore. | ||
| 750 | /// | ||
| 751 | /// # Arguments | ||
| 752 | /// | ||
| 753 | /// * `path` - File path to read the serialized state from | ||
| 754 | /// | ||
| 755 | /// # Returns | ||
| 756 | /// | ||
| 757 | /// Ok(()) on success, or an error if file doesn't exist, is corrupted, or restore fails | ||
| 758 | pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 759 | // Load and parse JSON | ||
| 760 | let json = std::fs::read_to_string(path)?; | ||
| 761 | let state: RejectedCacheState = serde_json::from_str(&json)?; | ||
| 762 | |||
| 763 | // Calculate downtime (how long the relay was offline) | ||
| 764 | let now_system = SystemTime::now(); | ||
| 765 | let downtime = now_system | ||
| 766 | .duration_since(state.saved_at) | ||
| 767 | .unwrap_or(Duration::ZERO); | ||
| 768 | |||
| 769 | let now_instant = Instant::now(); | ||
| 770 | |||
| 771 | // Lock both caches for restoration | ||
| 772 | let mut hot_entries = self.hot_cache.entries.write().unwrap(); | ||
| 773 | let mut cold_entries = self.cold_index.entries.write().unwrap(); | ||
| 774 | |||
| 775 | // Restore hot cache entries | ||
| 776 | for (event_id, serializable_entry) in state.hot_cache.entries { | ||
| 777 | // Reconstruct cached_at by extending the offset by downtime | ||
| 778 | // Original offset (how long ago it was cached when saved) | ||
| 779 | let original_offset = Duration::from_secs(serializable_entry.cached_at_offset_secs); | ||
| 780 | // Total offset including downtime | ||
| 781 | let total_offset = original_offset + downtime; | ||
| 782 | |||
| 783 | // cached_at = now - total_offset | ||
| 784 | let cached_at = now_instant - total_offset; | ||
| 785 | |||
| 786 | let entry = HotCacheEntry { | ||
| 787 | event: serializable_entry.event, | ||
| 788 | pubkey: serializable_entry.pubkey, | ||
| 789 | identifier: serializable_entry.identifier, | ||
| 790 | event_type: serializable_entry.event_type, | ||
| 791 | reason: serializable_entry.reason, | ||
| 792 | cached_at, | ||
| 793 | }; | ||
| 794 | |||
| 795 | hot_entries.insert(event_id, entry); | ||
| 796 | } | ||
| 797 | |||
| 798 | // Restore cold index entries | ||
| 799 | for (event_id, serializable_entry) in state.cold_index.entries { | ||
| 800 | // Reconstruct rejected_at by extending the offset by downtime | ||
| 801 | let original_offset = Duration::from_secs(serializable_entry.rejected_at_offset_secs); | ||
| 802 | let total_offset = original_offset + downtime; | ||
| 803 | |||
| 804 | // rejected_at = now - total_offset | ||
| 805 | let rejected_at = now_instant - total_offset; | ||
| 806 | |||
| 807 | let entry = ColdIndexEntry { | ||
| 808 | pubkey: serializable_entry.pubkey, | ||
| 809 | identifier: serializable_entry.identifier, | ||
| 810 | event_type: serializable_entry.event_type, | ||
| 811 | reason: serializable_entry.reason, | ||
| 812 | rejected_at, | ||
| 813 | }; | ||
| 814 | |||
| 815 | cold_entries.insert(event_id, entry); | ||
| 816 | } | ||
| 817 | |||
| 818 | // Release locks before deleting file | ||
| 819 | drop(hot_entries); | ||
| 820 | drop(cold_entries); | ||
| 821 | |||
| 822 | // Delete the state file after successful restore | ||
| 823 | std::fs::remove_file(path)?; | ||
| 824 | |||
| 825 | Ok(()) | ||
| 826 | } | ||
| 606 | } | 827 | } |
| 607 | 828 | ||
| 608 | #[cfg(test)] | 829 | #[cfg(test)] |
| @@ -956,4 +1177,503 @@ mod tests { | |||
| 956 | // Cold index now empty | 1177 | // Cold index now empty |
| 957 | assert_eq!(index.cold_index_len(), 0); | 1178 | assert_eq!(index.cold_index_len(), 0); |
| 958 | } | 1179 | } |
| 1180 | |||
| 1181 | // ======================================================================== | ||
| 1182 | // Persistence Serialization Tests | ||
| 1183 | // ======================================================================== | ||
| 1184 | |||
| 1185 | #[tokio::test] | ||
| 1186 | async fn test_save_and_restore_hot_cache_roundtrip() { | ||
| 1187 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1188 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1189 | |||
| 1190 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1191 | let event = create_test_event().await; | ||
| 1192 | let pubkey = event.pubkey; | ||
| 1193 | let identifier = "test-repo".to_string(); | ||
| 1194 | |||
| 1195 | // Add event to hot cache | ||
| 1196 | index.add_announcement( | ||
| 1197 | event.clone(), | ||
| 1198 | pubkey, | ||
| 1199 | identifier.clone(), | ||
| 1200 | RejectionReason::DoesNotListService, | ||
| 1201 | ); | ||
| 1202 | |||
| 1203 | assert_eq!(index.hot_cache_len(), 1); | ||
| 1204 | assert_eq!(index.cold_index_len(), 1); | ||
| 1205 | |||
| 1206 | // Save to disk | ||
| 1207 | index.save_to_disk(&state_path).unwrap(); | ||
| 1208 | assert!(state_path.exists()); | ||
| 1209 | |||
| 1210 | // Create new index and restore | ||
| 1211 | let index2 = | ||
| 1212 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1213 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1214 | |||
| 1215 | // Verify state file was deleted after restore | ||
| 1216 | assert!(!state_path.exists()); | ||
| 1217 | |||
| 1218 | // Verify hot cache restored | ||
| 1219 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1220 | assert!(index2.hot_cache.contains(&event.id)); | ||
| 1221 | |||
| 1222 | // Verify cold index restored | ||
| 1223 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1224 | assert!(index2.cold_index.contains(&event.id)); | ||
| 1225 | |||
| 1226 | // Verify we can retrieve the event | ||
| 1227 | let events = index2 | ||
| 1228 | .hot_cache | ||
| 1229 | .get_maintainer_events(&pubkey, &identifier, None); | ||
| 1230 | assert_eq!(events.len(), 1); | ||
| 1231 | assert_eq!(events[0].id, event.id); | ||
| 1232 | } | ||
| 1233 | |||
| 1234 | #[tokio::test] | ||
| 1235 | async fn test_save_and_restore_cold_index_only() { | ||
| 1236 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1237 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1238 | |||
| 1239 | let index = RejectedEventsIndex::new( | ||
| 1240 | Duration::from_millis(50), // Hot cache expires quickly | ||
| 1241 | Duration::from_secs(604800), // Cold index lasts long | ||
| 1242 | ); | ||
| 1243 | let event = create_test_event().await; | ||
| 1244 | |||
| 1245 | // Add event | ||
| 1246 | index.add_announcement( | ||
| 1247 | event.clone(), | ||
| 1248 | event.pubkey, | ||
| 1249 | "test-repo".to_string(), | ||
| 1250 | RejectionReason::MaintainerNotYetValid, | ||
| 1251 | ); | ||
| 1252 | |||
| 1253 | // Wait for hot cache to expire | ||
| 1254 | std::thread::sleep(Duration::from_millis(60)); | ||
| 1255 | index.cleanup_expired_for_type("announcement"); | ||
| 1256 | |||
| 1257 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1258 | assert_eq!(index.cold_index_len(), 1); | ||
| 1259 | |||
| 1260 | // Save to disk | ||
| 1261 | index.save_to_disk(&state_path).unwrap(); | ||
| 1262 | |||
| 1263 | // Restore into new index | ||
| 1264 | let index2 = | ||
| 1265 | RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800)); | ||
| 1266 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1267 | |||
| 1268 | // Verify only cold index restored (hot cache was empty) | ||
| 1269 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 1270 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1271 | assert!(index2.cold_index.contains(&event.id)); | ||
| 1272 | } | ||
| 1273 | |||
| 1274 | #[tokio::test] | ||
| 1275 | async fn test_save_and_restore_both_hot_and_cold() { | ||
| 1276 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1277 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1278 | |||
| 1279 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1280 | let keys = Keys::generate(); | ||
| 1281 | |||
| 1282 | // Create two events | ||
| 1283 | let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); | ||
| 1284 | let event1 = keys.sign_event(unsigned1).await.unwrap(); | ||
| 1285 | |||
| 1286 | let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); | ||
| 1287 | let event2 = keys.sign_event(unsigned2).await.unwrap(); | ||
| 1288 | |||
| 1289 | // Add both events | ||
| 1290 | index.add_announcement( | ||
| 1291 | event1.clone(), | ||
| 1292 | event1.pubkey, | ||
| 1293 | "repo1".to_string(), | ||
| 1294 | RejectionReason::DoesNotListService, | ||
| 1295 | ); | ||
| 1296 | |||
| 1297 | index.add_state( | ||
| 1298 | event2.clone(), | ||
| 1299 | event2.pubkey, | ||
| 1300 | "repo2".to_string(), | ||
| 1301 | RejectionReason::Other, | ||
| 1302 | ); | ||
| 1303 | |||
| 1304 | assert_eq!(index.hot_cache_len(), 2); | ||
| 1305 | assert_eq!(index.cold_index_len(), 2); | ||
| 1306 | |||
| 1307 | // Save to disk | ||
| 1308 | index.save_to_disk(&state_path).unwrap(); | ||
| 1309 | |||
| 1310 | // Restore into new index | ||
| 1311 | let index2 = | ||
| 1312 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1313 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1314 | |||
| 1315 | // Verify both caches restored | ||
| 1316 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1317 | assert_eq!(index2.cold_index_len(), 2); | ||
| 1318 | assert!(index2.contains(&event1.id)); | ||
| 1319 | assert!(index2.contains(&event2.id)); | ||
| 1320 | } | ||
| 1321 | |||
| 1322 | #[tokio::test] | ||
| 1323 | async fn test_save_and_restore_empty_cache() { | ||
| 1324 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1325 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1326 | |||
| 1327 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1328 | |||
| 1329 | // Save empty cache | ||
| 1330 | index.save_to_disk(&state_path).unwrap(); | ||
| 1331 | assert!(state_path.exists()); | ||
| 1332 | |||
| 1333 | // Restore into new index | ||
| 1334 | let index2 = | ||
| 1335 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1336 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1337 | |||
| 1338 | // Verify empty state restored | ||
| 1339 | assert_eq!(index2.hot_cache_len(), 0); | ||
| 1340 | assert_eq!(index2.cold_index_len(), 0); | ||
| 1341 | } | ||
| 1342 | |||
| 1343 | #[tokio::test] | ||
| 1344 | async fn test_restore_missing_file() { | ||
| 1345 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1346 | let state_path = temp_dir.path().join("nonexistent.json"); | ||
| 1347 | |||
| 1348 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1349 | |||
| 1350 | // Attempting to restore missing file should return error | ||
| 1351 | let result = index.restore_from_disk(&state_path); | ||
| 1352 | assert!(result.is_err()); | ||
| 1353 | |||
| 1354 | // Index should remain empty | ||
| 1355 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1356 | assert_eq!(index.cold_index_len(), 0); | ||
| 1357 | } | ||
| 1358 | |||
| 1359 | #[tokio::test] | ||
| 1360 | async fn test_restore_corrupted_json() { | ||
| 1361 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1362 | let state_path = temp_dir.path().join("corrupted.json"); | ||
| 1363 | |||
| 1364 | // Write corrupted JSON | ||
| 1365 | std::fs::write(&state_path, "{ invalid json !!!").unwrap(); | ||
| 1366 | |||
| 1367 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1368 | |||
| 1369 | // Attempting to restore corrupted file should return error | ||
| 1370 | let result = index.restore_from_disk(&state_path); | ||
| 1371 | assert!(result.is_err()); | ||
| 1372 | |||
| 1373 | // Index should remain empty | ||
| 1374 | assert_eq!(index.hot_cache_len(), 0); | ||
| 1375 | assert_eq!(index.cold_index_len(), 0); | ||
| 1376 | } | ||
| 1377 | |||
| 1378 | #[tokio::test] | ||
| 1379 | async fn test_file_cleanup_after_successful_restore() { | ||
| 1380 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1381 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1382 | |||
| 1383 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1384 | let event = create_test_event().await; | ||
| 1385 | |||
| 1386 | index.add_announcement( | ||
| 1387 | event.clone(), | ||
| 1388 | event.pubkey, | ||
| 1389 | "test-repo".to_string(), | ||
| 1390 | RejectionReason::DoesNotListService, | ||
| 1391 | ); | ||
| 1392 | |||
| 1393 | // Save to disk | ||
| 1394 | index.save_to_disk(&state_path).unwrap(); | ||
| 1395 | assert!(state_path.exists()); | ||
| 1396 | |||
| 1397 | // Restore | ||
| 1398 | let index2 = | ||
| 1399 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1400 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1401 | |||
| 1402 | // File should be deleted after successful restore | ||
| 1403 | assert!(!state_path.exists()); | ||
| 1404 | } | ||
| 1405 | |||
| 1406 | #[tokio::test] | ||
| 1407 | async fn test_downtime_calculation_preserves_expiry() { | ||
| 1408 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1409 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1410 | |||
| 1411 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1412 | let event = create_test_event().await; | ||
| 1413 | |||
| 1414 | index.add_announcement( | ||
| 1415 | event.clone(), | ||
| 1416 | event.pubkey, | ||
| 1417 | "test-repo".to_string(), | ||
| 1418 | RejectionReason::DoesNotListService, | ||
| 1419 | ); | ||
| 1420 | |||
| 1421 | // Save to disk | ||
| 1422 | index.save_to_disk(&state_path).unwrap(); | ||
| 1423 | |||
| 1424 | // Simulate downtime by sleeping | ||
| 1425 | std::thread::sleep(Duration::from_millis(100)); | ||
| 1426 | |||
| 1427 | // Restore | ||
| 1428 | let index2 = | ||
| 1429 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1430 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1431 | |||
| 1432 | // Event should still be in both caches (downtime accounted for) | ||
| 1433 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1434 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1435 | assert!(index2.contains(&event.id)); | ||
| 1436 | } | ||
| 1437 | |||
| 1438 | #[tokio::test] | ||
| 1439 | async fn test_entries_expired_during_downtime() { | ||
| 1440 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1441 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1442 | |||
| 1443 | // Create index with very short expiry | ||
| 1444 | let index = RejectedEventsIndex::new( | ||
| 1445 | Duration::from_millis(100), // Hot cache: 100ms | ||
| 1446 | Duration::from_millis(200), // Cold index: 200ms | ||
| 1447 | ); | ||
| 1448 | let event = create_test_event().await; | ||
| 1449 | |||
| 1450 | index.add_announcement( | ||
| 1451 | event.clone(), | ||
| 1452 | event.pubkey, | ||
| 1453 | "test-repo".to_string(), | ||
| 1454 | RejectionReason::DoesNotListService, | ||
| 1455 | ); | ||
| 1456 | |||
| 1457 | // Save to disk | ||
| 1458 | index.save_to_disk(&state_path).unwrap(); | ||
| 1459 | |||
| 1460 | // Simulate downtime longer than hot cache expiry | ||
| 1461 | std::thread::sleep(Duration::from_millis(150)); | ||
| 1462 | |||
| 1463 | // Restore | ||
| 1464 | let index2 = | ||
| 1465 | RejectedEventsIndex::new(Duration::from_millis(100), Duration::from_millis(200)); | ||
| 1466 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1467 | |||
| 1468 | // Hot cache entry should have expired during downtime | ||
| 1469 | // Cold index should still have it (200ms expiry) | ||
| 1470 | assert_eq!(index2.hot_cache_len(), 1); | ||
| 1471 | assert_eq!(index2.cold_index_len(), 1); | ||
| 1472 | |||
| 1473 | // But when we try to get it, hot cache will see it's expired | ||
| 1474 | let events = index2 | ||
| 1475 | .hot_cache | ||
| 1476 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1477 | assert_eq!(events.len(), 0); // Expired! | ||
| 1478 | |||
| 1479 | // Cleanup should remove it | ||
| 1480 | let (hot_expired, cold_expired) = index2.cleanup_expired_for_type("announcement"); | ||
| 1481 | assert_eq!(hot_expired, 1); | ||
| 1482 | assert_eq!(cold_expired, 0); // Not expired yet | ||
| 1483 | } | ||
| 1484 | |||
| 1485 | #[tokio::test] | ||
| 1486 | async fn test_hot_cache_different_event_types() { | ||
| 1487 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1488 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1489 | |||
| 1490 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1491 | let keys = Keys::generate(); | ||
| 1492 | |||
| 1493 | // Create announcement event | ||
| 1494 | let unsigned_ann = | ||
| 1495 | nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); | ||
| 1496 | let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); | ||
| 1497 | |||
| 1498 | // Create state event | ||
| 1499 | let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); | ||
| 1500 | let event_state = keys.sign_event(unsigned_state).await.unwrap(); | ||
| 1501 | |||
| 1502 | // Add both types | ||
| 1503 | index.add_announcement( | ||
| 1504 | event_ann.clone(), | ||
| 1505 | event_ann.pubkey, | ||
| 1506 | "test-repo".to_string(), | ||
| 1507 | RejectionReason::DoesNotListService, | ||
| 1508 | ); | ||
| 1509 | |||
| 1510 | index.add_state( | ||
| 1511 | event_state.clone(), | ||
| 1512 | event_state.pubkey, | ||
| 1513 | "test-repo".to_string(), | ||
| 1514 | RejectionReason::Other, | ||
| 1515 | ); | ||
| 1516 | |||
| 1517 | // Save and restore | ||
| 1518 | index.save_to_disk(&state_path).unwrap(); | ||
| 1519 | let index2 = | ||
| 1520 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1521 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1522 | |||
| 1523 | // Verify both event types restored | ||
| 1524 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1525 | assert!(index2.contains(&event_ann.id)); | ||
| 1526 | assert!(index2.contains(&event_state.id)); | ||
| 1527 | |||
| 1528 | // Verify we can filter by type | ||
| 1529 | let (removed, events) = index2.invalidate_and_get( | ||
| 1530 | &event_ann.pubkey, | ||
| 1531 | "test-repo", | ||
| 1532 | Some(EventType::Announcement), | ||
| 1533 | ); | ||
| 1534 | assert_eq!(removed, 1); | ||
| 1535 | assert_eq!(events.len(), 1); | ||
| 1536 | assert_eq!(events[0].id, event_ann.id); | ||
| 1537 | } | ||
| 1538 | |||
| 1539 | #[tokio::test] | ||
| 1540 | async fn test_cold_index_different_rejection_reasons() { | ||
| 1541 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1542 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1543 | |||
| 1544 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1545 | let keys = Keys::generate(); | ||
| 1546 | |||
| 1547 | // Create events with different rejection reasons | ||
| 1548 | let unsigned1 = nostr_sdk::EventBuilder::text_note("event1").build(keys.public_key()); | ||
| 1549 | let event1 = keys.sign_event(unsigned1).await.unwrap(); | ||
| 1550 | |||
| 1551 | let unsigned2 = nostr_sdk::EventBuilder::text_note("event2").build(keys.public_key()); | ||
| 1552 | let event2 = keys.sign_event(unsigned2).await.unwrap(); | ||
| 1553 | |||
| 1554 | let unsigned3 = nostr_sdk::EventBuilder::text_note("event3").build(keys.public_key()); | ||
| 1555 | let event3 = keys.sign_event(unsigned3).await.unwrap(); | ||
| 1556 | |||
| 1557 | // Add with different rejection reasons | ||
| 1558 | index.add_announcement( | ||
| 1559 | event1.clone(), | ||
| 1560 | event1.pubkey, | ||
| 1561 | "repo1".to_string(), | ||
| 1562 | RejectionReason::DoesNotListService, | ||
| 1563 | ); | ||
| 1564 | |||
| 1565 | index.add_announcement( | ||
| 1566 | event2.clone(), | ||
| 1567 | event2.pubkey, | ||
| 1568 | "repo2".to_string(), | ||
| 1569 | RejectionReason::MaintainerNotYetValid, | ||
| 1570 | ); | ||
| 1571 | |||
| 1572 | index.add_announcement( | ||
| 1573 | event3.clone(), | ||
| 1574 | event3.pubkey, | ||
| 1575 | "repo3".to_string(), | ||
| 1576 | RejectionReason::Other, | ||
| 1577 | ); | ||
| 1578 | |||
| 1579 | // Save and restore | ||
| 1580 | index.save_to_disk(&state_path).unwrap(); | ||
| 1581 | let index2 = | ||
| 1582 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1583 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1584 | |||
| 1585 | // Verify all entries restored with their rejection reasons | ||
| 1586 | assert_eq!(index2.cold_index_len(), 3); | ||
| 1587 | assert!(index2.contains(&event1.id)); | ||
| 1588 | assert!(index2.contains(&event2.id)); | ||
| 1589 | assert!(index2.contains(&event3.id)); | ||
| 1590 | } | ||
| 1591 | |||
| 1592 | #[tokio::test] | ||
| 1593 | async fn test_multiple_save_restore_cycles() { | ||
| 1594 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1595 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1596 | |||
| 1597 | // First cycle | ||
| 1598 | let index1 = | ||
| 1599 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1600 | let event1 = create_test_event().await; | ||
| 1601 | |||
| 1602 | index1.add_announcement( | ||
| 1603 | event1.clone(), | ||
| 1604 | event1.pubkey, | ||
| 1605 | "repo1".to_string(), | ||
| 1606 | RejectionReason::DoesNotListService, | ||
| 1607 | ); | ||
| 1608 | |||
| 1609 | index1.save_to_disk(&state_path).unwrap(); | ||
| 1610 | |||
| 1611 | // Second cycle - restore and add more | ||
| 1612 | let index2 = | ||
| 1613 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1614 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1615 | |||
| 1616 | let event2 = create_test_event().await; | ||
| 1617 | index2.add_announcement( | ||
| 1618 | event2.clone(), | ||
| 1619 | event2.pubkey, | ||
| 1620 | "repo2".to_string(), | ||
| 1621 | RejectionReason::MaintainerNotYetValid, | ||
| 1622 | ); | ||
| 1623 | |||
| 1624 | assert_eq!(index2.hot_cache_len(), 2); | ||
| 1625 | index2.save_to_disk(&state_path).unwrap(); | ||
| 1626 | |||
| 1627 | // Third cycle - restore again | ||
| 1628 | let index3 = | ||
| 1629 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 1630 | index3.restore_from_disk(&state_path).unwrap(); | ||
| 1631 | |||
| 1632 | // Verify both events survived multiple cycles | ||
| 1633 | assert_eq!(index3.hot_cache_len(), 2); | ||
| 1634 | assert!(index3.contains(&event1.id)); | ||
| 1635 | assert!(index3.contains(&event2.id)); | ||
| 1636 | } | ||
| 1637 | |||
| 1638 | #[tokio::test] | ||
| 1639 | async fn test_restore_preserves_remaining_ttl() { | ||
| 1640 | let temp_dir = tempfile::tempdir().unwrap(); | ||
| 1641 | let state_path = temp_dir.path().join("rejected_cache.json"); | ||
| 1642 | |||
| 1643 | // Create index with 2 second hot cache expiry | ||
| 1644 | let index = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); | ||
| 1645 | let event = create_test_event().await; | ||
| 1646 | |||
| 1647 | index.add_announcement( | ||
| 1648 | event.clone(), | ||
| 1649 | event.pubkey, | ||
| 1650 | "test-repo".to_string(), | ||
| 1651 | RejectionReason::DoesNotListService, | ||
| 1652 | ); | ||
| 1653 | |||
| 1654 | // Wait 200ms (small fraction of TTL) | ||
| 1655 | std::thread::sleep(Duration::from_millis(200)); | ||
| 1656 | |||
| 1657 | // Save to disk | ||
| 1658 | index.save_to_disk(&state_path).unwrap(); | ||
| 1659 | |||
| 1660 | // Immediately restore (minimal downtime) | ||
| 1661 | let index2 = RejectedEventsIndex::new(Duration::from_secs(2), Duration::from_secs(604800)); | ||
| 1662 | index2.restore_from_disk(&state_path).unwrap(); | ||
| 1663 | |||
| 1664 | // Event should still be retrievable (has ~1.8s remaining) | ||
| 1665 | let events = index2 | ||
| 1666 | .hot_cache | ||
| 1667 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1668 | assert_eq!(events.len(), 1); | ||
| 1669 | |||
| 1670 | // Wait 2 seconds (total 2.2s > 2s expiry) | ||
| 1671 | std::thread::sleep(Duration::from_secs(2)); | ||
| 1672 | |||
| 1673 | // Now it should be expired | ||
| 1674 | let events = index2 | ||
| 1675 | .hot_cache | ||
| 1676 | .get_maintainer_events(&event.pubkey, "test-repo", None); | ||
| 1677 | assert_eq!(events.len(), 0); | ||
| 1678 | } | ||
| 959 | } | 1679 | } |