From 1d09e4bdea7e328cf2740818df9df660c5532a99 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 13:24:46 +0000 Subject: feat: implement announcement purgatory core (breaks archive sync test) Route new announcements to purgatory instead of accepting immediately. Announcements are promoted to the database when git data arrives, ensuring we only serve announcements for repos with actual content. Implemented: - AnnouncementPurgatoryEntry type and DashMap store - Route new announcements to purgatory (replacement announcements skip) - Promote announcements on git data arrival (process_purgatory_announcements) - Authorization checks purgatory announcements (fetch_repository_data_with_purgatory) - State policy uses purgatory announcements for maintainer validation - Cleanup task handles announcement expiry - Updated count()/cleanup() to 3-tuples Known broken: - test_archive_read_only_creates_bare_repo fails: sync module does not treat purgatory announcements as confirmed repos, so per-repo sync (state events, PRs) is never triggered for purgatory announcements - Announcement persistence (save/restore) not implemented - SyncLevel (StateOnly vs Full) not implemented - Soft expiry two-phase not implemented - Expiry extension on state event / git auth not wired up --- src/purgatory/mod.rs | 260 ++++++++++++++++++++++++++++++++++++------ src/purgatory/sync/context.rs | 7 +- src/purgatory/types.rs | 39 +++++++ 3 files changed, 273 insertions(+), 33 deletions(-) (limited to 'src/purgatory') diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 47798a6..3b5514b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -17,7 +17,7 @@ pub mod sync; mod types; pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; -pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; +pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; use dashmap::DashMap; use nostr_sdk::prelude::*; @@ -100,7 +100,8 @@ struct PurgatoryState { /// Main purgatory structure holding events awaiting git data. /// -/// Provides thread-safe concurrent access to two separate stores: +/// Provides thread-safe concurrent access to three separate stores: +/// - Announcements indexed by (pubkey, identifier) /// - State events indexed by repository identifier /// - PR events indexed by event ID /// @@ -121,6 +122,10 @@ struct PurgatoryState { /// that we've already determined have no git data available. #[derive(Clone)] pub struct Purgatory { + /// Repository announcements (kind 30617) indexed by (owner pubkey, identifier). + /// Key: (PublicKey, String) where String is the repository identifier. + announcement_purgatory: Arc>, + /// State events (kind 30618) indexed by repository identifier. /// Multiple state events can wait for the same identifier (different maintainers). state_events: Arc>>, @@ -145,6 +150,7 @@ impl Purgatory { /// Create a new empty purgatory. pub fn new(git_data_path: impl Into) -> Self { Self { + announcement_purgatory: Arc::new(DashMap::new()), state_events: Arc::new(DashMap::new()), pr_events: Arc::new(DashMap::new()), sync_queue: Arc::new(DashMap::new()), @@ -513,9 +519,171 @@ impl Purgatory { self.pr_events.remove(event_id); } + // ========================================================================= + // Announcement Purgatory Methods + // ========================================================================= + + /// Add a repository announcement to purgatory. + /// + /// The announcement will be held until git data arrives, at which point + /// it will be promoted to the database and served to clients. + /// + /// # Arguments + /// * `event` - The announcement event (kind 30617) + /// * `identifier` - The repository identifier from the 'd' tag + /// * `owner` - The owner pubkey (event author) + /// * `repo_path` - Path to the bare git repository + /// * `relays` - Relay URLs from the announcement (for sync registration) + pub fn add_announcement( + &self, + event: Event, + identifier: String, + owner: PublicKey, + repo_path: PathBuf, + relays: HashSet, + ) { + let now = Instant::now(); + let entry = AnnouncementPurgatoryEntry { + event, + identifier: identifier.clone(), + owner, + repo_path, + relays, + created_at: now, + expires_at: now + DEFAULT_EXPIRY, + soft_expired: false, + }; + + let key = (owner, identifier); + self.announcement_purgatory.insert(key.clone(), entry); + + tracing::debug!( + owner = %key.0, + identifier = %key.1, + "Added announcement to purgatory" + ); + } + + /// Find an announcement in purgatory by owner and identifier. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// The announcement entry if found, None otherwise + pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.get(&key).map(|entry| entry.clone()) + } + + /// Get all announcements in purgatory for a given identifier. + /// + /// This is used for authorization - state events and git pushes need to + /// check purgatory announcements for maintainer validation. + /// + /// # Arguments + /// * `identifier` - The repository identifier + /// + /// # Returns + /// Vector of announcement entries for this identifier + pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec { + self.announcement_purgatory + .iter() + .filter(|entry| entry.key().1 == identifier) + .map(|entry| entry.value().clone()) + .collect() + } + + /// Remove an announcement from purgatory. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + pub fn remove_announcement(&self, owner: &PublicKey, identifier: &str) { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.remove(&key); + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Removed announcement from purgatory" + ); + } + + /// Promote an announcement from purgatory to active status. + /// + /// This is called when git data arrives. The announcement event is returned + /// so it can be saved to the database. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// The announcement event if found, None otherwise + pub fn promote_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.remove(&key).map(|(_, entry)| { + tracing::info!( + owner = %owner, + identifier = %identifier, + "Promoted announcement from purgatory to database" + ); + entry.event + }) + } + + /// Check if there's an announcement in purgatory for the given owner and identifier. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// true if an announcement exists in purgatory, false otherwise + pub fn has_purgatory_announcement(&self, owner: &PublicKey, identifier: &str) -> bool { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.contains_key(&key) + } + + /// Extend the expiry for an announcement in purgatory. + /// + /// This is called when state events arrive for a purgatory announcement, + /// indicating the repository is actively receiving metadata. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// * `duration` - Minimum duration to guarantee from now + pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) { + let key = (*owner, identifier.to_string()); + if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) { + let now = Instant::now(); + let new_expiry = now + duration; + if entry.expires_at < new_expiry { + entry.expires_at = new_expiry; + // If soft-expired, revive it + if entry.soft_expired { + entry.soft_expired = false; + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Revived soft-expired announcement" + ); + } + } + } + } + + /// Get count of announcements in purgatory. + pub fn announcement_count(&self) -> usize { + self.announcement_purgatory.len() + } + /// Get all event IDs currently stored in purgatory AND previously expired events. /// /// Returns a HashSet of all event IDs for: + /// - Announcements currently held in purgatory /// - State events currently held in purgatory /// - PR events currently held in purgatory /// - Events that previously expired from purgatory without finding git data @@ -530,6 +698,11 @@ impl Purgatory { pub fn event_ids(&self) -> HashSet { let mut ids = HashSet::new(); + // Collect announcement event IDs + for entry in self.announcement_purgatory.iter() { + ids.insert(entry.value().event.id); + } + // Collect state event IDs for entry in self.state_events.iter() { for state_entry in entry.value().iter() { @@ -609,9 +782,28 @@ impl Purgatory { /// will be filtered out during future negentropy/REQ sync operations. /// /// # Returns - /// Tuple of (num_state_removed, num_pr_removed) - pub fn cleanup(&self) -> (usize, usize) { + /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) + pub fn cleanup(&self) -> (usize, usize, usize) { let now = Instant::now(); + + // Remove expired announcements and mark them as expired + let expired_announcements: Vec<(PublicKey, String, EventId)> = self + .announcement_purgatory + .iter() + .filter(|entry| entry.value().expires_at <= now) + .map(|entry| { + let key = entry.key(); + let event_id = entry.value().event.id; + (key.0.clone(), key.1.clone(), event_id) + }) + .collect(); + + let announcement_removed = expired_announcements.len(); + for (owner, identifier, event_id) in expired_announcements { + self.mark_expired(event_id); + self.announcement_purgatory.remove(&(owner, identifier)); + } + let mut state_removed = 0; // Remove expired state events and mark them as expired @@ -655,17 +847,17 @@ impl Purgatory { self.pr_events.remove(&event_id_str); } - (state_removed, pr_removed) + (announcement_removed, state_removed, pr_removed) } /// Remove expired entries from purgatory (legacy method). /// /// # Returns - /// Total number of entries removed (state + PR events) + /// Total number of entries removed (announcement + state + PR events) #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] pub fn remove_expired(&self) -> usize { - let (state, pr) = self.cleanup(); - state + pr + let (announcement, state, pr) = self.cleanup(); + announcement + state + pr } /// Remove old expired event records. @@ -699,11 +891,12 @@ impl Purgatory { /// Get current count of entries in purgatory. /// /// # Returns - /// Tuple of (state_event_count, pr_event_count) - pub fn count(&self) -> (usize, usize) { + /// Tuple of (announcement_count, state_event_count, pr_event_count) + pub fn count(&self) -> (usize, usize, usize) { + let announcement_count = self.announcement_purgatory.len(); let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); let pr_count = self.pr_events.len(); - (state_count, pr_count) + (announcement_count, state_count, pr_count) } /// Get count of expired events being tracked. @@ -717,6 +910,7 @@ impl Purgatory { /// Clear all entries from purgatory (for testing). #[cfg(test)] pub fn clear(&self) { + self.announcement_purgatory.clear(); self.state_events.clear(); self.pr_events.clear(); self.sync_queue.clear(); @@ -990,7 +1184,8 @@ mod tests { #[test] fn test_purgatory_creation() { let purgatory = Purgatory::new(PathBuf::new()); - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1008,7 +1203,8 @@ mod tests { purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0); assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } @@ -1213,7 +1409,7 @@ fn test_cleanup_removes_expired_entries() { purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); // Verify entries are there - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); assert_eq!(pr_count, 2); @@ -1231,14 +1427,14 @@ fn test_cleanup_removes_expired_entries() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // Verify counts assert_eq!(state_removed, 1); assert_eq!(pr_removed, 2); // Verify entries are gone - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1260,14 +1456,14 @@ fn test_cleanup_preserves_non_expired_entries() { purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // Nothing should be removed assert_eq!(state_removed, 0); assert_eq!(pr_removed, 0); // Verify entries are still there - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } @@ -1314,14 +1510,14 @@ fn test_cleanup_mixed_expired_and_fresh() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // One of each should be removed assert_eq!(state_removed, 1); assert_eq!(pr_removed, 1); // Verify remaining counts - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); // One state event remains assert_eq!(pr_count, 1); // One PR event remains } @@ -1391,7 +1587,7 @@ fn test_expired_event_tracking() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); assert_eq!(state_removed, 1); assert_eq!(pr_removed, 1); @@ -1501,7 +1697,7 @@ fn test_expired_events_prevent_readdition() { } // Event should NOT be re-added - let (state_count, _) = purgatory.count(); + let (_, state_count, _) = purgatory.count(); assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); } @@ -1520,7 +1716,7 @@ fn test_pr_placeholder_not_marked_expired() { } // Run cleanup - let (_, pr_removed) = purgatory.cleanup(); + let (_, _, pr_removed) = purgatory.cleanup(); assert_eq!(pr_removed, 1); // Expired count should be 0 (placeholders don't have event IDs to track) @@ -1606,7 +1802,7 @@ async fn test_save_and_restore_state_events() { assert!(!state_file.exists()); // Verify state events were restored - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 2); let restored_entries = purgatory2.find_state("test-repo"); @@ -1662,7 +1858,7 @@ async fn test_save_and_restore_pr_events() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify PR event was restored - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 1); let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); @@ -1691,7 +1887,7 @@ async fn test_save_and_restore_pr_placeholders() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify placeholder was restored - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 1); let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); @@ -1769,7 +1965,7 @@ async fn test_save_and_restore_empty_purgatory() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify purgatory is still empty - let (state_count, pr_count) = purgatory2.count(); + let (_, state_count, pr_count) = purgatory2.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); assert_eq!(purgatory2.expired_count(), 0); @@ -1789,7 +1985,7 @@ async fn test_restore_missing_file() { assert!(result.is_err()); // Purgatory should remain empty - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1811,7 +2007,7 @@ async fn test_restore_corrupted_json() { assert!(result.is_err()); // Purgatory should remain empty - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -2044,7 +2240,7 @@ async fn test_mixed_pr_events_and_placeholders() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify both were restored correctly - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 2); // Verify PR event @@ -2141,7 +2337,7 @@ async fn test_comprehensive_roundtrip() { purgatory.cleanup(); // Verify initial state - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) assert_eq!(pr_count, 2); // pr-1, pr-2 assert_eq!(purgatory.expired_count(), 1); // expired_event @@ -2154,7 +2350,7 @@ async fn test_comprehensive_roundtrip() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify all data was restored correctly - let (state_count2, pr_count2) = purgatory2.count(); + let (_, state_count2, pr_count2) = purgatory2.count(); assert_eq!(state_count2, 2); assert_eq!(pr_count2, 2); assert_eq!(purgatory2.expired_count(), 1); diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 33c2d12..778cdb8 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -279,7 +279,12 @@ impl SyncContext for RealSyncContext { } async fn fetch_repository_data(&self, identifier: &str) -> Result { - crate::git::authorization::fetch_repository_data(&self.database, identifier).await + crate::git::authorization::fetch_repository_data_with_purgatory( + &self.database, + &self.purgatory, + identifier, + ) + .await } fn collect_needed_oids(&self, identifier: &str) -> HashSet { diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 919504b..d891bc9 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs @@ -6,6 +6,8 @@ use nostr_sdk::prelude::*; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::path::PathBuf; use std::time::Instant; /// Default value for Instant fields during deserialization @@ -113,3 +115,40 @@ pub struct PrPurgatoryEntry { #[serde(skip, default = "instant_now")] pub expires_at: Instant, } + +/// Entry for a repository announcement (kind 30617) waiting in purgatory. +/// +/// Announcements are held in purgatory until git data arrives, proving +/// the repository has actual content. This prevents serving announcements +/// for empty repositories. +/// +/// Note: `Instant` fields cannot be serialized directly. Use the `persistence` +/// module to convert to/from serializable wrapper types. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AnnouncementPurgatoryEntry { + /// The nostr announcement event (kind 30617) + pub event: Event, + + /// The repository identifier from the event's 'd' tag + pub identifier: String, + + /// The owner pubkey (event author) + pub owner: PublicKey, + + /// Path to the bare git repository + pub repo_path: PathBuf, + + /// Relay URLs from the announcement (for sync registration) + pub relays: HashSet, + + /// When this entry was added to purgatory + #[serde(skip, default = "instant_now")] + pub created_at: Instant, + + /// Expiry deadline (30 min from creation, may be extended) + #[serde(skip, default = "instant_now")] + pub expires_at: Instant, + + /// Whether the bare repo has been deleted (soft expiry) + pub soft_expired: bool, +} -- cgit v1.2.3