From b6c70f765dd02fb0297888d671e455df33d6fcb4 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 14 Jan 2026 10:18:47 +0000 Subject: feat(purgatory): add persistence to survive relay restarts Implement save/restore functionality for purgatory state to prevent event loss during relay restarts. Events in purgatory (state events, PR events, and expired events) are now saved to disk on graceful shutdown and restored on startup. Key features: - Serialize purgatory state to JSON (purgatory-state.json) - Time conversion helpers for Instant <-> Duration serialization - Restore with downtime adjustment (preserves remaining TTL) - Graceful degradation (missing/corrupted files don't crash) - File cleanup after successful restore - get_all_identifiers() for re-queueing after restore Files: - src/purgatory/persistence.rs: Time conversion helpers - src/purgatory/types.rs: Serialization derives - src/purgatory/mod.rs: save_to_disk/restore_from_disk methods Tests: 15 unit tests covering serialization, downtime, edge cases --- src/purgatory/mod.rs | 925 ++++++++++++++++++++++++++++++++++++++++++- src/purgatory/persistence.rs | 198 +++++++++ src/purgatory/types.rs | 20 +- 3 files changed, 1139 insertions(+), 4 deletions(-) create mode 100644 src/purgatory/persistence.rs diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 20df19b..47798a6 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -12,6 +12,7 @@ //! - **Separate stores**: State events and PR events use different indexing strategies mod helpers; +pub mod persistence; pub mod sync; mod types; @@ -20,10 +21,12 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; use dashmap::DashMap; use nostr_sdk::prelude::*; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::collections::HashSet; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; pub use sync::SyncQueueEntry; @@ -38,6 +41,63 @@ const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); /// Used for batching burst arrivals during negentropy sync. const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); +/// Serializable wrapper for `StatePurgatoryEntry` with time offsets. +/// +/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp +/// in `PurgatoryState`, allowing state to be persisted and restored across restarts. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SerializableStatePurgatoryEntry { + /// The nostr state event (kind 30618) awaiting git data + event: Event, + /// The repository identifier from the event's 'd' tag + identifier: String, + /// Event author pubkey + author: PublicKey, + /// Duration offset from saved_at for created_at + created_at_offset_secs: u64, + /// Duration offset from saved_at for expires_at + expires_at_offset_secs: u64, +} + +/// Serializable wrapper for `PrPurgatoryEntry` with time offsets. +/// +/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp +/// in `PurgatoryState`, allowing state to be persisted and restored across restarts. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SerializablePrPurgatoryEntry { + /// The nostr PR event, if received (None = git data arrived first) + event: Option, + /// The expected commit SHA from 'c' tag (if event exists) + /// or the actual commit pushed (if git arrived first) + commit: String, + /// Duration offset from saved_at for created_at + created_at_offset_secs: u64, + /// Duration offset from saved_at for expires_at + expires_at_offset_secs: u64, +} + +/// Serializable purgatory state for disk persistence. +/// +/// Contains all purgatory data needed to restore state across restarts: +/// - State events (indexed by identifier) +/// - PR events (indexed by event ID) +/// - Expired events (to prevent re-sync loops) +/// - Version number for future compatibility +/// - Saved timestamp for downtime calculation +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PurgatoryState { + /// Version number for state format (currently 1) + version: u32, + /// When this state was saved to disk + saved_at: SystemTime, + /// State events indexed by repository identifier + state_events: HashMap>, + /// PR events indexed by event ID (hex string) + pr_events: HashMap, + /// Expired event IDs with their expiry timestamps + expired_events: HashMap, +} + /// Main purgatory structure holding events awaiting git data. /// /// Provides thread-safe concurrent access to two separate stores: @@ -667,6 +727,260 @@ impl Purgatory { pub fn sync_queue_size(&self) -> usize { self.sync_queue.len() } + + /// Get all repository identifiers currently in purgatory. + /// + /// Returns a list of all unique repository identifiers that have state events + /// in purgatory. This is useful for re-queueing repositories after restore. + /// + /// # Returns + /// Vector of repository identifiers (e.g., "owner/repo") + /// + /// # Example + /// ```no_run + /// use ngit_grasp::purgatory::Purgatory; + /// use std::path::PathBuf; + /// + /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); + /// let identifiers = purgatory.get_all_identifiers(); + /// for id in identifiers { + /// println!("Repository in purgatory: {}", id); + /// } + /// ``` + pub fn get_all_identifiers(&self) -> Vec { + self.state_events + .iter() + .map(|entry| entry.key().clone()) + .collect() + } + + /// Save purgatory state to disk. + /// + /// Serializes the current purgatory state (state_events, pr_events, expired_events) + /// to JSON and saves it to the specified path. Time-based fields (`Instant`) are + /// converted to duration offsets from the current `SystemTime` for persistence. + /// + /// Note: The sync_queue is NOT persisted - it will be rebuilt when events are + /// restored from disk. + /// + /// # Arguments + /// * `path` - Path to save the state file + /// + /// # Returns + /// Ok(()) on success, Err on failure + /// + /// # Example + /// ```no_run + /// use ngit_grasp::purgatory::Purgatory; + /// use std::path::PathBuf; + /// + /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); + /// purgatory.save_to_disk(&PathBuf::from("/tmp/purgatory.json")).unwrap(); + /// ``` + pub fn save_to_disk(&self, path: &Path) -> Result<(), Box> { + let saved_at = SystemTime::now(); + let now_instant = Instant::now(); + + // Convert state_events to serializable format + let mut state_events = HashMap::new(); + for entry in self.state_events.iter() { + let identifier = entry.key().clone(); + let entries: Vec = entry + .value() + .iter() + .map(|e| { + let created_offset = + persistence::instant_to_offset(e.created_at, saved_at, now_instant); + let expires_offset = + persistence::instant_to_offset(e.expires_at, saved_at, now_instant); + + SerializableStatePurgatoryEntry { + event: e.event.clone(), + identifier: e.identifier.clone(), + author: e.author, + created_at_offset_secs: created_offset.as_secs(), + expires_at_offset_secs: expires_offset.as_secs(), + } + }) + .collect(); + state_events.insert(identifier, entries); + } + + // Convert pr_events to serializable format + let mut pr_events = HashMap::new(); + for entry in self.pr_events.iter() { + let event_id = entry.key().clone(); + let e = entry.value(); + + let created_offset = + persistence::instant_to_offset(e.created_at, saved_at, now_instant); + let expires_offset = + persistence::instant_to_offset(e.expires_at, saved_at, now_instant); + + let serializable = SerializablePrPurgatoryEntry { + event: e.event.clone(), + commit: e.commit.clone(), + created_at_offset_secs: created_offset.as_secs(), + expires_at_offset_secs: expires_offset.as_secs(), + }; + pr_events.insert(event_id, serializable); + } + + // Convert expired_events to serializable format + // We use SystemTime instead of Instant offsets for expired events since + // we don't need high precision for cleanup timing + let mut expired_events = HashMap::new(); + for entry in self.expired_events.iter() { + let event_id = entry.key().to_hex(); + // Convert Instant to SystemTime (approximate) + let expired_at_instant = *entry.value(); + let elapsed_since_expire = now_instant.saturating_duration_since(expired_at_instant); + let expired_at_system = saved_at - elapsed_since_expire; + expired_events.insert(event_id, expired_at_system); + } + + // Create state structure + let state = PurgatoryState { + version: 1, + saved_at, + state_events, + pr_events, + expired_events, + }; + + // Serialize to JSON and write to file + let json = serde_json::to_string_pretty(&state)?; + std::fs::write(path, json)?; + + tracing::info!( + path = %path.display(), + state_events = state.state_events.len(), + pr_events = state.pr_events.len(), + expired_events = state.expired_events.len(), + "Saved purgatory state to disk" + ); + + Ok(()) + } + + /// Restore purgatory state from disk. + /// + /// Loads a previously saved purgatory state from the specified path and populates + /// the current purgatory instance. Adjusts time-based fields to account for downtime + /// between save and restore. + /// + /// After successful restore, the state file is deleted to prevent accidental + /// double-restore. + /// + /// # Arguments + /// * `path` - Path to the saved state file + /// + /// # Returns + /// Ok(()) on success, Err if file doesn't exist or is corrupted + /// + /// # Example + /// ```no_run + /// use ngit_grasp::purgatory::Purgatory; + /// use std::path::PathBuf; + /// + /// let purgatory = Purgatory::new(PathBuf::from("/tmp/git")); + /// match purgatory.restore_from_disk(&PathBuf::from("/tmp/purgatory.json")) { + /// Ok(()) => println!("State restored successfully"), + /// Err(e) => eprintln!("Failed to restore state: {}", e), + /// } + /// ``` + pub fn restore_from_disk(&self, path: &Path) -> Result<(), Box> { + // Read and parse state file + let json = std::fs::read_to_string(path)?; + let state: PurgatoryState = serde_json::from_str(&json)?; + + // Verify version + if state.version != 1 { + return Err(format!("Unsupported state version: {}", state.version).into()); + } + + let now_instant = Instant::now(); + + // Restore state_events + for (identifier, entries) in state.state_events { + let restored_entries: Vec = entries + .into_iter() + .map(|e| { + let created_at = persistence::offset_to_instant( + Duration::from_secs(e.created_at_offset_secs), + state.saved_at, + now_instant, + ); + let expires_at = persistence::offset_to_instant( + Duration::from_secs(e.expires_at_offset_secs), + state.saved_at, + now_instant, + ); + + StatePurgatoryEntry { + event: e.event, + identifier: e.identifier, + author: e.author, + created_at, + expires_at, + } + }) + .collect(); + + self.state_events.insert(identifier, restored_entries); + } + + // Restore pr_events + for (event_id, e) in state.pr_events { + let created_at = persistence::offset_to_instant( + Duration::from_secs(e.created_at_offset_secs), + state.saved_at, + now_instant, + ); + let expires_at = persistence::offset_to_instant( + Duration::from_secs(e.expires_at_offset_secs), + state.saved_at, + now_instant, + ); + + let entry = PrPurgatoryEntry { + event: e.event, + commit: e.commit, + created_at, + expires_at, + }; + + self.pr_events.insert(event_id, entry); + } + + // Restore expired_events + for (event_id_hex, expired_at_system) in state.expired_events { + if let Ok(event_id) = EventId::from_hex(&event_id_hex) { + // Convert SystemTime back to Instant (approximate) + let elapsed_since_expire = SystemTime::now() + .duration_since(expired_at_system) + .unwrap_or(Duration::ZERO); + let expired_at_instant = now_instant - elapsed_since_expire; + + self.expired_events.insert(event_id, expired_at_instant); + } + } + + tracing::info!( + path = %path.display(), + state_events = self.state_events.len(), + pr_events = self.pr_events.len(), + expired_events = self.expired_events.len(), + saved_at = ?state.saved_at, + "Restored purgatory state from disk" + ); + + // Delete state file after successful restore + std::fs::remove_file(path)?; + tracing::debug!(path = %path.display(), "Deleted state file after restore"); + + Ok(()) + } } #[cfg(test)] @@ -1249,3 +1563,610 @@ fn test_user_can_resubmit_expired_event() { // - Skip the expired check for user-submitted events // - Allow the event to be re-added to purgatory or accepted if git data now exists } + +// ============================================================================ +// Persistence Serialization Tests +// ============================================================================ + +#[tokio::test] +async fn test_save_and_restore_state_events() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // Add multiple state events for the same identifier + let event1 = EventBuilder::text_note("state event 1") + .sign_with_keys(&keys) + .unwrap(); + let event2 = EventBuilder::text_note("state event 2") + .sign_with_keys(&keys) + .unwrap(); + + let event1_id = event1.id; + let event2_id = event2.id; + + purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); + purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Verify file exists + assert!(state_file.exists()); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify file was deleted after restore + assert!(!state_file.exists()); + + // Verify state events were restored + let (state_count, _) = purgatory2.count(); + assert_eq!(state_count, 2); + + let restored_entries = purgatory2.find_state("test-repo"); + assert_eq!(restored_entries.len(), 2); + + // Verify event IDs match + let restored_ids: Vec = restored_entries.iter().map(|e| e.event.id).collect(); + assert!(restored_ids.contains(&event1_id)); + assert!(restored_ids.contains(&event2_id)); + + // Verify identifiers and authors match + for entry in &restored_entries { + assert_eq!(entry.identifier, "test-repo"); + assert_eq!(entry.author, keys.public_key()); + } +} + +#[tokio::test] +async fn test_save_and_restore_pr_events() { + use nostr_sdk::{Kind, Tag, TagKind}; + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // Add PR event with actual event + let tags = vec![Tag::custom( + TagKind::Custom("a".into()), + vec!["30617:abc123:test-repo".to_string()], + )]; + + let pr_event = EventBuilder::new(Kind::from(1618), "PR content") + .tags(tags) + .sign_with_keys(&keys) + .unwrap(); + + let pr_event_id = pr_event.id; + + purgatory.add_pr( + pr_event.clone(), + "pr-event-id".to_string(), + "commit-abc".to_string(), + ); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify PR event was restored + let (_, pr_count) = purgatory2.count(); + assert_eq!(pr_count, 1); + + let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); + assert!(restored_entry.event.is_some()); + assert_eq!(restored_entry.event.unwrap().id, pr_event_id); + assert_eq!(restored_entry.commit, "commit-abc"); +} + +#[tokio::test] +async fn test_save_and_restore_pr_placeholders() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + + // Add PR placeholder (git data arrived first) + purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-def".to_string()); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify placeholder was restored + let (_, pr_count) = purgatory2.count(); + assert_eq!(pr_count, 1); + + let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); + assert!(restored_entry.event.is_none()); // Still a placeholder + assert_eq!(restored_entry.commit, "commit-def"); + + // Verify it's findable as a placeholder + assert_eq!( + purgatory2.find_pr_placeholder("placeholder-id"), + Some("commit-def".to_string()) + ); +} + +#[tokio::test] +async fn test_save_and_restore_expired_events() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + let event = EventBuilder::text_note("test") + .sign_with_keys(&keys) + .unwrap(); + let event_id = event.id; + + // Add and expire event + purgatory.add_state(event, "repo".to_string(), keys.public_key()); + if let Some(mut entries) = purgatory.state_events.get_mut("repo") { + for entry in entries.iter_mut() { + entry.expires_at = Instant::now() - Duration::from_secs(1); + } + } + purgatory.cleanup(); + + // Verify event is marked as expired + assert!(purgatory.is_expired(&event_id)); + assert_eq!(purgatory.expired_count(), 1); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify expired event was restored + assert!(purgatory2.is_expired(&event_id)); + assert_eq!(purgatory2.expired_count(), 1); + + // Verify it's included in event_ids() + let ids = purgatory2.event_ids(); + assert!(ids.contains(&event_id)); +} + +#[tokio::test] +async fn test_save_and_restore_empty_purgatory() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + + // Save empty purgatory + purgatory.save_to_disk(&state_file).unwrap(); + + // Verify file exists + assert!(state_file.exists()); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify purgatory is still empty + let (state_count, pr_count) = purgatory2.count(); + assert_eq!(state_count, 0); + assert_eq!(pr_count, 0); + assert_eq!(purgatory2.expired_count(), 0); +} + +#[tokio::test] +async fn test_restore_missing_file() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("nonexistent.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + + // Attempting to restore from missing file should error + let result = purgatory.restore_from_disk(&state_file); + assert!(result.is_err()); + + // Purgatory should remain empty + let (state_count, pr_count) = purgatory.count(); + assert_eq!(state_count, 0); + assert_eq!(pr_count, 0); +} + +#[tokio::test] +async fn test_restore_corrupted_json() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("corrupted.json"); + + // Write invalid JSON + std::fs::write(&state_file, "{ this is not valid json }").unwrap(); + + let purgatory = Purgatory::new(PathBuf::new()); + + // Attempting to restore corrupted file should error + let result = purgatory.restore_from_disk(&state_file); + assert!(result.is_err()); + + // Purgatory should remain empty + let (state_count, pr_count) = purgatory.count(); + assert_eq!(state_count, 0); + assert_eq!(pr_count, 0); +} + +#[tokio::test] +async fn test_restore_unsupported_version() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("wrong_version.json"); + + // Write state with unsupported version + let state = r#"{ + "version": 999, + "saved_at": {"secs_since_epoch": 1000000000, "nanos_since_epoch": 0}, + "state_events": {}, + "pr_events": {}, + "expired_events": {} + }"#; + std::fs::write(&state_file, state).unwrap(); + + let purgatory = Purgatory::new(PathBuf::new()); + + // Attempting to restore unsupported version should error + let result = purgatory.restore_from_disk(&state_file); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Unsupported state version")); +} + +#[tokio::test] +async fn test_downtime_calculation() { + use tempfile::tempdir; + use tokio::time::sleep; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // Add state event + let event = EventBuilder::text_note("test") + .sign_with_keys(&keys) + .unwrap(); + + purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); + + // Get original expiry time + let original_entries = purgatory.find_state("repo"); + let original_entry = &original_entries[0]; + let original_expires_at = original_entry.expires_at; + let original_remaining = original_expires_at.saturating_duration_since(Instant::now()); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Simulate downtime (100ms) + sleep(Duration::from_millis(100)).await; + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Get restored expiry time + let restored_entries = purgatory2.find_state("repo"); + let restored_entry = &restored_entries[0]; + let restored_expires_at = restored_entry.expires_at; + let restored_remaining = restored_expires_at.saturating_duration_since(Instant::now()); + + // Remaining time should be approximately the same (accounting for downtime) + // Allow 2000ms tolerance for test execution time and sleep duration + let diff = if restored_remaining > original_remaining { + restored_remaining.as_millis() - original_remaining.as_millis() + } else { + original_remaining.as_millis() - restored_remaining.as_millis() + }; + + assert!( + diff < 2000, + "Downtime calculation should preserve remaining TTL. Original: {}ms, Restored: {}ms, Diff: {}ms", + original_remaining.as_millis(), + restored_remaining.as_millis(), + diff + ); +} + +#[tokio::test] +async fn test_expiry_times_preserved() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // Add state event + let event = EventBuilder::text_note("test") + .sign_with_keys(&keys) + .unwrap(); + + purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); + + // Manually set expiry to a specific time in the future + let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes + if let Some(mut entries) = purgatory.state_events.get_mut("repo") { + for entry in entries.iter_mut() { + entry.expires_at = custom_expiry; + } + } + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Get restored expiry time + let restored_entries = purgatory2.find_state("repo"); + let restored_entry = &restored_entries[0]; + let restored_remaining = restored_entry + .expires_at + .saturating_duration_since(Instant::now()); + + // Should be approximately 600 seconds (allow 3 second tolerance for test execution) + assert!( + restored_remaining.as_secs() >= 597 && restored_remaining.as_secs() <= 603, + "Expected ~600s remaining, got {}s", + restored_remaining.as_secs() + ); +} + +#[tokio::test] +async fn test_multiple_state_events_same_identifier() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys1 = Keys::generate(); + let keys2 = Keys::generate(); + let keys3 = Keys::generate(); + + // Add multiple state events for the same identifier from different authors + let event1 = EventBuilder::text_note("maintainer 1") + .sign_with_keys(&keys1) + .unwrap(); + let event2 = EventBuilder::text_note("maintainer 2") + .sign_with_keys(&keys2) + .unwrap(); + let event3 = EventBuilder::text_note("maintainer 3") + .sign_with_keys(&keys3) + .unwrap(); + + purgatory.add_state( + event1.clone(), + "shared-repo".to_string(), + keys1.public_key(), + ); + purgatory.add_state( + event2.clone(), + "shared-repo".to_string(), + keys2.public_key(), + ); + purgatory.add_state( + event3.clone(), + "shared-repo".to_string(), + keys3.public_key(), + ); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify all three events were restored + let restored_entries = purgatory2.find_state("shared-repo"); + assert_eq!(restored_entries.len(), 3); + + // Verify all authors are present + let authors: Vec = restored_entries.iter().map(|e| e.author).collect(); + assert!(authors.contains(&keys1.public_key())); + assert!(authors.contains(&keys2.public_key())); + assert!(authors.contains(&keys3.public_key())); +} + +#[tokio::test] +async fn test_mixed_pr_events_and_placeholders() { + use nostr_sdk::{Kind, Tag, TagKind}; + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // Add PR event with actual event + let tags = vec![Tag::custom( + TagKind::Custom("a".into()), + vec!["30617:abc123:test-repo".to_string()], + )]; + + let pr_event = EventBuilder::new(Kind::from(1618), "PR content") + .tags(tags) + .sign_with_keys(&keys) + .unwrap(); + + purgatory.add_pr( + pr_event.clone(), + "pr-with-event".to_string(), + "commit-abc".to_string(), + ); + + // Add PR placeholder + purgatory.add_pr_placeholder("pr-placeholder".to_string(), "commit-def".to_string()); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify both were restored correctly + let (_, pr_count) = purgatory2.count(); + assert_eq!(pr_count, 2); + + // Verify PR event + let pr_entry = purgatory2.find_pr("pr-with-event").unwrap(); + assert!(pr_entry.event.is_some()); + assert_eq!(pr_entry.commit, "commit-abc"); + + // Verify placeholder + let placeholder_entry = purgatory2.find_pr("pr-placeholder").unwrap(); + assert!(placeholder_entry.event.is_none()); + assert_eq!(placeholder_entry.commit, "commit-def"); + assert_eq!( + purgatory2.find_pr_placeholder("pr-placeholder"), + Some("commit-def".to_string()) + ); +} + +#[tokio::test] +async fn test_file_cleanup_after_successful_restore() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // Add some data + let event = EventBuilder::text_note("test") + .sign_with_keys(&keys) + .unwrap(); + purgatory.add_state(event, "repo".to_string(), keys.public_key()); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + assert!(state_file.exists()); + + // Restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // File should be deleted after successful restore + assert!(!state_file.exists()); +} + +#[tokio::test] +async fn test_comprehensive_roundtrip() { + use nostr_sdk::{Kind, Tag, TagKind}; + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys1 = Keys::generate(); + let keys2 = Keys::generate(); + + // Add multiple state events + let state1 = EventBuilder::text_note("state 1") + .sign_with_keys(&keys1) + .unwrap(); + let state2 = EventBuilder::text_note("state 2") + .sign_with_keys(&keys2) + .unwrap(); + + purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); + purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); + + // Add PR event + let tags = vec![Tag::custom( + TagKind::Custom("a".into()), + vec!["30617:abc123:repo1".to_string()], + )]; + let pr_event = EventBuilder::new(Kind::from(1618), "PR") + .tags(tags) + .sign_with_keys(&keys1) + .unwrap(); + purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); + + // Add PR placeholder + purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); + + // Add and expire an event + let expired_event = EventBuilder::text_note("expired") + .sign_with_keys(&keys1) + .unwrap(); + let expired_id = expired_event.id; + purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); + if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { + for entry in entries.iter_mut() { + entry.expires_at = Instant::now() - Duration::from_secs(1); + } + } + purgatory.cleanup(); + + // Verify initial state + let (state_count, pr_count) = purgatory.count(); + assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) + assert_eq!(pr_count, 2); // pr-1, pr-2 + assert_eq!(purgatory.expired_count(), 1); // expired_event + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Verify all data was restored correctly + let (state_count2, pr_count2) = purgatory2.count(); + assert_eq!(state_count2, 2); + assert_eq!(pr_count2, 2); + assert_eq!(purgatory2.expired_count(), 1); + + // Verify state events + assert_eq!(purgatory2.find_state("repo1").len(), 1); + assert_eq!(purgatory2.find_state("repo2").len(), 1); + + // Verify PR events + assert!(purgatory2.find_pr("pr-1").unwrap().event.is_some()); + assert!(purgatory2.find_pr("pr-2").unwrap().event.is_none()); + + // Verify expired event + assert!(purgatory2.is_expired(&expired_id)); +} diff --git a/src/purgatory/persistence.rs b/src/purgatory/persistence.rs new file mode 100644 index 0000000..7fca2cf --- /dev/null +++ b/src/purgatory/persistence.rs @@ -0,0 +1,198 @@ +//! Persistence utilities for purgatory state. +//! +//! This module provides conversion functions between `Instant` (which cannot be +//! serialized) and `Duration` offsets from a reference `SystemTime`. This allows +//! purgatory state to be persisted to disk and restored across restarts. +//! +//! ## Time Handling +//! +//! - `Instant` is monotonic but cannot be serialized +//! - `SystemTime` can be serialized but may go backwards (NTP, user changes) +//! - We use `SystemTime` for persistence and convert to/from `Instant` at runtime +//! - Downtime is accounted for when restoring state (elapsed time is preserved) + +use std::time::{Duration, Instant, SystemTime}; + +/// Convert an `Instant` to a `Duration` offset from a reference `SystemTime`. +/// +/// This allows storing an `Instant` as a serializable offset that can be +/// restored later, accounting for system downtime. +/// +/// # Arguments +/// * `instant` - The `Instant` to convert +/// * `reference_time` - The reference `SystemTime` (typically SystemTime::now()) +/// * `reference_instant` - The corresponding `Instant` (typically Instant::now()) +/// +/// # Returns +/// Duration offset from the reference time +/// +/// # Example +/// ``` +/// use std::time::{Duration, Instant, SystemTime}; +/// use ngit_grasp::purgatory::persistence::instant_to_offset; +/// +/// let now_system = SystemTime::now(); +/// let now_instant = Instant::now(); +/// let future = now_instant + Duration::from_secs(60); +/// +/// let offset = instant_to_offset(future, now_system, now_instant); +/// assert!(offset.as_secs() >= 60); +/// ``` +pub fn instant_to_offset( + instant: Instant, + _reference_time: SystemTime, + reference_instant: Instant, +) -> Duration { + if instant >= reference_instant { + // Future instant - return positive offset + instant.duration_since(reference_instant) + } else { + // Past instant - this shouldn't happen in normal operation, + // but we handle it by returning zero duration + Duration::ZERO + } +} + +/// Convert a `Duration` offset back to an `Instant`, accounting for downtime. +/// +/// This restores an `Instant` from a serialized offset, adjusting for the time +/// that has elapsed since the state was saved. +/// +/// # Arguments +/// * `offset` - The duration offset from the saved reference time +/// * `saved_at` - The `SystemTime` when the state was saved +/// * `reference_instant` - The current `Instant` (typically Instant::now()) +/// +/// # Returns +/// The restored `Instant`, adjusted for downtime +/// +/// # Example +/// ``` +/// use std::time::{Duration, Instant, SystemTime}; +/// use ngit_grasp::purgatory::persistence::offset_to_instant; +/// +/// let saved_at = SystemTime::now(); +/// let offset = Duration::from_secs(60); +/// let now_instant = Instant::now(); +/// +/// let restored = offset_to_instant(offset, saved_at, now_instant); +/// // restored will be approximately now_instant + 60 seconds +/// ``` +pub fn offset_to_instant( + offset: Duration, + saved_at: SystemTime, + reference_instant: Instant, +) -> Instant { + // Calculate how much time has elapsed since the state was saved + let now_system = SystemTime::now(); + let elapsed_since_save = now_system + .duration_since(saved_at) + .unwrap_or(Duration::ZERO); + + // The original deadline was: saved_at + offset + // Time remaining = (saved_at + offset) - now_system + // = offset - elapsed_since_save + + if offset > elapsed_since_save { + // Deadline is still in the future + let remaining = offset - elapsed_since_save; + reference_instant + remaining + } else { + // Deadline has already passed or is right now + reference_instant + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::Duration; + + #[test] + fn test_instant_to_offset_future() { + let now_system = SystemTime::now(); + let now_instant = Instant::now(); + let future = now_instant + Duration::from_secs(60); + + let offset = instant_to_offset(future, now_system, now_instant); + + // Should be approximately 60 seconds (within tolerance) + assert!(offset.as_secs() >= 59 && offset.as_secs() <= 61); + } + + #[test] + fn test_instant_to_offset_past() { + let now_system = SystemTime::now(); + let past_instant = Instant::now(); + // Simulate some time passing + thread::sleep(Duration::from_millis(10)); + let now_instant = Instant::now(); + + let offset = instant_to_offset(past_instant, now_system, now_instant); + + // Past instants return zero duration + assert_eq!(offset, Duration::ZERO); + } + + #[test] + fn test_offset_to_instant_with_time_remaining() { + let saved_at = SystemTime::now(); + let offset = Duration::from_secs(60); + + // Simulate a very short downtime (< 10ms) + thread::sleep(Duration::from_millis(5)); + + let now_instant = Instant::now(); + let restored = offset_to_instant(offset, saved_at, now_instant); + + // Should be approximately 60 seconds in the future + let remaining = restored.duration_since(now_instant); + assert!( + remaining.as_secs() >= 59 && remaining.as_secs() <= 61, + "Expected ~60s, got {}s", + remaining.as_secs() + ); + } + + #[test] + fn test_offset_to_instant_deadline_passed() { + // Simulate state saved 70 seconds ago with 60 second offset + let saved_at = SystemTime::now() - Duration::from_secs(70); + let offset = Duration::from_secs(60); + + let now_instant = Instant::now(); + let restored = offset_to_instant(offset, saved_at, now_instant); + + // Deadline has passed, should be now or in the past + let remaining = restored.saturating_duration_since(now_instant); + assert_eq!(remaining, Duration::ZERO); + } + + #[test] + fn test_round_trip_conversion() { + let now_system = SystemTime::now(); + let now_instant = Instant::now(); + let future = now_instant + Duration::from_secs(120); + + // Convert to offset + let offset = instant_to_offset(future, now_system, now_instant); + + // Immediately convert back (minimal downtime) + let restored = offset_to_instant(offset, now_system, now_instant); + + // Should be very close to the original future instant + let diff = if restored > future { + restored.duration_since(future) + } else { + future.duration_since(restored) + }; + + // Allow for small timing differences (< 100ms) + assert!( + diff < Duration::from_millis(100), + "Round trip should preserve instant within 100ms, got {}ms", + diff.as_millis() + ); + } +} diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 9c47616..919504b 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs @@ -5,8 +5,14 @@ //! problem where either the nostr event or git push can arrive first. use nostr_sdk::prelude::*; +use serde::{Deserialize, Serialize}; use std::time::Instant; +/// Default value for Instant fields during deserialization +fn instant_now() -> Instant { + Instant::now() +} + /// A reference name and its target object. /// /// Used to identify specific git refs (branches, tags) that a state event @@ -59,7 +65,10 @@ impl RefUpdate { /// State events declare the current state of a repository but may arrive /// before the corresponding git data has been pushed. This entry holds /// the event and associated metadata until the git data arrives. -#[derive(Debug, Clone)] +/// +/// Note: `Instant` fields cannot be serialized directly. Use the `persistence` +/// module to convert to/from serializable wrapper types. +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct StatePurgatoryEntry { /// The nostr state event (kind 30618) awaiting git data pub event: Event, @@ -71,9 +80,11 @@ pub struct StatePurgatoryEntry { pub author: PublicKey, /// When this entry was added to purgatory + #[serde(skip, default = "instant_now")] pub created_at: Instant, /// Expiry deadline (30 min from creation, may be extended) + #[serde(skip, default = "instant_now")] pub expires_at: Instant, } @@ -82,7 +93,10 @@ pub struct StatePurgatoryEntry { /// PR events reference specific commits but may arrive before the git push /// containing those commits. Alternatively, a git push may arrive first, /// creating a placeholder entry waiting for the corresponding PR event. -#[derive(Debug, Clone)] +/// +/// Note: `Instant` fields cannot be serialized directly. Use the `persistence` +/// module to convert to/from serializable wrapper types. +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PrPurgatoryEntry { /// The nostr PR event, if received (None = git data arrived first) pub event: Option, @@ -92,8 +106,10 @@ pub struct PrPurgatoryEntry { pub commit: String, /// When this entry was added to purgatory + #[serde(skip, default = "instant_now")] pub created_at: Instant, /// Expiry deadline (30 min from creation, may be extended) + #[serde(skip, default = "instant_now")] pub expires_at: Instant, } -- cgit v1.2.3