From 8fc4078d60f0ccf16318fe7fa765fcdd3627fe1f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 12 Feb 2026 14:02:22 +0000 Subject: chore: fix clippy warnings - Derive Default for config structs instead of manual impl - Fix doc comment formatting in ArchiveConfig::matches - Collapse nested if statement in validate_announcement - Allow too_many_arguments for SyncManager::new --- src/config.rs | 44 +++++--------------------------------------- src/nostr/events.rs | 14 +++++++------- src/sync/mod.rs | 1 + 3 files changed, 13 insertions(+), 46 deletions(-) (limited to 'src') diff --git a/src/config.rs b/src/config.rs index 271a340..7062187 100644 --- a/src/config.rs +++ b/src/config.rs @@ -109,7 +109,7 @@ impl WhitelistEntry { } /// GRASP-05 Archive mode configuration -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ArchiveConfig { /// Accept all repository announcements (no filtering) /// @@ -146,6 +146,7 @@ impl ArchiveConfig { /// Returns true if: /// - archive_all is true, OR /// - announcement matches any whitelist entry + /// /// Note: grasp_services matching is handled via matches_grasp_services() pub fn matches(&self, npub: &str, identifier: &str) -> bool { if self.archive_all { @@ -171,19 +172,8 @@ impl ArchiveConfig { } } -impl Default for ArchiveConfig { - fn default() -> Self { - Self { - archive_all: false, - whitelist: Vec::new(), - grasp_services: Vec::new(), - read_only: false, - } - } -} - /// Repository whitelist configuration -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct RepositoryConfig { /// Whitelist entries for selective repository acceptance /// @@ -207,16 +197,8 @@ impl RepositoryConfig { } } -impl Default for RepositoryConfig { - fn default() -> Self { - Self { - whitelist: Vec::new(), - } - } -} - /// Repository blacklist configuration -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, 
Clone, Serialize, Deserialize, Default)] pub struct BlacklistConfig { /// Blacklist entries for blocking specific repositories /// @@ -256,16 +238,8 @@ impl BlacklistConfig { } } -impl Default for BlacklistConfig { - fn default() -> Self { - Self { - blacklist: Vec::new(), - } - } -} - /// Event blacklist configuration for blocking events by author npub -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct EventBlacklistConfig { /// Blacklisted npubs - events from these authors are rejected /// @@ -292,14 +266,6 @@ impl EventBlacklistConfig { } } -impl Default for EventBlacklistConfig { - fn default() -> Self { - Self { - blacklisted_npubs: Vec::new(), - } - } -} - /// Database backend type for the relay #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, ValueEnum)] #[serde(rename_all = "lowercase")] diff --git a/src/nostr/events.rs b/src/nostr/events.rs index 718633e..b9784f7 100644 --- a/src/nostr/events.rs +++ b/src/nostr/events.rs @@ -419,14 +419,14 @@ pub fn validate_announcement( // GRASP-01: Normal mode - accept if announcement lists our service AND matches repository whitelist (if enabled) if lists_service && !archive_config.read_only { // Check repository whitelist if enabled - if repository_config.enabled() { - if !repository_config.matches(&npub, &announcement.identifier) { - return AnnouncementResult::Reject(format!( - "Announcement lists service but does not match repository whitelist. \ + if repository_config.enabled() + && !repository_config.matches(&npub, &announcement.identifier) + { + return AnnouncementResult::Reject(format!( + "Announcement lists service but does not match repository whitelist. 
\ Repository {}/{} not in whitelist", - npub, announcement.identifier - )); - } + npub, announcement.identifier + )); } return AnnouncementResult::Accept; } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index bc8c428..1ee1872 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -584,6 +584,7 @@ impl SyncManager { /// * `config` - Configuration for sync settings /// * `data_path` - Path to git data directory (for persistence) /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) + #[allow(clippy::too_many_arguments)] pub fn new( bootstrap_relay_url: Option, service_domain: String, -- cgit v1.2.3 From 1d09e4bdea7e328cf2740818df9df660c5532a99 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 13:24:46 +0000 Subject: feat: implement announcement purgatory core (breaks archive sync test) Route new announcements to purgatory instead of accepting immediately. Announcements are promoted to the database when git data arrives, ensuring we only serve announcements for repos with actual content. 
Implemented: - AnnouncementPurgatoryEntry type and DashMap store - Route new announcements to purgatory (replacement announcements skip) - Promote announcements on git data arrival (process_purgatory_announcements) - Authorization checks purgatory announcements (fetch_repository_data_with_purgatory) - State policy uses purgatory announcements for maintainer validation - Cleanup task handles announcement expiry - Updated count()/cleanup() to 3-tuples Known broken: - test_archive_read_only_creates_bare_repo fails: sync module does not treat purgatory announcements as confirmed repos, so per-repo sync (state events, PRs) is never triggered for purgatory announcements - Announcement persistence (save/restore) not implemented - SyncLevel (StateOnly vs Full) not implemented - Soft expiry two-phase not implemented - Expiry extension on state event / git auth not wired up --- src/git/authorization.rs | 38 +++++- src/git/sync.rs | 110 ++++++++++++++++- src/main.rs | 8 +- src/nostr/builder.rs | 23 ++++ src/nostr/policy/announcement.rs | 117 +++++++++++++++++- src/nostr/policy/state.rs | 10 +- src/purgatory/mod.rs | 260 ++++++++++++++++++++++++++++++++++----- src/purgatory/sync/context.rs | 7 +- src/purgatory/types.rs | 39 ++++++ src/sync/mod.rs | 68 +++++++++- tests/archive_read_only.rs | 59 ++++++--- tests/purgatory.rs | 4 +- tests/purgatory_persistence.rs | 26 ++-- 13 files changed, 691 insertions(+), 78 deletions(-) (limited to 'src') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index e174b51..9d53c4f 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -287,6 +287,39 @@ pub async fn fetch_repository_data( }) } +/// Fetch repository data including announcements from purgatory +/// +/// This combines database announcements with purgatory announcements, +/// which is needed for authorization when the announcement hasn't been +/// promoted yet (no git data has arrived). 
+pub async fn fetch_repository_data_with_purgatory( + database: &SharedDatabase, + purgatory: &crate::purgatory::Purgatory, + identifier: &str, +) -> Result { + // First, fetch from database + let mut repo_data = fetch_repository_data(database, identifier).await?; + + // Then, add announcements from purgatory + let purgatory_announcements = purgatory.get_announcements_by_identifier(identifier); + let purgatory_count = purgatory_announcements.len(); + + for entry in purgatory_announcements { + if let Ok(announcement) = RepositoryAnnouncement::from_event(entry.event) { + repo_data.announcements.push(announcement); + } + } + + debug!( + "Fetched repository data with purgatory: {} announcements ({} from purgatory), {} states", + repo_data.announcements.len(), + purgatory_count, + repo_data.states.len() + ); + + Ok(repo_data) +} + pub fn pubkey_authorised_for_repo_owners( pubkey: &PublicKey, db_repo_data: &RepositoryData, @@ -539,8 +572,9 @@ pub async fn get_state_authorization_for_specific_owner_repo( use crate::git::list_refs; use crate::purgatory::RefUpdate; - // Fetch announcements only - we don't need database states - let repo_data = fetch_repository_data(database, identifier).await?; + // Fetch announcements from database AND purgatory - needed for authorization + // when the announcement hasn't been promoted yet (no git data has arrived) + let repo_data = fetch_repository_data_with_purgatory(database, purgatory, identifier).await?; if repo_data.announcements.is_empty() { return Ok(AuthorizationResult::denied( diff --git a/src/git/sync.rs b/src/git/sync.rs index e8e9655..13f30b6 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -51,6 +51,8 @@ use crate::purgatory::{can_apply_state, Purgatory}; /// or from purgatory sync fetching OIDs from remote servers). 
#[derive(Debug, Default, Clone)] pub struct ProcessResult { + /// Number of announcements released from purgatory + pub announcements_released: usize, /// Number of state events released from purgatory pub states_released: usize, /// Number of PR events released from purgatory @@ -70,11 +72,12 @@ pub struct ProcessResult { impl ProcessResult { /// Check if any events were released pub fn released_any(&self) -> bool { - self.states_released > 0 || self.prs_released > 0 + self.announcements_released > 0 || self.states_released > 0 || self.prs_released > 0 } /// Merge another ProcessResult into this one pub fn merge(&mut self, other: ProcessResult) { + self.announcements_released += other.announcements_released; self.states_released += other.states_released; self.prs_released += other.prs_released; self.repos_synced += other.repos_synced; @@ -836,6 +839,18 @@ pub async fn process_newly_available_git_data( "Processing newly available git data" ); + // Process announcements from purgatory + let announcement_result = process_purgatory_announcements( + &identifier, + source_repo_path, + database, + local_relay, + purgatory, + git_data_path, + ) + .await; + result.merge(announcement_result); + // Process state events from purgatory let state_result = process_purgatory_state_events( &identifier, @@ -863,6 +878,7 @@ pub async fn process_newly_available_git_data( if result.released_any() { info!( identifier = %identifier, + announcements_released = result.announcements_released, states_released = result.states_released, prs_released = result.prs_released, repos_synced = result.repos_synced, @@ -1250,6 +1266,90 @@ async fn process_purgatory_pr_events( result } +/// Process announcements from purgatory that can now be promoted. +/// +/// When git data arrives for a repository, any announcements in purgatory +/// for that repository should be promoted to the database and served to clients. 
+async fn process_purgatory_announcements( + identifier: &str, + source_repo_path: &Path, + database: &SharedDatabase, + local_relay: Option<&nostr_relay_builder::LocalRelay>, + purgatory: &Purgatory, + git_data_path: &Path, +) -> ProcessResult { + let mut result = ProcessResult::default(); + + // Extract owner pubkey from the source repo path + let owner_pubkey = match extract_owner_from_repo_path(source_repo_path, git_data_path) { + Some(npub) => npub, + None => { + debug!( + identifier = %identifier, + "Could not extract owner from repo path" + ); + return result; + } + }; + + // Parse the npub back to PublicKey + let owner = match nostr_sdk::PublicKey::parse(&owner_pubkey) { + Ok(pk) => pk, + Err(e) => { + warn!( + identifier = %identifier, + owner_pubkey = %owner_pubkey, + error = %e, + "Failed to parse owner pubkey" + ); + result.errors.push(format!("Failed to parse owner pubkey: {}", e)); + return result; + } + }; + + // Check if there's an announcement in purgatory for this owner and identifier + let announcement_event = purgatory.promote_announcement(&owner, identifier); + + if let Some(event) = announcement_event { + // Save to database + match database.save_event(&event).await { + Ok(_) => { + info!( + identifier = %identifier, + event_id = %event.id, + "Promoted announcement from purgatory to database" + ); + + // Notify WebSocket subscribers + if let Some(relay) = local_relay { + if relay.notify_event(event.clone()) { + debug!( + identifier = %identifier, + event_id = %event.id, + "Broadcast announcement event to WebSocket listeners" + ); + } + } + + result.announcements_released += 1; + } + Err(e) => { + warn!( + identifier = %identifier, + event_id = %event.id, + error = %e, + "Failed to save announcement to database" + ); + result + .errors + .push(format!("Failed to save announcement: {}", e)); + } + } + } + + result +} + /// Extract owner pubkey from a repository path. 
/// /// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. @@ -1271,6 +1371,7 @@ mod tests { #[test] fn test_process_result_default() { let result = ProcessResult::default(); + assert_eq!(result.announcements_released, 0); assert_eq!(result.states_released, 0); assert_eq!(result.prs_released, 0); assert_eq!(result.repos_synced, 0); @@ -1282,6 +1383,10 @@ mod tests { let mut result = ProcessResult::default(); assert!(!result.released_any()); + result.announcements_released = 1; + assert!(result.released_any()); + + result.announcements_released = 0; result.states_released = 1; assert!(result.released_any()); @@ -1293,6 +1398,7 @@ mod tests { #[test] fn test_process_result_merge() { let mut result1 = ProcessResult { + announcements_released: 0, states_released: 1, prs_released: 2, repos_synced: 3, @@ -1303,6 +1409,7 @@ mod tests { }; let result2 = ProcessResult { + announcements_released: 5, states_released: 10, prs_released: 20, repos_synced: 30, @@ -1314,6 +1421,7 @@ mod tests { result1.merge(result2); + assert_eq!(result1.announcements_released, 5); assert_eq!(result1.states_released, 11); assert_eq!(result1.prs_released, 22); assert_eq!(result1.repos_synced, 33); diff --git a/src/main.rs b/src/main.rs index 5e5b83a..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -142,11 +142,11 @@ async fn main() -> Result<()> { let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { interval.tick().await; - let (state_removed, pr_removed) = cleanup_purgatory.cleanup(); - if state_removed > 0 || pr_removed > 0 { + let (announcement_removed, state_removed, pr_removed) = cleanup_purgatory.cleanup(); + if announcement_removed > 0 || state_removed > 0 || pr_removed > 0 { info!( - "Purgatory cleanup: removed {} state events, {} PR events", - state_removed, pr_removed + "Purgatory cleanup: removed {} announcements, {} state events, {} PR events", + announcement_removed, state_removed, pr_removed ); } } diff --git 
a/src/nostr/builder.rs b/src/nostr/builder.rs index 34014db..aff12a6 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -138,6 +138,29 @@ impl Nip34WritePolicy { } } } + AnnouncementResult::AcceptPurgatory => { + // New announcement - add to purgatory + match self.announcement_policy.add_to_purgatory(event) { + Ok(()) => { + tracing::info!( + "Accepted announcement to purgatory: {} (waiting for git data)", + event_id_str + ); + WritePolicyResult::Reject { + status: true, // Client sees OK + message: "purgatory: won't be served until git data arrives".into(), + } + } + Err(e) => { + tracing::warn!( + "Failed to add announcement to purgatory {}: {}", + event_id_str, + e + ); + WritePolicyResult::reject(e) + } + } + } AnnouncementResult::AcceptMaintainer => { // Parse announcement to get details for logging match RepositoryAnnouncement::from_event(event.clone()) { diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 15a6e58..1118497 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -3,6 +3,7 @@ /// Handles validation of NIP-34 repository announcements (kind 30617) /// according to GRASP-01 specification. 
use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; +use std::collections::HashSet; use super::PolicyContext; use crate::config::Config; @@ -11,12 +12,14 @@ use crate::nostr::events::{validate_announcement, RepositoryAnnouncement}; /// Result of announcement policy evaluation #[derive(Debug, Clone, PartialEq)] pub enum AnnouncementResult { - /// Accept: Event lists our service (GRASP-01 compliant) + /// Accept: Event lists our service (GRASP-01 compliant) - replacement announcement Accept, /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer) AcceptMaintainer, /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only) AcceptArchive, + /// Accept to purgatory: New announcement, waiting for git data + AcceptPurgatory, /// Reject: Event fails validation with reason Reject(String), } @@ -35,10 +38,12 @@ impl AnnouncementPolicy { /// Validate a repository announcement event /// - /// Returns `Accept` if the announcement lists the service properly, - /// `AcceptMaintainer` if accepted via maintainer exception, - /// `AcceptArchive` if accepted via GRASP-05 archive config, - /// or `Reject` with reason. 
+ /// Returns: + /// - `Accept` if this is a replacement announcement (active announcement exists) + /// - `AcceptPurgatory` if this is a new announcement (no active announcement exists) + /// - `AcceptMaintainer` if accepted via maintainer exception + /// - `AcceptArchive` if accepted via GRASP-05 archive config + /// - `Reject` with reason if validation fails pub async fn validate(&self, event: &Event) -> AnnouncementResult { // First, try validation (GRASP-01 + GRASP-05) let validation_result = validate_announcement(event, &self.config); @@ -67,11 +72,111 @@ impl AnnouncementPolicy { Err(_) => AnnouncementResult::Reject(reason), } } - // Accept, AcceptArchive, or AcceptMaintainer - return as-is + AnnouncementResult::Accept | AnnouncementResult::AcceptArchive => { + // Parse announcement to check for existing active announcement + match RepositoryAnnouncement::from_event(event.clone()) { + Ok(announcement) => { + // Check if there's already an active announcement for this (pubkey, identifier) + match self + .has_active_announcement(&event.pubkey, &announcement.identifier) + .await + { + Ok(true) => { + // Replacement announcement - accept immediately + tracing::debug!( + identifier = %announcement.identifier, + "Replacement announcement - accepting immediately" + ); + validation_result + } + Ok(false) => { + // New announcement - route to purgatory + tracing::debug!( + identifier = %announcement.identifier, + "New announcement - routing to purgatory" + ); + AnnouncementResult::AcceptPurgatory + } + Err(e) => { + tracing::warn!( + error = %e, + "Failed to check for existing announcement - rejecting" + ); + AnnouncementResult::Reject(format!( + "Database error checking existing announcement: {}", + e + )) + } + } + } + Err(e) => AnnouncementResult::Reject(format!( + "Failed to parse announcement: {}", + e + )), + } + } + // AcceptPurgatory shouldn't come from validate_announcement, but handle it result => result, } } + /// Check if there's an active announcement in 
the database for this (pubkey, identifier) + async fn has_active_announcement( + &self, + pubkey: &PublicKey, + identifier: &str, + ) -> Result { + let filter = Filter::new() + .kind(Kind::GitRepoAnnouncement) + .author(*pubkey) + .custom_tag( + SingleLetterTag::lowercase(Alphabet::D), + identifier.to_string(), + ); + + let events: Vec = match self.ctx.database.query(filter).await { + Ok(events) => events.into_iter().collect(), + Err(e) => return Err(format!("Database query failed: {}", e)), + }; + + Ok(!events.is_empty()) + } + + /// Add an announcement to purgatory + /// + /// Creates the bare repository and stores the announcement in purgatory + /// until git data arrives. + pub fn add_to_purgatory(&self, event: &Event) -> Result<(), String> { + let announcement = RepositoryAnnouncement::from_event(event.clone()) + .map_err(|e| format!("Failed to parse announcement: {}", e))?; + + // Create bare repository + self.ensure_bare_repository(&announcement)?; + + // Build repo path + let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); + + // Extract relays from announcement + let relays: HashSet = announcement.relays.iter().cloned().collect(); + + // Add to purgatory + self.ctx.purgatory.add_announcement( + event.clone(), + announcement.identifier.clone(), + event.pubkey, + repo_path, + relays, + ); + + tracing::info!( + identifier = %announcement.identifier, + event_id = %event.id, + "Added announcement to purgatory" + ); + + Ok(()) + } + /// Create a bare git repository if it doesn't exist /// Path format: //.git pub fn ensure_bare_repository( diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index f94f004..4bfb513 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -10,7 +10,7 @@ use nostr_relay_builder::prelude::Event; use super::PolicyContext; use crate::git; -use crate::git::authorization::fetch_repository_data; +use crate::git::authorization::fetch_repository_data_with_purgatory; use 
crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; /// Result of state policy evaluation @@ -76,7 +76,13 @@ impl StatePolicy { } // Get all repositories and state events from db with identifier - let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; + // Include purgatory announcements for authorization + let db_repo_data = fetch_repository_data_with_purgatory( + &self.ctx.database, + &self.ctx.purgatory, + &state.identifier, + ) + .await?; // CRITICAL: Check if author is authorized via maintainer set // State events MUST be rejected if author is not in maintainer set of any accepted announcement diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 47798a6..3b5514b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -17,7 +17,7 @@ pub mod sync; mod types; pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; -pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; +pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; use dashmap::DashMap; use nostr_sdk::prelude::*; @@ -100,7 +100,8 @@ struct PurgatoryState { /// Main purgatory structure holding events awaiting git data. /// -/// Provides thread-safe concurrent access to two separate stores: +/// Provides thread-safe concurrent access to three separate stores: +/// - Announcements indexed by (pubkey, identifier) /// - State events indexed by repository identifier /// - PR events indexed by event ID /// @@ -121,6 +122,10 @@ struct PurgatoryState { /// that we've already determined have no git data available. #[derive(Clone)] pub struct Purgatory { + /// Repository announcements (kind 30617) indexed by (owner pubkey, identifier). + /// Key: (PublicKey, String) where String is the repository identifier. + announcement_purgatory: Arc>, + /// State events (kind 30618) indexed by repository identifier. 
/// Multiple state events can wait for the same identifier (different maintainers). state_events: Arc>>, @@ -145,6 +150,7 @@ impl Purgatory { /// Create a new empty purgatory. pub fn new(git_data_path: impl Into) -> Self { Self { + announcement_purgatory: Arc::new(DashMap::new()), state_events: Arc::new(DashMap::new()), pr_events: Arc::new(DashMap::new()), sync_queue: Arc::new(DashMap::new()), @@ -513,9 +519,171 @@ impl Purgatory { self.pr_events.remove(event_id); } + // ========================================================================= + // Announcement Purgatory Methods + // ========================================================================= + + /// Add a repository announcement to purgatory. + /// + /// The announcement will be held until git data arrives, at which point + /// it will be promoted to the database and served to clients. + /// + /// # Arguments + /// * `event` - The announcement event (kind 30617) + /// * `identifier` - The repository identifier from the 'd' tag + /// * `owner` - The owner pubkey (event author) + /// * `repo_path` - Path to the bare git repository + /// * `relays` - Relay URLs from the announcement (for sync registration) + pub fn add_announcement( + &self, + event: Event, + identifier: String, + owner: PublicKey, + repo_path: PathBuf, + relays: HashSet, + ) { + let now = Instant::now(); + let entry = AnnouncementPurgatoryEntry { + event, + identifier: identifier.clone(), + owner, + repo_path, + relays, + created_at: now, + expires_at: now + DEFAULT_EXPIRY, + soft_expired: false, + }; + + let key = (owner, identifier); + self.announcement_purgatory.insert(key.clone(), entry); + + tracing::debug!( + owner = %key.0, + identifier = %key.1, + "Added announcement to purgatory" + ); + } + + /// Find an announcement in purgatory by owner and identifier. 
+ /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// The announcement entry if found, None otherwise + pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.get(&key).map(|entry| entry.clone()) + } + + /// Get all announcements in purgatory for a given identifier. + /// + /// This is used for authorization - state events and git pushes need to + /// check purgatory announcements for maintainer validation. + /// + /// # Arguments + /// * `identifier` - The repository identifier + /// + /// # Returns + /// Vector of announcement entries for this identifier + pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec { + self.announcement_purgatory + .iter() + .filter(|entry| entry.key().1 == identifier) + .map(|entry| entry.value().clone()) + .collect() + } + + /// Remove an announcement from purgatory. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + pub fn remove_announcement(&self, owner: &PublicKey, identifier: &str) { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.remove(&key); + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Removed announcement from purgatory" + ); + } + + /// Promote an announcement from purgatory to active status. + /// + /// This is called when git data arrives. The announcement event is returned + /// so it can be saved to the database. 
+ /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// The announcement event if found, None otherwise + pub fn promote_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.remove(&key).map(|(_, entry)| { + tracing::info!( + owner = %owner, + identifier = %identifier, + "Promoted announcement from purgatory to database" + ); + entry.event + }) + } + + /// Check if there's an announcement in purgatory for the given owner and identifier. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// true if an announcement exists in purgatory, false otherwise + pub fn has_purgatory_announcement(&self, owner: &PublicKey, identifier: &str) -> bool { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.contains_key(&key) + } + + /// Extend the expiry for an announcement in purgatory. + /// + /// This is called when state events arrive for a purgatory announcement, + /// indicating the repository is actively receiving metadata. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// * `duration` - Minimum duration to guarantee from now + pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) { + let key = (*owner, identifier.to_string()); + if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) { + let now = Instant::now(); + let new_expiry = now + duration; + if entry.expires_at < new_expiry { + entry.expires_at = new_expiry; + // If soft-expired, revive it + if entry.soft_expired { + entry.soft_expired = false; + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Revived soft-expired announcement" + ); + } + } + } + } + + /// Get count of announcements in purgatory. 
+ pub fn announcement_count(&self) -> usize { + self.announcement_purgatory.len() + } + /// Get all event IDs currently stored in purgatory AND previously expired events. /// /// Returns a HashSet of all event IDs for: + /// - Announcements currently held in purgatory /// - State events currently held in purgatory /// - PR events currently held in purgatory /// - Events that previously expired from purgatory without finding git data @@ -530,6 +698,11 @@ impl Purgatory { pub fn event_ids(&self) -> HashSet { let mut ids = HashSet::new(); + // Collect announcement event IDs + for entry in self.announcement_purgatory.iter() { + ids.insert(entry.value().event.id); + } + // Collect state event IDs for entry in self.state_events.iter() { for state_entry in entry.value().iter() { @@ -609,9 +782,28 @@ impl Purgatory { /// will be filtered out during future negentropy/REQ sync operations. /// /// # Returns - /// Tuple of (num_state_removed, num_pr_removed) - pub fn cleanup(&self) -> (usize, usize) { + /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) + pub fn cleanup(&self) -> (usize, usize, usize) { let now = Instant::now(); + + // Remove expired announcements and mark them as expired + let expired_announcements: Vec<(PublicKey, String, EventId)> = self + .announcement_purgatory + .iter() + .filter(|entry| entry.value().expires_at <= now) + .map(|entry| { + let key = entry.key(); + let event_id = entry.value().event.id; + (key.0.clone(), key.1.clone(), event_id) + }) + .collect(); + + let announcement_removed = expired_announcements.len(); + for (owner, identifier, event_id) in expired_announcements { + self.mark_expired(event_id); + self.announcement_purgatory.remove(&(owner, identifier)); + } + let mut state_removed = 0; // Remove expired state events and mark them as expired @@ -655,17 +847,17 @@ impl Purgatory { self.pr_events.remove(&event_id_str); } - (state_removed, pr_removed) + (announcement_removed, state_removed, pr_removed) } /// Remove 
expired entries from purgatory (legacy method). /// /// # Returns - /// Total number of entries removed (state + PR events) + /// Total number of entries removed (announcement + state + PR events) #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] pub fn remove_expired(&self) -> usize { - let (state, pr) = self.cleanup(); - state + pr + let (announcement, state, pr) = self.cleanup(); + announcement + state + pr } /// Remove old expired event records. @@ -699,11 +891,12 @@ impl Purgatory { /// Get current count of entries in purgatory. /// /// # Returns - /// Tuple of (state_event_count, pr_event_count) - pub fn count(&self) -> (usize, usize) { + /// Tuple of (announcement_count, state_event_count, pr_event_count) + pub fn count(&self) -> (usize, usize, usize) { + let announcement_count = self.announcement_purgatory.len(); let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); let pr_count = self.pr_events.len(); - (state_count, pr_count) + (announcement_count, state_count, pr_count) } /// Get count of expired events being tracked. @@ -717,6 +910,7 @@ impl Purgatory { /// Clear all entries from purgatory (for testing). 
#[cfg(test)] pub fn clear(&self) { + self.announcement_purgatory.clear(); self.state_events.clear(); self.pr_events.clear(); self.sync_queue.clear(); @@ -990,7 +1184,8 @@ mod tests { #[test] fn test_purgatory_creation() { let purgatory = Purgatory::new(PathBuf::new()); - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1008,7 +1203,8 @@ mod tests { purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0); assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } @@ -1213,7 +1409,7 @@ fn test_cleanup_removes_expired_entries() { purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); // Verify entries are there - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); assert_eq!(pr_count, 2); @@ -1231,14 +1427,14 @@ fn test_cleanup_removes_expired_entries() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // Verify counts assert_eq!(state_removed, 1); assert_eq!(pr_removed, 2); // Verify entries are gone - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1260,14 +1456,14 @@ fn test_cleanup_preserves_non_expired_entries() { purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // Nothing should be removed 
assert_eq!(state_removed, 0); assert_eq!(pr_removed, 0); // Verify entries are still there - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } @@ -1314,14 +1510,14 @@ fn test_cleanup_mixed_expired_and_fresh() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // One of each should be removed assert_eq!(state_removed, 1); assert_eq!(pr_removed, 1); // Verify remaining counts - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); // One state event remains assert_eq!(pr_count, 1); // One PR event remains } @@ -1391,7 +1587,7 @@ fn test_expired_event_tracking() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); assert_eq!(state_removed, 1); assert_eq!(pr_removed, 1); @@ -1501,7 +1697,7 @@ fn test_expired_events_prevent_readdition() { } // Event should NOT be re-added - let (state_count, _) = purgatory.count(); + let (_, state_count, _) = purgatory.count(); assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); } @@ -1520,7 +1716,7 @@ fn test_pr_placeholder_not_marked_expired() { } // Run cleanup - let (_, pr_removed) = purgatory.cleanup(); + let (_, _, pr_removed) = purgatory.cleanup(); assert_eq!(pr_removed, 1); // Expired count should be 0 (placeholders don't have event IDs to track) @@ -1606,7 +1802,7 @@ async fn test_save_and_restore_state_events() { assert!(!state_file.exists()); // Verify state events were restored - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 2); let restored_entries = purgatory2.find_state("test-repo"); @@ -1662,7 +1858,7 @@ async fn test_save_and_restore_pr_events() { 
purgatory2.restore_from_disk(&state_file).unwrap(); // Verify PR event was restored - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 1); let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); @@ -1691,7 +1887,7 @@ async fn test_save_and_restore_pr_placeholders() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify placeholder was restored - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 1); let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); @@ -1769,7 +1965,7 @@ async fn test_save_and_restore_empty_purgatory() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify purgatory is still empty - let (state_count, pr_count) = purgatory2.count(); + let (_, state_count, pr_count) = purgatory2.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); assert_eq!(purgatory2.expired_count(), 0); @@ -1789,7 +1985,7 @@ async fn test_restore_missing_file() { assert!(result.is_err()); // Purgatory should remain empty - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1811,7 +2007,7 @@ async fn test_restore_corrupted_json() { assert!(result.is_err()); // Purgatory should remain empty - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -2044,7 +2240,7 @@ async fn test_mixed_pr_events_and_placeholders() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify both were restored correctly - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 2); // Verify PR event @@ -2141,7 +2337,7 @@ async fn test_comprehensive_roundtrip() { purgatory.cleanup(); // Verify initial state - let (state_count, pr_count) = purgatory.count(); + let (_, 
state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) assert_eq!(pr_count, 2); // pr-1, pr-2 assert_eq!(purgatory.expired_count(), 1); // expired_event @@ -2154,7 +2350,7 @@ async fn test_comprehensive_roundtrip() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify all data was restored correctly - let (state_count2, pr_count2) = purgatory2.count(); + let (_, state_count2, pr_count2) = purgatory2.count(); assert_eq!(state_count2, 2); assert_eq!(pr_count2, 2); assert_eq!(purgatory2.expired_count(), 1); diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 33c2d12..778cdb8 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -279,7 +279,12 @@ impl SyncContext for RealSyncContext { } async fn fetch_repository_data(&self, identifier: &str) -> Result { - crate::git::authorization::fetch_repository_data(&self.database, identifier).await + crate::git::authorization::fetch_repository_data_with_purgatory( + &self.database, + &self.purgatory, + identifier, + ) + .await } fn collect_needed_oids(&self, identifier: &str) -> HashSet { diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 919504b..d891bc9 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs @@ -6,6 +6,8 @@ use nostr_sdk::prelude::*; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::path::PathBuf; use std::time::Instant; /// Default value for Instant fields during deserialization @@ -113,3 +115,40 @@ pub struct PrPurgatoryEntry { #[serde(skip, default = "instant_now")] pub expires_at: Instant, } + +/// Entry for a repository announcement (kind 30617) waiting in purgatory. +/// +/// Announcements are held in purgatory until git data arrives, proving +/// the repository has actual content. This prevents serving announcements +/// for empty repositories. +/// +/// Note: `Instant` fields cannot be serialized directly. 
Use the `persistence` +/// module to convert to/from serializable wrapper types. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AnnouncementPurgatoryEntry { + /// The nostr announcement event (kind 30617) + pub event: Event, + + /// The repository identifier from the event's 'd' tag + pub identifier: String, + + /// The owner pubkey (event author) + pub owner: PublicKey, + + /// Path to the bare git repository + pub repo_path: PathBuf, + + /// Relay URLs from the announcement (for sync registration) + pub relays: HashSet, + + /// When this entry was added to purgatory + #[serde(skip, default = "instant_now")] + pub created_at: Instant, + + /// Expiry deadline (30 min from creation, may be extended) + #[serde(skip, default = "instant_now")] + pub expires_at: Instant, + + /// Whether the bare repo has been deleted (soft expiry) + pub soft_expired: bool, +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1ee1872..872df66 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1719,8 +1719,50 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { + // Announcements (kind 30617) - re-process rejected state events + // When an announcement goes to purgatory, state events that were + // previously rejected ("no announcement exists") can now be authorized + // via fetch_repository_data_with_purgatory. 
+ if event.kind == Kind::GitRepoAnnouncement { + use crate::nostr::events::RepositoryAnnouncement; + + if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) { + // Re-process rejected state events for this announcement + let (removed, hot_events) = rejected_events_index.invalidate_and_get( + &event.pubkey, + &announcement.identifier, + Some(rejected_index::EventType::State), + ); + + if removed > 0 { + tracing::info!( + pubkey = %event.pubkey, + identifier = %announcement.identifier, + removed_from_cold_index = removed, + hot_cache_events = hot_events.len(), + "Invalidated rejected state events (announcement now in purgatory)" + ); + } + + // Re-process state events from hot cache immediately + if !hot_events.is_empty() { + let _stats = Self::reprocess_events_from_hot_cache( + hot_events, + "state event (announcement in purgatory)", + &event.pubkey, + &announcement.identifier, + &relay_url_clone, + &database, + &write_policy, + &local_relay, + &rejected_events_index, + ) + .await; + } + } + } // State events (kind 30618) - extract identifier and trigger immediate sync - if event.kind.as_u16() == 30618 { + else if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { @@ -1754,7 +1796,9 @@ impl SyncManager { // Track pagination state for this subscription (REQ+EOSE) // and received event IDs for negentropy batches - if result == ProcessResult::Saved || result == ProcessResult::Duplicate { + // Include Purgatory results so announcements in purgatory still trigger + // per-repo sync (state events, PR events) from the source relay. 
+ if result == ProcessResult::Saved || result == ProcessResult::Duplicate || result == ProcessResult::Purgatory { let mut pending = pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_clone) { for batch in batches.iter_mut() { @@ -2506,6 +2550,26 @@ impl SyncManager { "{} added to purgatory (waiting for git data)", context ); + // Trigger immediate sync for re-processed events that go to purgatory + // (same as sync-triggered events in the main event loop) + if event.kind.as_u16() == 30618 { + // State event - extract identifier from 'd' tag + if let Some(id) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].clone()) + } else { + None + } + }) { + write_policy.purgatory().enqueue_sync_immediate(&id); + } + } else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 { + // PR event - extract identifier from 'a' tag + if let Some(id) = crate::git::sync::extract_identifier_from_pr_event(&event) { + write_policy.purgatory().enqueue_sync_immediate(&id); + } + } } ProcessResult::Rejected => { stats.rejected += 1; diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index be6959b..e39b4b2 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -165,6 +165,7 @@ async fn test_archive_read_only_creates_bare_repo() { // c) Put state event in purgatory (git data missing on archive relay) // d) Fetch git data from source relay's clone URL // e) Release the state event from purgatory + let found = wait_for_event_served( archive_relay.url(), &state_event_id, @@ -267,11 +268,13 @@ async fn test_archive_read_only_creates_bare_repo() { /// This verifies the security model: archive mode only syncs git data /// when there are state events to validate against. /// -/// Scenario: -/// 1. Start source relay with announcement only (no state events) -/// 2. Start archive relay syncing from source -/// 3. 
Archive relay syncs announcement (creates bare repo) -/// 4. Verify git data is NOT synced (no state events to trigger purgatory sync) +/// With announcement purgatory, the flow is: +/// 1. Send announcement to source relay (goes to purgatory) +/// 2. Send state event to source relay (goes to purgatory) +/// 3. Push git data to source relay (promotes announcement and state event) +/// 4. Start archive relay with sync from source +/// 5. Archive relay syncs the promoted announcement +/// 6. Verify git data is NOT synced (archive has no state event to authorize git fetch) #[tokio::test] async fn test_archive_without_state_events_does_not_sync_git() { // 1. Start source relay @@ -290,7 +293,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { let npub = keys.public_key().to_bech32().expect("Failed to get npub"); - // 3. Create and send announcement listing BOTH relays (but NO state event) + // 3. Create and send announcement listing BOTH relays let announcement = create_repo_announcement( &keys, &[&source_relay.domain(), &archive_domain], @@ -306,7 +309,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { tokio::time::sleep(Duration::from_millis(500)).await; - // Send announcement to source relay + // Send announcement to source relay (goes to purgatory) source_client .send_event(&announcement) .await @@ -314,11 +317,39 @@ async fn test_archive_without_state_events_does_not_sync_git() { tokio::time::sleep(Duration::from_millis(200)).await; - // 4. Push git data to source relay (but no state event to authorize it) - // This push will fail because there's no state event in purgatory - // That's expected - we're testing that archive mode doesn't blindly fetch git data + // 4. 
Create and send state event to source relay (goes to purgatory) + let clone_url = format!( + "http://{}/{}/{}.git", + source_relay.domain(), + npub, + identifier + ); + let relay_url = source_relay.url().to_string(); + + let state_event = create_state_event( + &keys, + identifier, + &[("main", &commit_hash)], + &[], + &[&clone_url], + &[&relay_url], + ) + .expect("Failed to create state event"); + + source_client + .send_event(&state_event) + .await + .expect("Failed to send state event to source"); + + tokio::time::sleep(Duration::from_millis(200)).await; + + // 5. Push git data to source relay (promotes announcement and state event) + push_to_relay(temp_dir.path(), &source_relay.domain(), &npub, identifier) + .expect("Push to source should succeed"); + + tokio::time::sleep(Duration::from_millis(500)).await; - // 5. Start archive relay + // 6. Start archive relay (without state event - we don't send state event to archive) let archive_relay = TestRelay::start_with_archive_and_sync( archive_port, Some(source_relay.url().to_string()), @@ -333,10 +364,10 @@ async fn test_archive_without_state_events_does_not_sync_git() { .await .expect("Sync connection should establish"); - // Give time for any potential git sync to happen + // Give time for sync to fetch announcement tokio::time::sleep(Duration::from_secs(3)).await; - // 6. Verify bare repository was created (announcement was accepted) + // 7. Verify bare repository was created (announcement was synced and accepted to purgatory) let repo_path = archive_relay .git_data_path() .join(format!("{}/{}.git", npub, identifier)); @@ -346,7 +377,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { "Bare repository should be created for archive announcement" ); - // 7. Verify git data was NOT synced (no state events to trigger purgatory sync) + // 8. 
Verify git data was NOT synced (no state events on archive to trigger git fetch) // Check that the commit does NOT exist in the archive relay's repo let output = tokio::process::Command::new("git") .args(["cat-file", "-t", &commit_hash]) diff --git a/tests/purgatory.rs b/tests/purgatory.rs index e99540b..efc28c9 100644 --- a/tests/purgatory.rs +++ b/tests/purgatory.rs @@ -58,10 +58,10 @@ macro_rules! isolated_purgatory_test { } // ============================================================ -// Announcement Purgatory Tests (commented out - feature not yet implemented) +// Announcement Purgatory Tests // ============================================================ -// isolated_purgatory_test!(test_announcement_not_served_before_git_data); +isolated_purgatory_test!(test_announcement_not_served_before_git_data); isolated_purgatory_test!(test_announcement_served_after_git_push); isolated_purgatory_test!(test_bare_repo_exists_for_purgatory_announcement); isolated_purgatory_test!(test_state_event_accepted_for_purgatory_announcement); diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs index fe37c33..5abbf15 100644 --- a/tests/purgatory_persistence.rs +++ b/tests/purgatory_persistence.rs @@ -120,7 +120,8 @@ async fn test_full_purgatory_save_restore_cycle() { // so we'll focus on testing state and PR events persistence // Verify initial counts - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0, "Should have 0 announcements"); assert_eq!(state_count, 2, "Should have 2 state events"); assert_eq!( pr_count, 3, @@ -142,7 +143,8 @@ async fn test_full_purgatory_save_restore_cycle() { ); // Verify all data was restored - let (state_count2, pr_count2) = purgatory2.count(); + let (announcement_count2, state_count2, pr_count2) = purgatory2.count(); + assert_eq!(announcement_count2, 0, "Should have 0 announcements after restore"); assert_eq!(state_count2, 
2, "Should have 2 state events after restore"); assert_eq!( pr_count2, 3, @@ -275,7 +277,7 @@ async fn test_purgatory_downtime_adjustment() { purgatory2.restore_from_disk(&state_path).unwrap(); // Verify event is still there (downtime was accounted for) - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 1); let repo1_states = purgatory2.find_state("repo1"); @@ -401,7 +403,7 @@ async fn test_purgatory_restore_missing_file() { assert!(result.is_err(), "Should error on missing file"); // Purgatory should still be usable (empty state) - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); @@ -410,7 +412,7 @@ async fn test_purgatory_restore_missing_file() { let event = create_test_event(&keys, "test").await; purgatory.add_state(event, "repo1".to_string(), keys.public_key()); - let (state_count, _) = purgatory.count(); + let (_, state_count, _) = purgatory.count(); assert_eq!(state_count, 1); } @@ -461,7 +463,7 @@ async fn test_purgatory_restore_corrupted_file() { assert!(result.is_err(), "Should error on corrupted file"); // Purgatory should still be usable - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -504,7 +506,7 @@ async fn test_empty_purgatory_save_restore() { purgatory2.restore_from_disk(&state_path).unwrap(); // Verify empty state - let (state_count, pr_count) = purgatory2.count(); + let (_, state_count, pr_count) = purgatory2.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); assert_eq!(purgatory2.expired_count(), 0); @@ -591,7 +593,7 @@ async fn test_purgatory_continues_working_after_restore() { purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key()); // Verify both old and new events work - let (state_count, _) = purgatory2.count(); + let (_, 
state_count, _) = purgatory2.count(); assert_eq!(state_count, 2); let repo1_states = purgatory2.find_state("repo1"); @@ -603,7 +605,7 @@ async fn test_purgatory_continues_working_after_restore() { assert_eq!(repo2_states[0].event.id, event2.id); // Verify cleanup still works - let (state_removed, pr_removed) = purgatory2.cleanup(); + let (_, state_removed, pr_removed) = purgatory2.cleanup(); // Nothing should be expired yet assert_eq!(state_removed, 0); assert_eq!(pr_removed, 0); @@ -684,15 +686,15 @@ async fn test_purgatory_entries_expired_during_downtime() { purgatory2.restore_from_disk(&state_path).unwrap(); // Event should be restored - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 1); // Cleanup should work (even if nothing is expired yet) - let (state_removed, _) = purgatory2.cleanup(); + let (_, state_removed, _) = purgatory2.cleanup(); // Nothing expired yet since we didn't wait 30 minutes assert_eq!(state_removed, 0); - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 1); } -- cgit v1.2.3 From 8c903c9449d387c9b0edefa5aa283b176a3ed0cb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 17:42:08 +0000 Subject: fix: revert wrong sync approach for purgatory announcements The partial fix treating ProcessResult::Purgatory as confirmed in pending_sync_index would trigger full L2/L3 sync for purgatory announcements. Per design (decision #6), purgatory announcements should only sync state events via SyncLevel::StateOnly (not yet implemented). Ignore test_archive_read_only_creates_bare_repo until SyncLevel is implemented in Phase 3. 
--- src/purgatory/sync/context.rs | 7 +---- src/sync/mod.rs | 68 ++----------------------------------------- tests/archive_read_only.rs | 1 + 3 files changed, 4 insertions(+), 72 deletions(-) (limited to 'src') diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 778cdb8..33c2d12 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -279,12 +279,7 @@ impl SyncContext for RealSyncContext { } async fn fetch_repository_data(&self, identifier: &str) -> Result { - crate::git::authorization::fetch_repository_data_with_purgatory( - &self.database, - &self.purgatory, - identifier, - ) - .await + crate::git::authorization::fetch_repository_data(&self.database, identifier).await } fn collect_needed_oids(&self, identifier: &str) -> HashSet { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 872df66..1ee1872 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1719,50 +1719,8 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { - // Announcements (kind 30617) - re-process rejected state events - // When an announcement goes to purgatory, state events that were - // previously rejected ("no announcement exists") can now be authorized - // via fetch_repository_data_with_purgatory. 
- if event.kind == Kind::GitRepoAnnouncement { - use crate::nostr::events::RepositoryAnnouncement; - - if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) { - // Re-process rejected state events for this announcement - let (removed, hot_events) = rejected_events_index.invalidate_and_get( - &event.pubkey, - &announcement.identifier, - Some(rejected_index::EventType::State), - ); - - if removed > 0 { - tracing::info!( - pubkey = %event.pubkey, - identifier = %announcement.identifier, - removed_from_cold_index = removed, - hot_cache_events = hot_events.len(), - "Invalidated rejected state events (announcement now in purgatory)" - ); - } - - // Re-process state events from hot cache immediately - if !hot_events.is_empty() { - let _stats = Self::reprocess_events_from_hot_cache( - hot_events, - "state event (announcement in purgatory)", - &event.pubkey, - &announcement.identifier, - &relay_url_clone, - &database, - &write_policy, - &local_relay, - &rejected_events_index, - ) - .await; - } - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { @@ -1796,9 +1754,7 @@ impl SyncManager { // Track pagination state for this subscription (REQ+EOSE) // and received event IDs for negentropy batches - // Include Purgatory results so announcements in purgatory still trigger - // per-repo sync (state events, PR events) from the source relay. 
- if result == ProcessResult::Saved || result == ProcessResult::Duplicate || result == ProcessResult::Purgatory { + if result == ProcessResult::Saved || result == ProcessResult::Duplicate { let mut pending = pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_clone) { for batch in batches.iter_mut() { @@ -2550,26 +2506,6 @@ impl SyncManager { "{} added to purgatory (waiting for git data)", context ); - // Trigger immediate sync for re-processed events that go to purgatory - // (same as sync-triggered events in the main event loop) - if event.kind.as_u16() == 30618 { - // State event - extract identifier from 'd' tag - if let Some(id) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].clone()) - } else { - None - } - }) { - write_policy.purgatory().enqueue_sync_immediate(&id); - } - } else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 { - // PR event - extract identifier from 'a' tag - if let Some(id) = crate::git::sync::extract_identifier_from_pr_event(&event) { - write_policy.purgatory().enqueue_sync_immediate(&id); - } - } } ProcessResult::Rejected => { stats.rejected += 1; diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index e39b4b2..e388ae5 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -55,6 +55,7 @@ use std::time::Duration; /// 5. Verify bare repository is created and git data is synced /// 6. Verify git pushes are rejected (read-only mode) #[tokio::test] +#[ignore] // Requires SyncLevel implementation (Phase 3) - purgatory announcements don't trigger per-repo sync yet async fn test_archive_read_only_creates_bare_repo() { // 1. 
Start source relay let source_relay = TestRelay::start().await; -- cgit v1.2.3 From e922e14e3ec4b898c111b2100cd63dddbe2fcdb1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 19:59:36 +0000 Subject: feat: add SyncLevel to sync system for purgatory announcement state-only sync Purgatory announcements need state events (kind 30618) synced from external relays, but not full L2/L3 events (patches, issues, PRs) which would be rejected anyway. This implements the SyncLevel concept from the design doc (decision #6): - Add SyncLevel enum (Full vs StateOnly) to RepoSyncNeeds - When announcement enters purgatory during sync, register in RepoSyncIndex with SyncLevel::StateOnly - Add build_sync_level_aware_filters() that partitions repos by level: StateOnly repos only get state event filters (kind 30618) - Update derive_relay_targets to track state_only_repos separately - Update compute_actions to handle both repo sets - SelfSubscriber always uses SyncLevel::Full (promoted repos) --- src/sync/algorithms.rs | 58 +++++++++++++++++++++++++++++++++++++------- src/sync/filters.rs | 31 ++++++++++++++++++++++++ src/sync/mod.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++- src/sync/self_subscriber.rs | 17 +++++++++---- 4 files changed, 150 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 39788bc..9899abc 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState}; /// this repo need to sync from", it's "what repos does this relay need to sync". 
#[derive(Debug, Clone, Default)] pub struct RelaySyncNeeds { - /// Repos that need to be synced from this relay + /// Repos that need full L2+L3 sync from this relay pub repos: HashSet, + /// Repos that only need state event sync (purgatory announcements) + pub state_only_repos: HashSet, /// Root events that need to be tracked from this relay pub root_events: HashSet, } @@ -67,8 +69,15 @@ pub fn derive_relay_targets( for relay_url in &needs.relays { let entry = relay_targets.entry(relay_url.clone()).or_default(); - entry.repos.insert(repo_id.clone()); - entry.root_events.extend(needs.root_events.iter().cloned()); + match needs.sync_level { + super::SyncLevel::Full => { + entry.repos.insert(repo_id.clone()); + entry.root_events.extend(needs.root_events.iter().cloned()); + } + super::SyncLevel::StateOnly => { + entry.state_only_repos.insert(repo_id.clone()); + } + } } } @@ -96,7 +105,7 @@ pub fn compute_actions( pending: &HashMap>, confirmed: &HashMap, ) -> Vec { - use crate::sync::filters::build_layer2_and_layer3_filters; + use crate::sync::filters::build_sync_level_aware_filters; let mut actions = Vec::new(); @@ -140,14 +149,22 @@ pub fn compute_actions( .map(|state| state.root_events.clone()) .unwrap_or_default(); - // Calculate what's NEW (not in pending, not in confirmed) - let new_repos: HashSet = target_needs + // Calculate what's NEW for full repos (not in pending, not in confirmed) + let new_full_repos: HashSet = target_needs .repos .difference(&pending_repos) .filter(|repo| !confirmed_repos.contains(*repo)) .cloned() .collect(); + // Calculate what's NEW for state-only repos + let new_state_only_repos: HashSet = target_needs + .state_only_repos + .difference(&pending_repos) + .filter(|repo| !confirmed_repos.contains(*repo)) + .cloned() + .collect(); + let new_events: HashSet = target_needs .root_events .difference(&pending_events) @@ -156,13 +173,23 @@ pub fn compute_actions( .collect(); // If there's anything new, create an AddFilters action - if 
!new_repos.is_empty() || !new_events.is_empty() { - let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); + if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty() + { + let filters = build_sync_level_aware_filters( + &new_full_repos, + &new_state_only_repos, + &new_events, + None, + ); + + // Combine all repos into pending items (pending tracking doesn't need sync level) + let mut all_new_repos = new_full_repos; + all_new_repos.extend(new_state_only_repos); actions.push(AddFilters { relay_url: relay_url.clone(), items: PendingItems { - repos: new_repos, + repos: all_new_repos, root_events: new_events, }, filters, @@ -204,6 +231,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events, + sync_level: Default::default(), }, ); @@ -229,6 +257,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events: HashSet::new(), + sync_level: Default::default(), }, ); } @@ -252,6 +281,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events: HashSet::new(), + sync_level: Default::default(), }, ); @@ -285,6 +315,7 @@ mod tests { ModRepoSyncNeeds { relays: relays1, root_events: root_events1, + sync_level: Default::default(), }, ); @@ -299,6 +330,7 @@ mod tests { ModRepoSyncNeeds { relays: relays2, root_events: root_events2, + sync_level: Default::default(), }, ); @@ -332,6 +364,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -366,6 +399,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -389,6 +423,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -428,6 +463,7 @@ mod tests { 
"wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -465,6 +501,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -510,6 +547,7 @@ mod tests { ] .into_iter() .collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -572,6 +610,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: HashSet::new(), + state_only_repos: HashSet::new(), root_events: vec![event_id].into_iter().collect(), }, ); @@ -599,6 +638,7 @@ mod tests { "wss://new-relay.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 3592489..1215e81 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs @@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters( filters } +/// Builds filters respecting SyncLevel for each repo +/// +/// StateOnly repos only get state event filters (kind 30618). +/// Full repos get all L2/L3 filters (state + repo-tagging + root event). 
+/// +/// # Arguments +/// * `full_repos` - Repos needing full L2+L3 sync +/// * `state_only_repos` - Repos needing only state event sync (purgatory) +/// * `root_events` - Root event IDs (only used for Full repos) +/// * `since` - Optional timestamp for incremental sync +pub fn build_sync_level_aware_filters( + full_repos: &HashSet, + state_only_repos: &HashSet, + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + + // All repos (both Full and StateOnly) need state event filters + let all_repos: HashSet = full_repos.union(state_only_repos).cloned().collect(); + filters.extend(state_event_filters_for_our_repos(&all_repos, since)); + + // Only Full repos get repo-tagging and root event filters + if !full_repos.is_empty() { + filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since)); + } + filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); + + filters +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1ee1872..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex; // Supporting Data Structures // ============================================================================= +/// Level of sync needed for a repository +/// +/// Purgatory announcements only need state events synced (to validate git data). +/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.). 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SyncLevel { + /// Full L2 + L3 sync (promoted repos with git data) + #[default] + Full, + /// Only state events (kind 30618) - for purgatory announcements + StateOnly, +} + /// What repos and root events need to be synced #[derive(Debug, Clone, Default)] pub struct RepoSyncNeeds { @@ -92,6 +105,8 @@ pub struct RepoSyncNeeds { pub relays: HashSet, /// Root event IDs - 1617/1618/1621 - that reference this repo pub root_events: HashSet, + /// Sync level - StateOnly for purgatory, Full for promoted repos + pub sync_level: SyncLevel, } /// Connection status for a relay @@ -1677,6 +1692,7 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); + let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1719,8 +1735,49 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { + // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly + // so that state events (kind 30618) are synced for this purgatory announcement + if event.kind == Kind::GitRepoAnnouncement { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = format!("30617:{}:{}", event.pubkey, identifier); + + // Extract relay URLs from the purgatory entry + let relays = write_policy + .purgatory() + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + tracing::info!( + event_id = %event.id, + repo_id = %repo_id, + relay_count = 
relays.len(), + "Registering purgatory announcement in RepoSyncIndex with StateOnly level" + ); + + // Register in RepoSyncIndex with StateOnly level + let mut index = repo_sync_index.write().await; + let entry = index + .entry(repo_id) + .or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + root_events: HashSet::new(), + sync_level: SyncLevel::StateOnly, + }); + entry.relays.extend(relays); + // Don't upgrade sync_level if already Full + // (e.g., if announcement was promoted before this runs) + } + } // State events (kind 30618) - extract identifier and trigger immediate sync - if event.kind.as_u16() == 30618 { + else if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..db16c62 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -16,7 +16,7 @@ use nostr_sdk::Timestamp; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; -use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; +use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; // ============================================================================= // LoopControl - Result of notification processing @@ -58,6 +58,7 @@ impl PendingUpdates { let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), + sync_level: SyncLevel::Full, }); entry.relays.extend(relays); entry.root_events.extend(root_events); @@ -475,6 +476,7 @@ impl SelfSubscriber { .or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), + sync_level: SyncLevel::Full, }); entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); @@ -499,21 +501,26 @@ impl SelfSubscriber { continue; } - // Build filters for these repos - let filters = 
crate::sync::filters::build_layer2_and_layer3_filters( + // Build filters for these repos (sync-level-aware) + let filters = crate::sync::filters::build_sync_level_aware_filters( &needs.repos, + &needs.state_only_repos, &needs.root_events, None, ); // Log before moving values - let repo_count = needs.repos.len(); + let repo_count = needs.repos.len() + needs.state_only_repos.len(); let event_count = needs.root_events.len(); + // Combine all repos into pending items + let mut all_repos = needs.repos; + all_repos.extend(needs.state_only_repos); + let action = AddFilters { relay_url: relay_url.clone(), items: crate::sync::PendingItems { - repos: needs.repos, + repos: all_repos, root_events: needs.root_events, }, filters, -- cgit v1.2.3 From efbbcc49ae8e8f598a24c939b35ad9cda0541663 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 17 Feb 2026 10:58:01 +0000 Subject: fix: include purgatory announcements in state event authorization When processing state events from purgatory, we need to check authorization against announcements that may still be in purgatory (not yet promoted to the database). Previously, process_purgatory_state_events() used fetch_repository_data() which only queries the database. This caused authorization failures when: 1. Git data arrives 2. Announcement is promoted from purgatory to database 3. State events are processed from purgatory 4. But db_repo_data was fetched BEFORE the announcement promotion Now uses fetch_repository_data_with_purgatory() to include both database and purgatory announcements, ensuring authorization works correctly regardless of promotion timing. 
--- src/git/sync.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/git/sync.rs b/src/git/sync.rs index 13f30b6..a0b7c47 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -37,7 +37,8 @@ use tracing::{debug, info, warn}; use nostr_sdk::Event; use crate::git::authorization::{ - collect_authorized_maintainers, fetch_repository_data, RepositoryData, + collect_authorized_maintainers, fetch_repository_data, fetch_repository_data_with_purgatory, + RepositoryData, }; use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; @@ -923,7 +924,10 @@ async fn process_purgatory_state_events( ); // Fetch repository data once for all state events - let mut db_repo_data = match fetch_repository_data(database, identifier).await { + // IMPORTANT: Use fetch_repository_data_with_purgatory to include announcements + // that may still be in purgatory (not yet promoted). This ensures authorization + // works correctly even if the announcement promotion happens in the same batch. + let mut db_repo_data = match fetch_repository_data_with_purgatory(database, purgatory, identifier).await { Ok(data) => data, Err(e) => { warn!( -- cgit v1.2.3 From cad58fccae7ed84bb033e56de0f1323b714a854d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 17 Feb 2026 11:43:40 +0000 Subject: docs: clarify why fetch_repository_data excludes purgatory Add comments explaining that PR event processing (both incoming and purgatory) should only use database announcements, not purgatory ones. This is intentional because: - Incoming PR events should only be accepted for validated announcements - Purgatory PR events should only be released when announcement is promoted - This prevents accepting PR events for announcements that fail validation Differs from state event processing which uses fetch_repository_data_with_purgatory because state events check authorization without releasing from purgatory. 
--- src/git/sync.rs | 3 +++ src/nostr/policy/pr_event.rs | 8 ++++++++ 2 files changed, 11 insertions(+) (limited to 'src') diff --git a/src/git/sync.rs b/src/git/sync.rs index a0b7c47..4b35023 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -1171,6 +1171,9 @@ async fn process_purgatory_pr_events( ); // Fetch repository data for syncing + // NOTE: Only fetch from database, NOT purgatory. PR events should only be + // released from purgatory when the announcement has been promoted (validated). + // This ensures we don't accept PR events for announcements that fail validation. let db_repo_data = match fetch_repository_data(database, identifier).await { Ok(data) => data, Err(e) => { diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index 00e09c3..072e445 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -127,6 +127,10 @@ impl PrEventPolicy { .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; // Fetch repository data + // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should + // only be accepted for announcements that have been promoted (validated). + // If the announcement is still in purgatory, the PR event should also go + // to purgatory and wait for the announcement to be promoted. let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; // Extract owner pubkey from source repo path @@ -203,6 +207,10 @@ impl PrEventPolicy { let identifier = parts[2]; // 2. Fetch repo data + // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should + // only be accepted for announcements that have been promoted (validated). + // If the announcement is still in purgatory, the PR event should also go + // to purgatory and wait for the announcement to be promoted. let db_repo_data = fetch_repository_data(&self.ctx.database, identifier).await?; // 3. 
Extract list of maintainers from "a 30617:<pubkey>:<identifier>" tags -- cgit v1.2.3 From 467690f33bbbfd442852e61de221e4e5e161b878 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 08:59:52 +0000 Subject: fix: check purgatory in maintainer announcement lookup is_maintainer_in_any_announcement only queried the database, missing announcements still in purgatory. A maintainer's announcement (which lists the recursive maintainer) may arrive and enter purgatory before the recursive maintainer's announcement does, causing the maintainer exception check to return false and reject the recursive maintainer's announcement. --- src/nostr/policy/announcement.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 1118497..abe9651 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -222,6 +222,11 @@ impl AnnouncementPolicy { /// /// This enables accepting announcements from maintainers even when they don't list /// this GRASP server, for maintainer chain discovery and GRASP-02 sync. + /// + /// Checks both the database (promoted announcements) and purgatory (announcements + /// waiting for git data). This is necessary because a maintainer's announcement + /// (which lists the recursive maintainer) may still be in purgatory when the + /// recursive maintainer's announcement arrives. 
async fn is_maintainer_in_any_announcement( &self, identifier: &str, @@ -233,12 +238,26 @@ impl AnnouncementPolicy { identifier.to_string(), ); - let announcements: Vec = match self.ctx.database.query(filter).await { + let db_announcements: Vec = match self.ctx.database.query(filter).await { Ok(events) => events.into_iter().collect(), Err(e) => return Err(format!("Database query failed: {}", e)), }; - if announcements.is_empty() { + // Also collect purgatory announcements for this identifier + let purgatory_announcements: Vec = self + .ctx + .purgatory + .get_announcements_by_identifier(identifier) + .into_iter() + .map(|entry| entry.event) + .collect(); + + let all_announcements: Vec<&Event> = db_announcements + .iter() + .chain(purgatory_announcements.iter()) + .collect(); + + if all_announcements.is_empty() { // No existing announcements for this identifier - author cannot be a maintainer return Ok(false); } @@ -246,14 +265,14 @@ impl AnnouncementPolicy { let author_hex = author.to_hex(); // Check each announcement to see if author is listed as a maintainer - for event in &announcements { + for event in &all_announcements { // Check if author is the owner of this announcement if event.pubkey == *author { return Ok(true); } // Check if author is listed in the maintainers tag - if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { + if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) { if announcement.maintainers.contains(&author_hex) { return Ok(true); } -- cgit v1.2.3 From 0c01797812bb77fc81d0efe58f0e7858f2b7af66 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 09:24:01 +0000 Subject: fix: handle announcement replacement when original is still in purgatory Previously, has_active_announcement() only queried the database, so when a newer announcement arrived for the same (pubkey, identifier) while the original was still in purgatory, it was incorrectly routed as a brand-new announcement 
(AcceptPurgatory) rather than replacing the existing entry. This change splits the logic into two cases: - If the existing entry is in the database: return Accept (replacement) as before - If the existing entry is only in purgatory: replace the purgatory entry via add_announcement() (which overwrites by key) and extend expiries for both the announcement and any waiting state events, then return Accept - If the owner sends a Reject-classified announcement (service removed) but has a purgatory entry: clear the purgatory entry, delete the bare repo, and remove any waiting state events before rejecting Also add an explicit comment to find_accepted_repository() in related.rs clarifying that it intentionally only checks the database. Related events should only be accepted after the repository announcement has been promoted (validated via git data) - this is correct behaviour, not a missing check. --- src/nostr/policy/announcement.rs | 161 +++++++++++++++++++++++++++++++++------ src/nostr/policy/related.rs | 5 ++ 2 files changed, 141 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index abe9651..a90ec94 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -4,6 +4,7 @@ /// according to GRASP-01 specification. 
use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; use std::collections::HashSet; +use std::time::Duration; use super::PolicyContext; use crate::config::Config; @@ -39,7 +40,8 @@ impl AnnouncementPolicy { /// Validate a repository announcement event /// /// Returns: - /// - `Accept` if this is a replacement announcement (active announcement exists) + /// - `Accept` if this is a replacement announcement (active announcement exists in DB or + /// purgatory) /// - `AcceptPurgatory` if this is a new announcement (no active announcement exists) /// - `AcceptMaintainer` if accepted via maintainer exception /// - `AcceptArchive` if accepted via GRASP-05 archive config @@ -54,6 +56,17 @@ impl AnnouncementPolicy { // GRASP-01 Exception: Accept announcements from recursive maintainers match RepositoryAnnouncement::from_event(event.clone()) { Ok(announcement) => { + // If this pubkey+identifier had a purgatory entry, the owner may be + // sending a new announcement that removes our service. Clear the + // purgatory entry and its bare repo so we don't hold stale data. 
+ if self + .ctx + .purgatory + .has_purgatory_announcement(&event.pubkey, &announcement.identifier) + { + self.remove_purgatory_announcement(&event.pubkey, &announcement.identifier); + } + match self .is_maintainer_in_any_announcement( &announcement.identifier, @@ -76,38 +89,55 @@ impl AnnouncementPolicy { // Parse announcement to check for existing active announcement match RepositoryAnnouncement::from_event(event.clone()) { Ok(announcement) => { - // Check if there's already an active announcement for this (pubkey, identifier) - match self - .has_active_announcement(&event.pubkey, &announcement.identifier) + let in_db = match self + .has_db_announcement(&event.pubkey, &announcement.identifier) .await { - Ok(true) => { - // Replacement announcement - accept immediately - tracing::debug!( - identifier = %announcement.identifier, - "Replacement announcement - accepting immediately" - ); - validation_result - } - Ok(false) => { - // New announcement - route to purgatory - tracing::debug!( - identifier = %announcement.identifier, - "New announcement - routing to purgatory" - ); - AnnouncementResult::AcceptPurgatory - } + Ok(v) => v, Err(e) => { tracing::warn!( error = %e, - "Failed to check for existing announcement - rejecting" + "Failed to check for existing DB announcement - rejecting" ); - AnnouncementResult::Reject(format!( + return AnnouncementResult::Reject(format!( "Database error checking existing announcement: {}", e - )) + )); } + }; + + if in_db { + // Replacement announcement with DB entry - accept immediately + tracing::debug!( + identifier = %announcement.identifier, + "Replacement announcement (DB) - accepting immediately" + ); + return validation_result; } + + let in_purgatory = self + .ctx + .purgatory + .has_purgatory_announcement(&event.pubkey, &announcement.identifier); + + if in_purgatory { + // Replacement announcement with purgatory entry - replace it and + // extend expiry so the new announcement gets a fresh 30-minute window. 
+ tracing::debug!( + identifier = %announcement.identifier, + "Replacement announcement (purgatory) - replacing purgatory entry" + ); + self.replace_purgatory_announcement(event, &announcement); + // Return Accept (not AcceptPurgatory) - this is a replacement, not new + return validation_result; + } + + // No existing announcement - route to purgatory + tracing::debug!( + identifier = %announcement.identifier, + "New announcement - routing to purgatory" + ); + AnnouncementResult::AcceptPurgatory } Err(e) => AnnouncementResult::Reject(format!( "Failed to parse announcement: {}", @@ -120,8 +150,89 @@ impl AnnouncementPolicy { } } - /// Check if there's an active announcement in the database for this (pubkey, identifier) - async fn has_active_announcement( + /// Replace a purgatory announcement entry with a newer event. + /// + /// Called when a replacement announcement arrives for a (pubkey, identifier) pair + /// that is currently in purgatory. Updates the purgatory entry and extends the + /// expiry so the new announcement has a fresh waiting window. 
+ fn replace_purgatory_announcement( + &self, + event: &Event, + announcement: &RepositoryAnnouncement, + ) { + let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); + let relays: HashSet = announcement.relays.iter().cloned().collect(); + + // add_announcement uses the (owner, identifier) key so it overwrites the old entry + self.ctx.purgatory.add_announcement( + event.clone(), + announcement.identifier.clone(), + event.pubkey, + repo_path, + relays, + ); + + // Extend the announcement's expiry (reset to full 30 min window) + self.ctx.purgatory.extend_announcement_expiry( + &event.pubkey, + &announcement.identifier, + Duration::from_secs(1800), + ); + + // Also extend any state events waiting for this identifier + let state_entries = self.ctx.purgatory.find_state(&announcement.identifier); + if !state_entries.is_empty() { + let state_ids: Vec<_> = state_entries.iter().map(|e| e.event.id).collect(); + self.ctx.purgatory.extend_expiry( + &announcement.identifier, + &state_ids, + Duration::from_secs(1800), + ); + } + } + + /// Remove a purgatory announcement and clean up associated resources. + /// + /// Called when a replacement announcement is rejected (owner removed our service). + /// Deletes the bare repository from disk and removes any state events waiting for + /// this identifier. 
+ fn remove_purgatory_announcement(&self, pubkey: &PublicKey, identifier: &str) { + // Get the repo path before removing from purgatory + if let Some(entry) = self.ctx.purgatory.find_announcement(pubkey, identifier) { + // Delete the bare repository from disk + if entry.repo_path.exists() { + if let Err(e) = std::fs::remove_dir_all(&entry.repo_path) { + tracing::warn!( + path = %entry.repo_path.display(), + error = %e, + "Failed to delete bare repository during purgatory cleanup" + ); + } else { + tracing::info!( + path = %entry.repo_path.display(), + "Deleted bare repository for rejected purgatory announcement" + ); + } + } + } + + // Remove the announcement from purgatory + self.ctx.purgatory.remove_announcement(pubkey, identifier); + + // Remove any state events waiting for this identifier + self.ctx.purgatory.remove_state(identifier); + + tracing::info!( + identifier = %identifier, + "Cleared purgatory entry: owner removed our service from announcement" + ); + } + + /// Check if there's an announcement in the database for this (pubkey, identifier). + /// + /// Only checks the database (promoted announcements). For purgatory checks use + /// `purgatory.has_purgatory_announcement()` directly. + async fn has_db_announcement( &self, pubkey: &PublicKey, identifier: &str, diff --git a/src/nostr/policy/related.rs b/src/nostr/policy/related.rs index 7ce87db..cfe04a7 100644 --- a/src/nostr/policy/related.rs +++ b/src/nostr/policy/related.rs @@ -139,6 +139,11 @@ impl RelatedEventPolicy { .push((addr, pubkey, identifier)); } + // NOTE: Intentionally only checks the database (promoted announcements), not purgatory. + // Related events should only be accepted once the repository announcement has been + // validated (promoted via git data). Events referencing purgatory-only repositories + // are correctly rejected as orphans and can be re-submitted after promotion. 
+ // Query each kind group for (kind, refs) in by_kind { let authors: Vec = refs.iter().map(|(_, pk, _)| *pk).collect(); -- cgit v1.2.3 From 85d621c791efaad1245c1aec8e5185a1eb78c7b9 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 09:24:10 +0000 Subject: fix: break circular deadlock in sync loop by including purgatory in URL lookup The sync loop calls fetch_repository_data() to get clone URLs so it knows where to fetch git data from. Previously this only queried the database, which means an announcement still in purgatory (no git data yet) would return no clone URLs, so the sync loop could never fetch the git data needed to promote the announcement - a circular deadlock. Fix by switching to fetch_repository_data_with_purgatory() which combines database announcements with purgatory announcements. Update the trait method's doc comment to document this behaviour. The mock implementation in tests is unaffected since it returns pre-configured data rather than delegating to either function. --- src/purgatory/sync/context.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 33c2d12..3568e89 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -75,7 +75,12 @@ pub trait SyncContext: Send + Sync { /// # Returns /// Set of clone URLs from PR events in purgatory for this identifier fn collect_pr_clone_urls(&self, identifier: &str) -> HashSet; - /// Get repository data (announcements, clone URLs, etc.) from the database. + /// Get repository data (announcements, clone URLs, etc.) from the database and purgatory. + /// + /// Checks both the database (promoted announcements) and purgatory (announcements + /// awaiting git data). 
This is necessary to obtain clone URLs when an announcement + /// has not yet been promoted - without purgatory data, the sync loop would have no + /// URLs to fetch from and the announcement could never be promoted (circular deadlock). /// /// # Arguments /// * `identifier` - The repository identifier (d-tag value) @@ -279,7 +284,16 @@ impl SyncContext for RealSyncContext { } async fn fetch_repository_data(&self, identifier: &str) -> Result { - crate::git::authorization::fetch_repository_data(&self.database, identifier).await + // Use the purgatory-aware variant so that clone URLs from announcements still + // in purgatory (not yet promoted) are available. Without this, the sync loop + // would find no URLs to fetch from and the announcement could never be promoted + // (circular deadlock: can't promote without git data, can't get git data without URLs). + crate::git::authorization::fetch_repository_data_with_purgatory( + &self.database, + &self.purgatory, + identifier, + ) + .await } fn collect_needed_oids(&self, identifier: &str) -> HashSet { -- cgit v1.2.3 From 28aa19bc5b196f2259ab8ff0ac8534afe886529f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 09:43:34 +0000 Subject: fix: only evict purgatory entry when incoming rejected announcement is newer An older rejected announcement (e.g. a relay replay of a superseded event) was incorrectly evicting a newer purgatory entry for the same pubkey+identifier. Now only evict when the incoming event's created_at is strictly greater than the stored entry's created_at. 
--- src/nostr/policy/announcement.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index a90ec94..9b92aeb 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -56,14 +56,20 @@ impl AnnouncementPolicy { // GRASP-01 Exception: Accept announcements from recursive maintainers match RepositoryAnnouncement::from_event(event.clone()) { Ok(announcement) => { - // If this pubkey+identifier had a purgatory entry, the owner may be - // sending a new announcement that removes our service. Clear the - // purgatory entry and its bare repo so we don't hold stale data. - if self + // If this pubkey+identifier has a purgatory entry AND the incoming + // event is strictly newer, the owner is sending a replacement that + // removes our service. Clear the purgatory entry and its bare repo. + // + // If the incoming event is older than the purgatory entry (e.g. a + // relay replay of a superseded announcement), ignore it — the newer + // purgatory entry takes precedence and must not be evicted. + let should_evict = self .ctx .purgatory - .has_purgatory_announcement(&event.pubkey, &announcement.identifier) - { + .find_announcement(&event.pubkey, &announcement.identifier) + .is_some_and(|entry| event.created_at > entry.event.created_at); + + if should_evict { self.remove_purgatory_announcement(&event.pubkey, &announcement.identifier); } -- cgit v1.2.3 From 2f365fc3b209f6d377d59a6ab8a6891b0350fee6 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 09:58:41 +0000 Subject: fix: preserve state events when another owner's announcement remains in purgatory remove_purgatory_announcement() was unconditionally wiping all state events for an identifier when one owner's announcement was evicted. 
State events are keyed by identifier alone, so this incorrectly discarded state events belonging to a different owner's repository sharing the same identifier string. Now only removes state events if no other owner's announcement remains in purgatory for that identifier. --- src/nostr/policy/announcement.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 9b92aeb..b366f0b 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -225,11 +225,23 @@ impl AnnouncementPolicy { // Remove the announcement from purgatory self.ctx.purgatory.remove_announcement(pubkey, identifier); - // Remove any state events waiting for this identifier - self.ctx.purgatory.remove_state(identifier); + // Only remove state events if no other owner still has an announcement in purgatory + // for this identifier. State events are keyed by identifier alone, so blindly removing + // them would also discard state events legitimately belonging to a different owner's + // repository that happens to share the same identifier string. + let other_owners_remain = !self + .ctx + .purgatory + .get_announcements_by_identifier(identifier) + .is_empty(); + + if !other_owners_remain { + self.ctx.purgatory.remove_state(identifier); + } tracing::info!( identifier = %identifier, + other_owners_remain = %other_owners_remain, "Cleared purgatory entry: owner removed our service from announcement" ); } -- cgit v1.2.3 From 806936e7d1aab5dfd0c2ad6b98a115122dc1785c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:04 +0000 Subject: fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery StateOnly repos in a pending batch had their repo IDs included in the negentropy REQ+EOSE fallback, which called build_layer2_and_layer3_filters. 
This generated #a/#A/#q tag filters for repos whose announcements were still in purgatory (not yet promoted to the database). When the remote relay responded with PR events matching those filters, the write policy correctly rejected them as 'orphan' (no accepted repo in DB yet). However, nostr-sdk's client-level deduplication then silently dropped the same event on all subsequent deliveries, making it permanently unavailable even after the announcement was promoted. Fix: split batch_repos into full vs state-only by consulting repo_sync_index at fallback time, then call build_sync_level_aware_filters which only generates #a/#A/#q filters for Full repos. StateOnly repos only get the kind 30618 + #d filter they were originally subscribed with. --- src/sync/mod.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 108 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..6ab8d33 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -557,6 +557,13 @@ pub struct SyncManager { /// Purgatory for read-only access to events awaiting git data purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) + // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters + // actions when user-submitted purgatory announcements need to trigger relay discovery. + /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) + #[allow(dead_code)] + action_tx: Option>, + /// Receiver for AddFilters actions (taken by run() when the event loop starts) + action_rx: Option>, local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, @@ -643,6 +650,11 @@ impl SyncManager { } } + // Create action channel upfront so callers (e.g. write policy) can send AddFilters + // actions before run() is called (e.g. 
when user-submitted purgatory announcements + // need to trigger relay discovery). + let (action_tx, action_rx) = tokio::sync::mpsc::channel::<AddFilters>(100); + Self { bootstrap_relay_url, service_domain, @@ -663,9 +675,22 @@ impl SyncManager { connect_tx: None, shutdown_tx: None, metrics: sync_metrics, + action_tx: Some(action_tx), + action_rx: Some(action_rx), } } + /// Get a clone of the action sender for external use. + /// + /// This allows the write policy to send AddFilters actions to the SyncManager + /// when user-submitted purgatory announcements need to trigger relay discovery. + /// + /// # Returns + /// Clone of the action sender, or None if the channel was never created. + pub fn action_tx(&self) -> Option<mpsc::Sender<AddFilters>> { + self.action_tx.clone() + } + /// Generate a unique batch ID /// /// Increments the internal counter and returns the new value. @@ -686,6 +711,17 @@ impl SyncManager { self.rejected_events_index.clone() } + /// Get a clone of the repo sync index. + /// + /// This allows access to the repo sync index for upgrading sync levels + /// when announcements are promoted from purgatory. + /// + /// # Returns + /// Clone of the repo sync index (Arc<RwLock<...>>) + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Save rejected events index to disk. /// /// This is called during shutdown to persist the rejected events cache, @@ -949,11 +985,31 @@ impl SyncManager { // Drop the lock before async operations drop(pending); - // Create REQ+EOSE subscriptions using original semantic filters + // Create REQ+EOSE subscriptions using sync-level-aware filters. // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. 
+ // + // CRITICAL: Use build_sync_level_aware_filters to avoid generating + // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements + // are still in purgatory. If we send Layer 2 filters too early, the + // remote relay may return PR events that our write policy rejects as + // "orphan" (no promoted repo). nostr-sdk deduplication then silently + // drops the event on retry, making it permanently unavailable. + let (full_repos, state_only_repos) = { + let index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_id in &batch_repos { + match index.get(repo_id).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } + _ => { full.insert(repo_id.clone()); } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1037,8 +1093,20 @@ impl SyncManager { pending.remove(&relay_url_for_fallback); } drop(pending); + let is_generic_filter = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + + // Trigger filter recomputation for generic filter batches + if is_generic_filter { + tracing::info!( + relay = %relay_url_for_fallback, + "Announcement batch complete (fallback path) - triggering filter recomputation" + ); + self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) + .await; + } } } return; @@ -1136,8 +1204,20 @@ impl SyncManager { pending.remove(&relay_url_for_retry); } drop(pending); + let is_generic_filter = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + + // Trigger filter recomputation for generic filter batches + if is_generic_filter { + tracing::info!( + relay = %relay_url_for_retry, + "Announcement batch 
complete (retry path) - triggering filter recomputation" + ); + self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) + .await; + } } } return; @@ -1158,7 +1238,20 @@ impl SyncManager { drop(pending); // 4. Confirm the batch (moves items to RelayState) + let is_generic_filter = + completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter batches (announcements), trigger filter recomputation + // to subscribe to state events for purgatory announcements that were registered + // during event processing. + if is_generic_filter { + tracing::info!( + relay = %relay_url, + "Announcement batch complete - triggering filter recomputation for purgatory repos" + ); + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState @@ -1437,8 +1530,16 @@ impl SyncManager { "SyncManager starting" ); - // 1. Create action channel for self-subscriber -> manager communication - let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); + // 1. Take action channel receiver (created in new()) - sender is shared with write policy + let mut action_rx = self + .action_rx + .take() + .expect("action_rx should be set in new()"); + // Get a clone of action_tx for the self-subscriber + let action_tx_for_subscriber = self + .action_tx + .clone() + .expect("action_tx should be set in new()"); // 2. 
Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -1457,7 +1558,7 @@ impl SyncManager { format!("ws://{}", self.config.bind_address), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), - action_tx, + action_tx_for_subscriber, ); let subscriber_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); -- cgit v1.2.3 From d76003b629a4a03dba23a8a1c41da6e4ac4c30cf Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:17 +0000 Subject: feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion When git data arrives for a purgatory announcement and promotes it to the database, the relay now: 1. Upgrades the announcement's sync level in RepoSyncIndex from StateOnly to Full (git/sync.rs: process_purgatory_announcements) 2. Sends AddFilters actions to SyncManager for all connected relays, using Full sync filters (Layer 2 #a/#A/#q) to subscribe to PR events (purgatory/sync/context.rs: RealSyncContext.process_newly_available_git_data) 3. For user-submitted purgatory announcements, registers the repo in RepoSyncIndex with StateOnly level and sends AddFilters to SyncManager so it discovers and connects to relays listed in the announcement tags (nostr/builder.rs: handle_announcement AcceptPurgatory path) The RealSyncContext now accepts optional repo_sync_index and sync_action_tx parameters. main.rs wires these up from SyncManager. PolicyContext gains repo_sync_index and sync_action_tx fields for the write policy path. 
--- src/git/handlers.rs | 1 + src/git/sync.rs | 21 ++++++++ src/main.rs | 20 +++++++ src/nostr/builder.rs | 118 ++++++++++++++++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 37 +++++++++++++ src/purgatory/sync/context.rs | 116 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 313 insertions(+) (limited to 'src') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..129ca2c 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,6 +307,7 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..b3fa11a 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,6 +44,7 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. /// @@ -809,6 +810,7 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories +/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -819,6 +821,7 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +851,7 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1284,6 +1288,7 @@ async fn process_purgatory_announcements( local_relay: 
Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1338,6 +1343,22 @@ async fn process_purgatory_announcements( } } + // Upgrade sync level to Full in repo_sync_index + if let Some(index) = repo_sync_index { + let mut index = index.write().await; + // Use hex pubkey format to match how repo_sync_index keys are built + // (sync/mod.rs uses event.pubkey which is hex, not bech32) + let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); + if let Some(entry) = index.get_mut(&repo_id) { + entry.sync_level = SyncLevel::Full; + debug!( + identifier = %identifier, + repo_id = %repo_id, + "Upgraded sync level to Full after announcement promotion" + ); + } + } + result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index ab6ede7..3ff30fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,24 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); + // Get a reference to the repo sync index for upgrading sync levels on promotion + let repo_sync_index = sync_manager.repo_sync_index(); + + // Set the repo sync index on the write policy so user-submitted purgatory + // announcements can trigger relay discovery (connect to relays in announcement tags) + relay_with_db + .write_policy + .set_repo_sync_index(repo_sync_index.clone()); + + // Get the action sender BEFORE consuming sync_manager with spawn + let action_tx = sync_manager.action_tx(); + + // Set the sync action sender so the write policy can trigger relay connections + // when user-submitted purgatory announcements are registered with StateOnly level + if let Some(tx) = action_tx.clone() { + relay_with_db.write_policy.set_sync_action_tx(tx); + } + tokio::spawn(async move { sync_manager.run().await; }); @@ 
-184,6 +202,8 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), + Some(repo_sync_index), + action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..4c66f6d 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,6 +98,24 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + /// + /// When a user submits an announcement that goes to purgatory (no git data yet), + /// the relay needs to discover and connect to relays listed in the announcement's + /// `relays` and `clone` tags. This index is updated when the announcement is accepted + /// into purgatory, triggering the sync system to connect and sync state events. + pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + /// + /// This allows the write policy to notify the SyncManager when user-submitted + /// purgatory announcements need relay discovery (triggering new connections). + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + self.ctx.set_sync_action_tx(tx); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +164,106 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register in repo_sync_index with StateOnly level so the sync + // system discovers and connects to relays listed in this announcement. + // This is needed for user-submitted announcements (not via sync path) + // to trigger relay discovery and state event sync. 
+ if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = + format!("30617:{}:{}", event.pubkey, identifier); + + // Get relay URLs stored in purgatory for this announcement + let relays = self + .ctx + .purgatory + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + if !relays.is_empty() { + use crate::sync::{ + AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, + }; + + // Update repo_sync_index with StateOnly for this repo + let new_repos = { + let mut index = repo_sync_index.write().await; + let entry = + index.entry(repo_id.clone()).or_insert_with(|| { + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + entry.relays.extend(relays.iter().cloned()); + // Don't upgrade if already Full + tracing::info!( + repo_id = %repo_id, + relay_count = entry.relays.len(), + "Registered user-submitted purgatory announcement in \ + RepoSyncIndex with StateOnly level for relay discovery" + ); + // Return cloned relays for AddFilters + relays.clone() + }; + + // Send AddFilters to SyncManager so it connects to these relays + if let Some(tx) = self.ctx.get_sync_action_tx() { + // Build state-only filters for this repo + let state_only_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &std::collections::HashSet::new(), + &state_only_repos, + &std::collections::HashSet::new(), + None, + ); + + for relay_url in new_repos { + // Skip our own domain + if relay_url.contains(&self.ctx.domain) { + continue; + } + let action = AddFilters { + relay_url: relay_url.clone(), + items: PendingItems { + repos: 
state_only_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters: filters.clone(), + }; + if let Err(e) = tx.send(action).await { + tracing::warn!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters action for \ + user-submitted purgatory announcement" + ); + } else { + tracing::info!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager for \ + user-submitted purgatory announcement relay" + ); + } + } + } + } + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..78a09fc 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; +use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -34,6 +35,16 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, + /// Optional repo sync index for triggering relay discovery when announcements + /// go to purgatory via user submission (not via the sync path). + /// Wrapped in Arc for interior mutability (PolicyContext is Clone). + pub repo_sync_index: Arc>>, + /// Optional sender for AddFilters actions to SyncManager. + /// Used to trigger relay discovery when user-submitted purgatory announcements + /// are registered with StateOnly sync level. + /// Wrapped in Arc for interior mutability (PolicyContext is Clone). 
+ pub sync_action_tx: + Arc>>>, } impl PolicyContext { @@ -51,6 +62,8 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, + repo_sync_index: Arc::new(std::sync::RwLock::new(None)), + sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -68,4 +81,28 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } + + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + let mut guard = self.repo_sync_index.write().unwrap(); + *guard = Some(index); + } + + /// Get a clone of the repo sync index if it's been set. + pub fn get_repo_sync_index(&self) -> Option { + let guard = self.repo_sync_index.read().unwrap(); + guard.clone() + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + let mut guard = self.sync_action_tx.write().unwrap(); + *guard = Some(tx); + } + + /// Get a clone of the sync action sender if it's been set. + pub fn get_sync_action_tx(&self) -> Option> { + let guard = self.sync_action_tx.read().unwrap(); + guard.clone() + } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..4dbb402 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,6 +193,7 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; +use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -221,6 +222,13 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, + + /// Optional repo sync index for upgrading sync level on promotion + repo_sync_index: Option, + + /// Optional sender for AddFilters actions to SyncManager. 
+ /// Used after announcement promotion to trigger PR event subscription on connected relays. + sync_action_tx: Option>, } impl RealSyncContext { @@ -233,6 +241,9 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains + /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion + /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion + #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -240,6 +251,8 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, + repo_sync_index: Option, + sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -248,9 +261,23 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, + repo_sync_index, + sync_action_tx, } } + /// Set the sync action sender for triggering filter recomputation after announcement promotion. + /// + /// When an announcement is promoted from purgatory to Full sync level, the SyncManager + /// needs to subscribe to PR events for that repo on all connected relays. This sender + /// is used to trigger that subscription. + pub fn set_sync_action_tx( + &mut self, + tx: tokio::sync::mpsc::Sender, + ) { + self.sync_action_tx = Some(tx); + } + /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -482,9 +509,98 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + self.repo_sync_index.clone(), ) .await?; + // If announcements were promoted (now Full sync level), notify SyncManager to + // recompute filters so PR event subscriptions are created on connected relays. 
+ if result.announcements_released > 0 { + if let (Some(ref tx), Some(ref repo_sync_index)) = + (&self.sync_action_tx, &self.repo_sync_index) + { + let index = repo_sync_index.read().await; + for (repo_id, needs) in index.iter() { + if needs.sync_level == crate::sync::SyncLevel::Full + && !needs.root_events.is_empty() + { + // Send AddFilters for Full repos with root events + for relay_url in &needs.relays { + if let Some(ref domain) = self.our_domain_value { + if relay_url.contains(domain.as_str()) { + continue; + } + } + let full_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &full_repos, + &std::collections::HashSet::new(), + &needs.root_events, + None, + ); + let action = crate::sync::AddFilters { + relay_url: relay_url.clone(), + items: crate::sync::PendingItems { + repos: full_repos.clone(), + root_events: needs.root_events.clone(), + }, + filters, + }; + if let Err(e) = tx.send(action).await { + debug!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters after announcement promotion" + ); + } else { + debug!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager after announcement promotion" + ); + } + } + } else if needs.sync_level == crate::sync::SyncLevel::Full { + // Even without root_events, send empty repo filter to ensure + // Layer 2 subscriptions (PR events) are set up + for relay_url in &needs.relays { + if let Some(ref domain) = self.our_domain_value { + if relay_url.contains(domain.as_str()) { + continue; + } + } + let full_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &full_repos, + &std::collections::HashSet::new(), + &std::collections::HashSet::new(), + None, + ); + let action = crate::sync::AddFilters { + relay_url: relay_url.clone(), + items: crate::sync::PendingItems { + repos: 
full_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters, + }; + if let Err(e) = tx.send(action).await { + debug!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters (no root_events) after announcement promotion" + ); + } + } + } + } + } + } + // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3 From 3d9359d5ac0045fb93fd8732160e0de8413d6881 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:28 +0000 Subject: Revert "feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion" This reverts commit d76003b629a4a03dba23a8a1c41da6e4ac4c30cf. --- src/git/handlers.rs | 1 - src/git/sync.rs | 21 -------- src/main.rs | 20 ------- src/nostr/builder.rs | 118 ------------------------------------------ src/nostr/policy/mod.rs | 37 ------------- src/purgatory/sync/context.rs | 116 ----------------------------------------- 6 files changed, 313 deletions(-) (limited to 'src') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 129ca2c..017eee4 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,7 +307,6 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, - None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index b3fa11a..4b35023 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,7 +44,6 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; -use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. 
/// @@ -810,7 +809,6 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories -/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -821,7 +819,6 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -851,7 +848,6 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, - repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1288,7 +1284,6 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1343,22 +1338,6 @@ async fn process_purgatory_announcements( } } - // Upgrade sync level to Full in repo_sync_index - if let Some(index) = repo_sync_index { - let mut index = index.write().await; - // Use hex pubkey format to match how repo_sync_index keys are built - // (sync/mod.rs uses event.pubkey which is hex, not bech32) - let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); - if let Some(entry) = index.get_mut(&repo_id) { - entry.sync_level = SyncLevel::Full; - debug!( - identifier = %identifier, - repo_id = %repo_id, - "Upgraded sync level to Full after announcement promotion" - ); - } - } - result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index 3ff30fb..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,24 +132,6 @@ async fn main() -> Result<()> { 
// Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Get a reference to the repo sync index for upgrading sync levels on promotion - let repo_sync_index = sync_manager.repo_sync_index(); - - // Set the repo sync index on the write policy so user-submitted purgatory - // announcements can trigger relay discovery (connect to relays in announcement tags) - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index.clone()); - - // Get the action sender BEFORE consuming sync_manager with spawn - let action_tx = sync_manager.action_tx(); - - // Set the sync action sender so the write policy can trigger relay connections - // when user-submitted purgatory announcements are registered with StateOnly level - if let Some(tx) = action_tx.clone() { - relay_with_db.write_policy.set_sync_action_tx(tx); - } - tokio::spawn(async move { sync_manager.run().await; }); @@ -202,8 +184,6 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), - Some(repo_sync_index), - action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 4c66f6d..aff12a6 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,24 +98,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - /// - /// When a user submits an announcement that goes to purgatory (no git data yet), - /// the relay needs to discover and connect to relays listed in the announcement's - /// `relays` and `clone` tags. This index is updated when the announcement is accepted - /// into purgatory, triggering the sync system to connect and sync state events. 
- pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - /// - /// This allows the write policy to notify the SyncManager when user-submitted - /// purgatory announcements need relay discovery (triggering new connections). - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - self.ctx.set_sync_action_tx(tx); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -164,106 +146,6 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); - - // Register in repo_sync_index with StateOnly level so the sync - // system discovers and connects to relays listed in this announcement. - // This is needed for user-submitted announcements (not via sync path) - // to trigger relay discovery and state event sync. 
- if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = - format!("30617:{}:{}", event.pubkey, identifier); - - // Get relay URLs stored in purgatory for this announcement - let relays = self - .ctx - .purgatory - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - if !relays.is_empty() { - use crate::sync::{ - AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, - }; - - // Update repo_sync_index with StateOnly for this repo - let new_repos = { - let mut index = repo_sync_index.write().await; - let entry = - index.entry(repo_id.clone()).or_insert_with(|| { - RepoSyncNeeds { - relays: std::collections::HashSet::new(), - root_events: std::collections::HashSet::new(), - sync_level: SyncLevel::StateOnly, - } - }); - entry.relays.extend(relays.iter().cloned()); - // Don't upgrade if already Full - tracing::info!( - repo_id = %repo_id, - relay_count = entry.relays.len(), - "Registered user-submitted purgatory announcement in \ - RepoSyncIndex with StateOnly level for relay discovery" - ); - // Return cloned relays for AddFilters - relays.clone() - }; - - // Send AddFilters to SyncManager so it connects to these relays - if let Some(tx) = self.ctx.get_sync_action_tx() { - // Build state-only filters for this repo - let state_only_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &std::collections::HashSet::new(), - &state_only_repos, - &std::collections::HashSet::new(), - None, - ); - - for relay_url in new_repos { - // Skip our own domain - if relay_url.contains(&self.ctx.domain) { - continue; - } - let action = AddFilters { - relay_url: relay_url.clone(), - items: PendingItems { - repos: 
state_only_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters: filters.clone(), - }; - if let Err(e) = tx.send(action).await { - tracing::warn!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters action for \ - user-submitted purgatory announcement" - ); - } else { - tracing::info!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager for \ - user-submitted purgatory announcement relay" - ); - } - } - } - } - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 78a09fc..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,16 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Optional repo sync index for triggering relay discovery when announcements - /// go to purgatory via user submission (not via the sync path). - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub repo_sync_index: Arc>>, - /// Optional sender for AddFilters actions to SyncManager. - /// Used to trigger relay discovery when user-submitted purgatory announcements - /// are registered with StateOnly sync level. - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). 
- pub sync_action_tx: - Arc>>>, } impl PolicyContext { @@ -62,8 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), - sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -81,28 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it's been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - let mut guard = self.sync_action_tx.write().unwrap(); - *guard = Some(tx); - } - - /// Get a clone of the sync action sender if it's been set. - pub fn get_sync_action_tx(&self) -> Option> { - let guard = self.sync_action_tx.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 4dbb402..3568e89 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,7 +193,6 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; -use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -222,13 +221,6 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, - - /// Optional repo sync index for upgrading sync level on promotion - repo_sync_index: Option, - - /// Optional sender for AddFilters actions to SyncManager. 
- /// Used after announcement promotion to trigger PR event subscription on connected relays. - sync_action_tx: Option>, } impl RealSyncContext { @@ -241,9 +233,6 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains - /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion - /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion - #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -251,8 +240,6 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, - repo_sync_index: Option, - sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -261,23 +248,9 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, - repo_sync_index, - sync_action_tx, } } - /// Set the sync action sender for triggering filter recomputation after announcement promotion. - /// - /// When an announcement is promoted from purgatory to Full sync level, the SyncManager - /// needs to subscribe to PR events for that repo on all connected relays. This sender - /// is used to trigger that subscription. - pub fn set_sync_action_tx( - &mut self, - tx: tokio::sync::mpsc::Sender, - ) { - self.sync_action_tx = Some(tx); - } - /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -509,98 +482,9 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, - self.repo_sync_index.clone(), ) .await?; - // If announcements were promoted (now Full sync level), notify SyncManager to - // recompute filters so PR event subscriptions are created on connected relays. 
- if result.announcements_released > 0 { - if let (Some(ref tx), Some(ref repo_sync_index)) = - (&self.sync_action_tx, &self.repo_sync_index) - { - let index = repo_sync_index.read().await; - for (repo_id, needs) in index.iter() { - if needs.sync_level == crate::sync::SyncLevel::Full - && !needs.root_events.is_empty() - { - // Send AddFilters for Full repos with root events - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &needs.root_events, - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: needs.root_events.clone(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters after announcement promotion" - ); - } else { - debug!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager after announcement promotion" - ); - } - } - } else if needs.sync_level == crate::sync::SyncLevel::Full { - // Even without root_events, send empty repo filter to ensure - // Layer 2 subscriptions (PR events) are set up - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &std::collections::HashSet::new(), - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: 
full_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters (no root_events) after announcement promotion" - ); - } - } - } - } - } - } - // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3 From a804164468d3beafb243ece12555b4d1692a075d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:44 +0000 Subject: Revert "fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery" This reverts commit 806936e7d1aab5dfd0c2ad6b98a115122dc1785c. --- src/sync/mod.rs | 115 ++++---------------------------------------------------- 1 file changed, 7 insertions(+), 108 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6ab8d33..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -557,13 +557,6 @@ pub struct SyncManager { /// Purgatory for read-only access to events awaiting git data purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) - // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters - // actions when user-submitted purgatory announcements need to trigger relay discovery. - /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) - #[allow(dead_code)] - action_tx: Option>, - /// Receiver for AddFilters actions (taken by run() when the event loop starts) - action_rx: Option>, local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, @@ -650,11 +643,6 @@ impl SyncManager { } } - // Create action channel upfront so callers (e.g. write policy) can send AddFilters - // actions before run() is called (e.g. when user-submitted purgatory announcements - // need to trigger relay discovery). 
- let (action_tx, action_rx) = tokio::sync::mpsc::channel::(100); - Self { bootstrap_relay_url, service_domain, @@ -675,22 +663,9 @@ impl SyncManager { connect_tx: None, shutdown_tx: None, metrics: sync_metrics, - action_tx: Some(action_tx), - action_rx: Some(action_rx), } } - /// Get a clone of the action sender for external use. - /// - /// This allows the write policy to send AddFilters actions to the SyncManager - /// when user-submitted purgatory announcements need to trigger relay discovery. - /// - /// # Returns - /// Clone of the action sender, or None if the channel was never created. - pub fn action_tx(&self) -> Option> { - self.action_tx.clone() - } - /// Generate a unique batch ID /// /// Increments the internal counter and returns the new value. @@ -711,17 +686,6 @@ impl SyncManager { self.rejected_events_index.clone() } - /// Get a clone of the repo sync index. - /// - /// This allows access to the repo sync index for upgrading sync levels - /// when announcements are promoted from purgatory. - /// - /// # Returns - /// Clone of the repo sync index (Arc>) - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Save rejected events index to disk. /// /// This is called during shutdown to persist the rejected events cache, @@ -985,31 +949,11 @@ impl SyncManager { // Drop the lock before async operations drop(pending); - // Create REQ+EOSE subscriptions using sync-level-aware filters. + // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail. - // - // CRITICAL: Use build_sync_level_aware_filters to avoid generating - // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements - // are still in purgatory. If we send Layer 2 filters too early, the - // remote relay may return PR events that our write policy rejects as - // "orphan" (no promoted repo). 
nostr-sdk deduplication then silently - // drops the event on retry, making it permanently unavailable. - let (full_repos, state_only_repos) = { - let index = self.repo_sync_index.read().await; - let mut full = HashSet::new(); - let mut state_only = HashSet::new(); - for repo_id in &batch_repos { - match index.get(repo_id).map(|n| n.sync_level) { - Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } - _ => { full.insert(repo_id.clone()); } - } - } - (full, state_only) - }; - let fallback_filters = filters::build_sync_level_aware_filters( - &full_repos, - &state_only_repos, + // succeed even when ID-based queries fail + let fallback_filters = filters::build_layer2_and_layer3_filters( + &batch_repos, &batch_root_events, None, ); @@ -1093,20 +1037,8 @@ impl SyncManager { pending.remove(&relay_url_for_fallback); } drop(pending); - let is_generic_filter = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - - // Trigger filter recomputation for generic filter batches - if is_generic_filter { - tracing::info!( - relay = %relay_url_for_fallback, - "Announcement batch complete (fallback path) - triggering filter recomputation" - ); - self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) - .await; - } } } return; @@ -1204,20 +1136,8 @@ impl SyncManager { pending.remove(&relay_url_for_retry); } drop(pending); - let is_generic_filter = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - - // Trigger filter recomputation for generic filter batches - if is_generic_filter { - tracing::info!( - relay = %relay_url_for_retry, - "Announcement batch complete (retry path) - triggering filter recomputation" - ); - self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) - .await; - } } } return; @@ -1238,20 +1158,7 @@ impl SyncManager { 
drop(pending); // 4. Confirm the batch (moves items to RelayState) - let is_generic_filter = - completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter batches (announcements), trigger filter recomputation - // to subscribe to state events for purgatory announcements that were registered - // during event processing. - if is_generic_filter { - tracing::info!( - relay = %relay_url, - "Announcement batch complete - triggering filter recomputation for purgatory repos" - ); - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1530,16 +1437,8 @@ impl SyncManager { "SyncManager starting" ); - // 1. Take action channel receiver (created in new()) - sender is shared with write policy - let mut action_rx = self - .action_rx - .take() - .expect("action_rx should be set in new()"); - // Get a clone of action_tx for the self-subscriber - let action_tx_for_subscriber = self - .action_tx - .clone() - .expect("action_tx should be set in new()"); + // 1. Create action channel for self-subscriber -> manager communication + let (action_tx, mut action_rx) = mpsc::channel::(100); // 2. 
Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -1558,7 +1457,7 @@ impl SyncManager { format!("ws://{}", self.config.bind_address), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), - action_tx_for_subscriber, + action_tx, ); let subscriber_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); -- cgit v1.2.3 From e22021f0b248ebcf3bd09210d59b2cdb4701032f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:41:29 +0000 Subject: fix: simplify purgatory sync - fix SelfSubscriber sync_level upgrade and negentropy fallback Five targeted changes for purgatory announcement sync: 1. SelfSubscriber sync_level upgrade: After or_insert_with in process_batch, always set entry.sync_level = SyncLevel::Full so that when a promoted announcement is broadcast via notify_event and SelfSubscriber receives it, an existing StateOnly entry gets upgraded to Full and PR event subscriptions are triggered immediately (not delayed up to 24h). 2. Negentropy fallback filter split: In handle_eose, when falling back from negentropy to REQ+EOSE, split batch_repos by SyncLevel and call build_sync_level_aware_filters instead of build_layer2_and_layer3_filters. Prevents StateOnly (purgatory) repos from getting Layer 2 #a/#A/#q filters prematurely, which caused nostr-sdk client deduplication to permanently drop PR events after orphan rejection. 3. Recompute sync filters after announcement batch EOSE: Add recompute_new_sync_filters_for_relay calls at all three batch-completion paths in handle_eose for generic filter (announcement) batches. This triggers state-only subscriptions for any purgatory repos registered during that batch, fixing the 24h delay before state event sync starts. 4. 
User-submitted purgatory announcements: Add repo_sync_index field to PolicyContext with setter/getter, wire in main.rs after SyncManager creation, and register in AcceptPurgatory handler so user-submitted announcements get StateOnly sync started immediately. 5. Update archive tests: test_archive_without_state_events_does_not_sync_git updated to reflect that StateOnly subscription now proactively fetches state events from source relays. test_archive_read_only_creates_bare_repo un-ignored as it now works end-to-end. --- src/main.rs | 7 +++++ src/nostr/builder.rs | 54 +++++++++++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 19 +++++++++++++ src/sync/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++++++--- src/sync/self_subscriber.rs | 4 +++ tests/archive_read_only.rs | 63 +++++++++++++++++++++++++++---------------- 6 files changed, 187 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index ab6ede7..ebe05a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,13 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); + // Wire repo_sync_index into write policy so user-submitted purgatory announcements + // get registered for state event sync immediately (Fix 3). 
+ let repo_sync_index = sync_manager.repo_sync_index(); + relay_with_db + .write_policy + .set_repo_sync_index(repo_sync_index); + tokio::spawn(async move { sync_manager.run().await; }); diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..8d1e461 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -17,6 +17,7 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; +use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -98,6 +99,14 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index so that user-submitted purgatory announcements can + /// be registered for state event sync immediately. + /// + /// This must be called after SyncManager is created. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +155,51 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register repo in repo_sync_index with StateOnly level so that + // state event sync starts promptly via the next batch EOSE recompute. + // This handles user-submitted purgatory announcements - the SelfSubscriber + // only sees DB events, so it won't pick these up automatically. 
+ if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Ok(announcement) = + RepositoryAnnouncement::from_event(event.clone()) + { + use std::collections::HashSet; + let repo_id = format!( + "30617:{}:{}", + event.pubkey, + announcement.identifier + ); + + // Extract relay URLs from the announcement event tags + let relays: HashSet = event + .tags + .iter() + .flat_map(|tag| { + let tag_vec = tag.as_slice(); + if !tag_vec.is_empty() && tag_vec[0] == "relays" { + tag_vec[1..].iter().map(|s| s.to_string()).collect::>() + } else { + vec![] + } + }) + .collect(); + + let mut index = repo_sync_index.write().await; + index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { + relays, + root_events: HashSet::new(), + sync_level: SyncLevel::StateOnly, + }); + drop(index); + + tracing::debug!( + repo_id = %repo_id, + "Registered purgatory announcement in repo_sync_index as StateOnly" + ); + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..c958586 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; +use crate::sync::RepoSyncIndex; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -34,6 +35,8 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, + /// Repo sync index for registering purgatory announcements (set after SyncManager creation) + pub repo_sync_index: Arc>>, } impl PolicyContext { @@ -51,6 +54,7 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, + repo_sync_index: Arc::new(std::sync::RwLock::new(None)), } } @@ -68,4 +72,19 @@ impl PolicyContext { let guard = 
self.local_relay.read().unwrap(); guard.clone() } + + /// Set the repo sync index after SyncManager has been created. + /// + /// This allows purgatory announcements submitted by users to be registered + /// in the sync index so state event sync starts promptly. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + let mut guard = self.repo_sync_index.write().unwrap(); + *guard = Some(index); + } + + /// Get a clone of the repo sync index if it has been set. + pub fn get_repo_sync_index(&self) -> Option { + let guard = self.repo_sync_index.read().unwrap(); + guard.clone() + } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..916e2b0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -700,6 +700,14 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } + /// Get a clone of the repo sync index Arc. + /// + /// This allows the write policy to register user-submitted purgatory announcements + /// in the sync index so that state event sync starts promptly. + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -951,9 +959,29 @@ impl SyncManager { // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. + // Split batch_repos by SyncLevel to avoid sending Layer 2 filters + // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be + // rejected as orphan and then silently dropped by nostr-sdk deduplication. 
+ let (full_repos, state_only_repos) = { + let repo_index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_ref in &batch_repos { + match repo_index.get(repo_ref).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { + state_only.insert(repo_ref.clone()); + } + _ => { + full.insert(repo_ref.clone()); + } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1033,12 +1061,24 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_fallback, + ) + .await; + } } } return; @@ -1132,12 +1172,24 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. 
+ if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_retry, + ) + .await; + } } } return; @@ -1148,6 +1200,8 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); + let is_generic = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1159,6 +1213,12 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter (announcement) batches, recompute sync filters so any + // purgatory repos registered during this batch get state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index db16c62..70c3dbf 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -478,6 +478,10 @@ impl SelfSubscriber { root_events: HashSet::new(), sync_level: SyncLevel::Full, }); + // Upgrade sync_level to Full - this handles the case where the entry + // already exists as StateOnly (purgatory announcement) and is now being + // promoted (git data arrived and the event was broadcast via notify_event). + entry.sync_level = SyncLevel::Full; entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index e388ae5..069b3b7 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -55,7 +55,6 @@ use std::time::Duration; /// 5. Verify bare repository is created and git data is synced /// 6. 
Verify git pushes are rejected (read-only mode) #[tokio::test] -#[ignore] // Requires SyncLevel implementation (Phase 3) - purgatory announcements don't trigger per-repo sync yet async fn test_archive_read_only_creates_bare_repo() { // 1. Start source relay let source_relay = TestRelay::start().await; @@ -264,24 +263,24 @@ async fn test_archive_read_only_creates_bare_repo() { source_relay.stop().await; } -/// Test that archive mode without state events does NOT sync git data. +/// Test that archive mode proactively syncs state events and git data +/// when the source relay has state events available. /// -/// This verifies the security model: archive mode only syncs git data -/// when there are state events to validate against. +/// With StateOnly sync now implemented, purgatory announcements subscribe +/// to state events from the relays listed in the announcement. This means +/// the archive relay will: +/// 1. Sync the announcement → purgatory → register as StateOnly in repo_sync_index +/// 2. Subscribe to state events (kind 30618) on source relay +/// 3. Receive the state event → purgatory sync triggered +/// 4. Fetch git data from source relay's clone URL /// -/// With announcement purgatory, the flow is: -/// 1. Send announcement to source relay (goes to purgatory) -/// 2. Send state event to source relay (goes to purgatory) -/// 3. Push git data to source relay (promotes announcement and state event) -/// 4. Start archive relay with sync from source -/// 5. Archive relay syncs the promoted announcement -/// 6. Verify git data is NOT synced (archive has no state event to authorize git fetch) +/// This test verifies the full sync chain works end-to-end for archive mode. #[tokio::test] -async fn test_archive_without_state_events_does_not_sync_git() { +async fn test_archive_syncs_state_events_and_git_data_via_state_only_subscription() { // 1. 
Start source relay let source_relay = TestRelay::start().await; let keys = Keys::generate(); - let identifier = "archive-no-state-repo"; + let identifier = "archive-state-only-sync-repo"; // Pre-allocate archive relay port let archive_port = TestRelay::find_free_port(); @@ -295,6 +294,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { let npub = keys.public_key().to_bech32().expect("Failed to get npub"); // 3. Create and send announcement listing BOTH relays + // The archive relay will subscribe to state events on BOTH listed relays let announcement = create_repo_announcement( &keys, &[&source_relay.domain(), &archive_domain], @@ -337,6 +337,8 @@ async fn test_archive_without_state_events_does_not_sync_git() { ) .expect("Failed to create state event"); + let state_event_id = state_event.id; + source_client .send_event(&state_event) .await @@ -348,9 +350,12 @@ async fn test_archive_without_state_events_does_not_sync_git() { push_to_relay(temp_dir.path(), &source_relay.domain(), &npub, identifier) .expect("Push to source should succeed"); - tokio::time::sleep(Duration::from_millis(500)).await; + // Wait for state event to be promoted on source relay + wait_for_event_served(source_relay.url(), &state_event_id, Duration::from_secs(5)) + .await + .expect("State event should be served on source relay after push"); - // 6. Start archive relay (without state event - we don't send state event to archive) + // 6. 
Start archive relay - StateOnly subscription will proactively fetch state events let archive_relay = TestRelay::start_with_archive_and_sync( archive_port, Some(source_relay.url().to_string()), @@ -360,15 +365,28 @@ async fn test_archive_without_state_events_does_not_sync_git() { ) .await; - // Wait for sync + // Wait for sync connection wait_for_sync_connection(archive_relay.url(), 1, Duration::from_secs(5)) .await .expect("Sync connection should establish"); - // Give time for sync to fetch announcement - tokio::time::sleep(Duration::from_secs(3)).await; + // 7. Wait for state event to be served on archive relay + // The StateOnly subscription fetches the state event from source relay, + // which then triggers purgatory sync and git data fetch. + let found = wait_for_event_served( + archive_relay.url(), + &state_event_id, + Duration::from_secs(30), // Allow time for sync + git fetch + ) + .await; + + assert!( + found.is_ok(), + "State event should be served on archive after StateOnly subscription fetches it: {:?}", + found.err() + ); - // 7. Verify bare repository was created (announcement was synced and accepted to purgatory) + // 8. Verify bare repository was created let repo_path = archive_relay .git_data_path() .join(format!("{}/{}.git", npub, identifier)); @@ -378,8 +396,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { "Bare repository should be created for archive announcement" ); - // 8. Verify git data was NOT synced (no state events on archive to trigger git fetch) - // Check that the commit does NOT exist in the archive relay's repo + // 9. 
Verify git data was synced via the state event chain let output = tokio::process::Command::new("git") .args(["cat-file", "-t", &commit_hash]) .current_dir(&repo_path) @@ -389,8 +406,8 @@ async fn test_archive_without_state_events_does_not_sync_git() { let commit_exists = output.map(|o| o.status.success()).unwrap_or(false); assert!( - !commit_exists, - "Git data should NOT be synced without state events (security: validates against Nostr state)" + commit_exists, + "Git data should be synced via StateOnly subscription → state event → git fetch chain" ); // Cleanup -- cgit v1.2.3 From ee113a654e2971a6ebdb07398cc5638dbe59b48c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 20:32:13 +0000 Subject: fix: replace repo_sync_index wiring with purgatory announcement sync timer Instead of threading repo_sync_index through PolicyContext/builder.rs/main.rs to handle user-submitted purgatory announcements, add a simple background timer (run_purgatory_announcement_sync, every 5s) that scans the purgatory for announcement entries and registers them in repo_sync_index as StateOnly. This is simpler and covers both flows: - Sync-path announcements: inline registration still happens during event processing (sync/mod.rs:1839+), timer provides a safety net - User-submitted announcements: SelfSubscriber never sees them (rejected from DB), timer is the primary registration path The timer calls sync_purgatory_announcements_to_index() which: 1. Snapshots purgatory via new announcements_for_sync() public method 2. Or_inserts StateOnly entries (never downgrades Full entries) 3. Detects newly added relay URLs and calls handle_new_sync_filters to connect and subscribe - fixing the failing test that expected relay discovery from a user-submitted purgatory announcement Removes: repo_sync_index field from PolicyContext, set/get_repo_sync_index methods, set_repo_sync_index on Nip34WritePolicy, wiring in main.rs, and the inline AcceptPurgatory registration block in builder.rs. 
--- src/main.rs | 7 --- src/nostr/builder.rs | 54 +-------------------- src/nostr/policy/mod.rs | 19 -------- src/purgatory/mod.rs | 17 +++++++ src/sync/mod.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++---- 5 files changed, 134 insertions(+), 88 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index ebe05a3..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,13 +132,6 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Wire repo_sync_index into write policy so user-submitted purgatory announcements - // get registered for state event sync immediately (Fix 3). - let repo_sync_index = sync_manager.repo_sync_index(); - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index); - tokio::spawn(async move { sync_manager.run().await; }); diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 8d1e461..c2d4939 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -17,7 +17,7 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; -use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; + /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -99,14 +99,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index so that user-submitted purgatory announcements can - /// be registered for state event sync immediately. - /// - /// This must be called after SyncManager is created. 
- pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -156,50 +148,6 @@ impl Nip34WritePolicy { event_id_str ); - // Register repo in repo_sync_index with StateOnly level so that - // state event sync starts promptly via the next batch EOSE recompute. - // This handles user-submitted purgatory announcements - the SelfSubscriber - // only sees DB events, so it won't pick these up automatically. - if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Ok(announcement) = - RepositoryAnnouncement::from_event(event.clone()) - { - use std::collections::HashSet; - let repo_id = format!( - "30617:{}:{}", - event.pubkey, - announcement.identifier - ); - - // Extract relay URLs from the announcement event tags - let relays: HashSet = event - .tags - .iter() - .flat_map(|tag| { - let tag_vec = tag.as_slice(); - if !tag_vec.is_empty() && tag_vec[0] == "relays" { - tag_vec[1..].iter().map(|s| s.to_string()).collect::>() - } else { - vec![] - } - }) - .collect(); - - let mut index = repo_sync_index.write().await; - index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { - relays, - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - drop(index); - - tracing::debug!( - repo_id = %repo_id, - "Registered purgatory announcement in repo_sync_index as StateOnly" - ); - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index c958586..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use 
crate::sync::RepoSyncIndex; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,8 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Repo sync index for registering purgatory announcements (set after SyncManager creation) - pub repo_sync_index: Arc>>, } impl PolicyContext { @@ -54,7 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), } } @@ -72,19 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index after SyncManager has been created. - /// - /// This allows purgatory announcements submitted by users to be registered - /// in the sync index so state event sync starts promptly. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it has been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 3b5514b..1894738 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -680,6 +680,23 @@ impl Purgatory { self.announcement_purgatory.len() } + /// Collect (repo_id, relay_urls) for all announcements currently in purgatory. + /// + /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable + /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory + /// announcement sync timer to register StateOnly entries in `repo_sync_index`. 
+ pub fn announcements_for_sync(&self) -> Vec<(String, HashSet)> { + self.announcement_purgatory + .iter() + .map(|entry| { + let (owner, identifier) = entry.key(); + let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); + let relays = entry.value().relays.clone(); + (repo_id, relays) + }) + .collect() + } + /// Get all event IDs currently stored in purgatory AND previously expired events. /// /// Returns a HashSet of all event IDs for: diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 916e2b0..ed5b6e7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -397,6 +397,37 @@ async fn run_daily_timer( } } +/// Background task that periodically syncs purgatory announcements into repo_sync_index. +/// +/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there +/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` +/// which connects to the relay URLs listed in the announcement and subscribes to state +/// events (kind 30618). +/// +/// This covers two cases: +/// - Sync-path announcements: registered inline during event processing, but this +/// provides a safety net in case the inline registration was missed. +/// - User-submitted purgatory announcements: the SelfSubscriber never sees them +/// (they're rejected from DB), so this timer is the primary registration path. +async fn run_purgatory_announcement_sync( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { + let interval = Duration::from_secs(5); + loop { + tokio::select! 
{ + _ = tokio::time::sleep(interval) => { + let mut manager = sync_manager.lock().await; + manager.sync_purgatory_announcements_to_index().await; + } + _ = shutdown_rx.recv() => { + tracing::debug!("Purgatory announcement sync timer received shutdown signal"); + break; + } + } + } +} + // Combined Health and Metrics Checker /// Background task for cleaning up expired entries from the rejected events index @@ -700,14 +731,6 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } - /// Get a clone of the repo sync index Arc. - /// - /// This allows the write policy to register user-submitted purgatory announcements - /// in the sync index so that state event sync starts promptly. - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -1560,7 +1583,17 @@ impl SyncManager { run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; }); - // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 11. Spawn purgatory announcement sync timer (every 5s) + // Ensures purgatory announcements (including user-submitted ones that never + // touch the DB) are registered in repo_sync_index as StateOnly so that + // state event subscriptions are established on their listed relay URLs. + let purgatory_sync_manager = Arc::clone(&sync_manager); + let purgatory_sync_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { + run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; + }); + + // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { // Wait for an event without holding the lock tokio::select! { @@ -2419,6 +2452,80 @@ impl SyncManager { } } + /// Sync purgatory announcements into repo_sync_index as StateOnly entries. + /// + /// Called periodically by the purgatory announcement sync timer (every 5s). 
+ /// For each announcement currently in purgatory, ensures a `StateOnly` entry + /// exists in `repo_sync_index`. New entries are then picked up by + /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes + /// to state events for that repo. + /// + /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). + async fn sync_purgatory_announcements_to_index(&mut self) { + use crate::sync::algorithms::{compute_actions, derive_relay_targets}; + + // Collect all purgatory announcements (snapshot - no async holds) + let announcements = self.purgatory.announcements_for_sync(); + + if announcements.is_empty() { + return; + } + + // Register any new entries in repo_sync_index as StateOnly + let mut new_relay_urls: std::collections::HashSet = std::collections::HashSet::new(); + { + let mut index = self.repo_sync_index.write().await; + for (repo_id, relays) in &announcements { + let entry = index.entry(repo_id.clone()).or_insert_with(|| { + tracing::debug!( + repo_id = %repo_id, + "Registering purgatory announcement in repo_sync_index as StateOnly" + ); + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + // Don't downgrade an already-Full entry + // Add any new relay URLs + for relay in relays { + if entry.relays.insert(relay.clone()) { + new_relay_urls.insert(relay.clone()); + } + } + } + } + + if new_relay_urls.is_empty() { + return; + } + + // For any relay URLs that are new, compute and send AddFilters actions + let all_targets = { + let repo_index = self.repo_sync_index.read().await; + derive_relay_targets(&repo_index) + }; + + let actions = { + let pending_index = self.pending_sync_index.read().await; + let relay_index = self.relay_sync_index.read().await; + compute_actions(&all_targets, &pending_index, &relay_index) + }; + + for action in actions { + // Only act on relays that have new URLs (avoids redundant 
work) + if new_relay_urls.contains(&action.relay_url) { + tracing::info!( + relay = %action.relay_url, + repos = action.items.repos.len(), + "Purgatory sync timer: connecting to new relay from purgatory announcement" + ); + self.handle_new_sync_filters(action).await; + } + } + } + /// Handle a relay disconnection /// /// This method is called when the event loop terminates and sends a disconnect notification. -- cgit v1.2.3 From 70749ea9df1f6061c332112c617b615f91d79d48 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 11:17:10 +0000 Subject: fix: re-process hot-cache maintainer announcements after git push promotion When an owner announcement is promoted from purgatory via a git push, any maintainer announcements sitting in the rejected_events_index hot cache were never re-processed. The invalidate_and_get call only existed in SyncManager::process_event_static (the nostr sync path); the git push promotion path (http -> handlers -> git::sync) had no access to the rejected_events_index at all. 
Thread rejected_events_index and write_policy through the git push path: - process_purgatory_announcements: after saving the promoted announcement, parse its maintainers tag and call invalidate_and_get() for each, then re-process any returned hot-cache events via admit_event + save - process_newly_available_git_data: accept optional write_policy and rejected_events_index, pass them through to process_purgatory_announcements - handle_receive_pack: accept Arc<Nip34WritePolicy> and Arc<RejectedEventsIndex>, pass them to process_newly_available_git_data - HttpService / run_server: carry the two new fields, clone into each handle_receive_pack call - main.rs: obtain rejected_events_index from sync_manager before moving it into its task; wrap write_policy in Arc for the HTTP server - RealSyncContext::process_newly_available_git_data: pass None for both new params (purgatory sync path already handles this via SyncManager::process_event_static) Also rewrite the maintainer_reprocessing integration tests to correctly exercise the hot-cache path now that announcements require git data before being released from purgatory: - Start relay_b with relay_a as bootstrap so its SyncManager syncs maintainer announcements via negentropy before the owner git push - Use push_unique_git_data_to_relay (new helper) to give each maintainer a distinct commit hash, preventing git from skipping pack transfer - Make wait_for_event_on_relay poll in a retry loop so transient timing gaps between DB write and query do not cause false negatives --- src/git/handlers.rs | 7 +- src/git/sync.rs | 113 +++++++++++++++- src/http/mod.rs | 22 +++- src/main.rs | 7 + src/purgatory/sync/context.rs | 6 +- tests/sync/maintainer_reprocessing.rs | 235 +++++++++++++++++++++------------- 6 files changed, 298 insertions(+), 92 deletions(-) (limited to 'src') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..13d6ba0 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess; use 
crate::git::authorization::{authorize_push, parse_pushed_refs}; use crate::git::sync::process_newly_available_git_data; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// Handle GET /info/refs?service=git-{upload,receive}-pack /// @@ -195,6 +196,8 @@ pub async fn handle_receive_pack( purgatory: Arc, git_data_path: &str, git_protocol: Option<&str>, + write_policy: Arc, + rejected_events_index: Arc, ) -> Result>, GitError> { debug!("Handling receive-pack for {:?}", repo_path); @@ -307,6 +310,8 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + Some(&write_policy), + Some(&rejected_events_index), ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..8401736 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -32,6 +32,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::process::Command; +use std::sync::Arc; use tracing::{debug, info, warn}; use nostr_sdk::Event; @@ -41,9 +42,10 @@ use crate::git::authorization::{ RepositoryData, }; use crate::git::{self, oid_exists}; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::rejected_index::RejectedEventsIndex; /// Result of processing newly available git data. 
/// @@ -819,6 +821,8 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +852,8 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + write_policy, + rejected_events_index, ) .await; result.merge(announcement_result); @@ -1277,6 +1283,10 @@ async fn process_purgatory_pr_events( /// /// When git data arrives for a repository, any announcements in purgatory /// for that repository should be promoted to the database and served to clients. +/// +/// When `write_policy` and `rejected_events_index` are provided (git push path), +/// any maintainer announcements sitting in the hot cache are re-processed immediately +/// after the owner announcement is promoted, so they don't wait for the next sync cycle. async fn process_purgatory_announcements( identifier: &str, source_repo_path: &Path, @@ -1284,6 +1294,8 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1339,6 +1351,105 @@ async fn process_purgatory_announcements( } result.announcements_released += 1; + + // Re-process any maintainer announcements sitting in the hot cache. + // + // When an owner announcement is promoted from purgatory via a git push, + // maintainer announcements that arrived earlier (via relay sync) may have + // been rejected and stored in the hot cache because the owner announcement + // didn't exist in the DB yet. Now that the owner announcement is saved, + // we must invalidate and re-process those cached events immediately. 
+ // + // This only applies on the git push path (write_policy + rejected_events_index + // are Some). The purgatory sync path already handles this via + // SyncManager::process_event_static. + if let (Some(wp), Some(rei), Some(relay)) = + (write_policy, rejected_events_index, local_relay) + { + use crate::nostr::events::RepositoryAnnouncement; + use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { + if !announcement.maintainers.is_empty() { + debug!( + identifier = %identifier, + event_id = %event.id, + maintainer_count = announcement.maintainers.len(), + "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements" + ); + + for maintainer_hex in &announcement.maintainers { + match nostr_sdk::PublicKey::from_hex(maintainer_hex) { + Ok(maintainer_pubkey) => { + let (removed, hot_events) = rei.invalidate_and_get( + &maintainer_pubkey, + &announcement.identifier, + Some(crate::sync::rejected_index::EventType::Announcement), + ); + + if removed > 0 { + info!( + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + removed_from_cold_index = removed, + hot_cache_events = hot_events.len(), + "Invalidated rejected maintainer announcements after git push promotion" + ); + } + + // Re-process events from hot cache + let dummy_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 0, + ); + for hot_event in hot_events { + info!( + event_id = %hot_event.id, + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + "Re-processing maintainer announcement from hot cache after git push promotion" + ); + match wp.admit_event(&hot_event, &dummy_addr).await { + WritePolicyResult::Accept => { + match database.save_event(&hot_event).await { + Ok(_) => { + relay.notify_event(hot_event.clone()); + info!( + event_id = %hot_event.id, + "Maintainer announcement 
accepted and saved on re-processing" + ); + } + Err(e) => { + warn!( + event_id = %hot_event.id, + error = %e, + "Failed to save re-processed maintainer announcement" + ); + } + } + } + _ => { + warn!( + event_id = %hot_event.id, + "Maintainer announcement still rejected on re-processing" + ); + } + } + } + } + Err(e) => { + warn!( + maintainer_hex = %maintainer_hex, + error = %e, + "Invalid maintainer public key in promoted announcement" + ); + } + } + } + } + } + } } Err(e) => { warn!( diff --git a/src/http/mod.rs b/src/http/mod.rs index ffb1562..cfd7c52 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -26,8 +26,9 @@ use tokio::net::TcpListener; use crate::config::Config; use crate::git; use crate::metrics::Metrics; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// CORS headers required by GRASP-01 specification (lines 40-47) const CORS_ALLOW_ORIGIN: &str = "*"; @@ -97,6 +98,10 @@ struct HttpService { metrics: Option>, /// Purgatory for event/git coordination purgatory: Arc, + /// Write policy for re-processing hot-cache events after git push promotion + write_policy: Arc, + /// Rejected events index for hot-cache re-processing after git push promotion + rejected_events_index: Arc, } impl HttpService { @@ -107,6 +112,8 @@ impl HttpService { database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> Self { Self { relay, @@ -115,6 +122,8 @@ impl HttpService { database, metrics, purgatory, + write_policy, + rejected_events_index, } } } @@ -132,6 +141,8 @@ impl Service> for HttpService { let git_data_path = self.config.effective_git_data_path(); let database = self.database.clone(); let purgatory = self.purgatory.clone(); + let write_policy = self.write_policy.clone(); + let rejected_events_index = self.rejected_events_index.clone(); // Handle OPTIONS 
preflight requests (CORS) // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content @@ -293,6 +304,8 @@ impl Service> for HttpService { purgatory.clone(), &git_data_path, git_protocol.as_deref(), + write_policy.clone(), + rejected_events_index.clone(), ) .await; @@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String { /// * `relay` - The LocalRelay for WebSocket connections /// * `database` - The database for direct queries (e.g., push authorization) /// * `metrics` - Optional metrics for Prometheus endpoint +/// * `purgatory` - Purgatory for event/git coordination +/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion +/// * `rejected_events_index` - Rejected events index for hot-cache re-processing pub async fn run_server( config: Config, relay: LocalRelay, database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> anyhow::Result<()> { let bind_addr: SocketAddr = config.bind_address.parse()?; @@ -582,6 +600,8 @@ pub async fn run_server( database.clone(), metrics.clone(), purgatory.clone(), + write_policy.clone(), + rejected_events_index.clone(), ); tokio::spawn(async move { diff --git a/src/main.rs b/src/main.rs index ab6ede7..6769cf3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,7 +130,9 @@ async fn main() -> Result<()> { } // Get a reference to the rejected events index for shutdown persistence + // and for the HTTP server's git push path (hot-cache re-processing) let shutdown_rejected_index = sync_manager.rejected_events_index(); + let http_rejected_index = shutdown_rejected_index.clone(); tokio::spawn(async move { sync_manager.run().await; @@ -206,6 +208,9 @@ async fn main() -> Result<()> { // Start HTTP server with integrated relay and database info!("Starting HTTP server on {}", config.bind_address); + // Wrap write_policy in Arc for sharing between HTTP server connections + let http_write_policy = 
Arc::new(relay_with_db.write_policy.clone()); + // Run server until shutdown signal, then cleanup tokio::select! { result = http::run_server( @@ -214,6 +219,8 @@ async fn main() -> Result<()> { relay_with_db.database, metrics, purgatory, + http_write_policy, + http_rejected_index, ) => { result? } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..ece8cd6 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -474,7 +474,9 @@ impl SyncContext for RealSyncContext { source_repo_path: &Path, new_oids: &HashSet, ) -> Result { - // Delegate to the unified function from git::sync + // Delegate to the unified function from git::sync. + // Pass None for write_policy and rejected_events_index: the purgatory sync path + // already handles hot-cache re-processing via SyncManager::process_event_static. let result = crate::git::sync::process_newly_available_git_data( source_repo_path, new_oids, @@ -482,6 +484,8 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + None, + None, ) .await?; diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 266a437..61d8e14 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -2,51 +2,61 @@ //! //! Tests the two-tier rejected events index and immediate re-processing of //! maintainer announcements when owner announcements are accepted. +//! +//! ## Test design +//! +//! Announcements now require git data before they are released from purgatory and +//! served to other relays. The hot-cache re-processing path we want to exercise is: +//! +//! relay_b syncs maintainer announcement from relay_a +//! → write policy rejects it (no owner announcement in DB yet) +//! → event stored in hot cache +//! owner git push to relay_b promotes owner announcement from purgatory +//! → our new code calls rejected_events_index.invalidate_and_get() +//! 
→ maintainer announcement re-processed and accepted +//! +//! To guarantee the maintainer announcements arrive at relay_b *before* the owner +//! git push, relay_b is started with relay_a as its bootstrap relay. That way +//! relay_b's SyncManager connects to relay_a immediately and syncs whatever is +//! already in relay_a's DB. We push the maintainer git data first (so the +//! announcements are in relay_a's DB), wait briefly for the sync round-trip, then +//! send the owner announcement + git push. use std::time::Duration; use nostr_sdk::prelude::*; -use crate::common::{ - sync_helpers::*, - TestRelay, -}; +use crate::common::{sync_helpers::*, TestRelay}; -/// Test that maintainer announcements are re-processed immediately when owner announcement accepted +/// Test that a maintainer announcement is re-processed immediately when the owner +/// announcement is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + maintainer) -/// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement +/// 1. relay_a: Maintainer sends announcement + git data → accepted into relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs maintainer announcement +/// → rejected by write policy (no owner in DB) → stored in hot cache +/// 3. relay_b: Owner sends announcement → purgatory (no git data yet) +/// 4. relay_b: Owner git push → owner announcement promoted from purgatory +/// → hot-cache re-processing fires → maintainer announcement accepted /// 5. 
Both announcements should be in relay_b's database -/// -/// Expected time: <5 seconds (vs 24 hours without hot cache) #[tokio::test] async fn test_maintainer_announcement_reprocessed_immediately() { // Start relay_a (where maintainer announcement will be sent) let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer_keys = Keys::generate(); - let identifier = "test-repo"; - let start = std::time::Instant::now(); - - // Step 1: Send maintainer announcement to relay_a (will be rejected by relay_b - doesn't list relay_b) - // Use HTTP clone URL pointing to relay_a's git endpoint so it can be released from purgatory + // Step 1: Send maintainer announcement to relay_a then push git data so it lands in + // relay_a's DB. The announcement lists relay_a only (not relay_b), so relay_b's write + // policy will reject it when it arrives via sync. 
let maintainer_npub = maintainer_keys .public_key() .to_bech32() .expect("Failed to get npub"); - let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -60,27 +70,33 @@ async fn test_maintainer_announcement_reprocessed_immediately() { identifier )], ), - Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), + Tag::custom( + TagKind::custom("relays"), + vec![relay_a.url().to_string()], + ), ]) .sign_with_keys(&maintainer_keys) .unwrap(); + send_to_relay(&relay_a, &maintainer_announcement).await.unwrap(); + let _git_dir_maintainer = + push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]) + .await; + println!("✓ Maintainer announcement + git data pushed to relay_a"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // relay_b's initial negentropy sync will pick up the maintainer announcement and reject it + // (no owner announcement in relay_b's DB yet), storing it in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); - send_to_relay(&relay_a, &maintainer_announcement) - .await - .unwrap(); - println!("✓ Maintainer announcement sent to relay_a"); - - // Push git data for maintainer's repo to relay_a → releases maintainer announcement from purgatory - let _git_dir_maintainer = push_git_data_to_relay( - &relay_a, - &maintainer_keys, - identifier, - &[&relay_a.domain()], - ) - .await; - println!("✓ Maintainer git data pushed to relay_a (announcement released from purgatory)"); - - // Step 2: Set up owner announcement on relay_b (lists relay_a + maintainer) with git data + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. 
+ tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcement should be in hot cache)"); + + let start = std::time::Instant::now(); + + // Step 3: Send owner announcement to relay_b → goes to purgatory (no git data yet). + // The announcement lists relay_a + relay_b and names the maintainer. let owner_npub = owner_keys .public_key() .to_bech32() @@ -111,19 +127,21 @@ async fn test_maintainer_announcement_reprocessed_immediately() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner's repo to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory, which triggers hot-cache + // re-processing of the maintainer announcement via our new code path. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)"); - // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; let elapsed = start.elapsed(); - // Step 4: Verify both announcements are in relay_b's database + // Step 6: Verify both announcements are in relay_b's database. 
let owner_filter = Filter::new() .kind(Kind::GitRepoAnnouncement) .author(owner_keys.public_key()) @@ -145,7 +163,6 @@ async fn test_maintainer_announcement_reprocessed_immediately() { "Maintainer announcement should be re-processed and accepted in relay_b" ); - // Step 5: Verify it happened quickly (not 24 hours!) assert!( elapsed.as_secs() < 15, "Re-processing should happen in <15 seconds, took {:?}", @@ -258,13 +275,16 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { relay.stop().await; } -/// Test multiple maintainers are all re-processed when owner announcement accepted +/// Test that all maintainer announcements are re-processed when the owner announcement +/// is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers) -/// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements +/// 1. relay_a: Three maintainers send announcements + git data → in relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs all three maintainer +/// announcements → all rejected (no owner in DB) → all in hot cache +/// 3. relay_b: Owner sends announcement → purgatory +/// 4. relay_b: Owner git push → owner promoted → hot-cache re-processing fires for +/// all three maintainers /// 5. 
All four announcements should be in relay_b's database #[tokio::test] async fn test_multiple_maintainers_all_reprocessed() { @@ -272,21 +292,23 @@ async fn test_multiple_maintainers_all_reprocessed() { let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer1_keys = Keys::generate(); let maintainer2_keys = Keys::generate(); let maintainer3_keys = Keys::generate(); - let identifier = "multi-maintainer-repo"; + // Use a unique identifier per test run to avoid cross-test interference when + // tests run in parallel (each test gets its own namespace on relay_a). + let identifier = &format!( + "multi-maintainer-repo-{}", + owner_keys.public_key().to_hex()[..8].to_string() + ); - // Step 1: Send three maintainer announcements to relay_a with git data - // (purgatory requires git data before announcements are accepted) - let mut git_dirs_maintainers = Vec::new(); + // Step 1: Send each maintainer announcement to relay_a then push git data so all three + // land in relay_a's DB. Each announcement lists relay_a only, so relay_b will reject + // them when syncing (no owner announcement in relay_b's DB yet). 
+ let mut git_dirs = Vec::new(); for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] .iter() .enumerate() @@ -295,13 +317,12 @@ async fn test_multiple_maintainers_all_reprocessed() { .public_key() .to_bech32() .expect("Failed to get npub"); - let announcement = EventBuilder::new( Kind::GitRepoAnnouncement, format!("Maintainer {} repository", idx + 1), ) .tags(vec![ - Tag::identifier(identifier), + Tag::identifier(identifier.as_str()), Tag::custom( TagKind::custom("clone"), vec![format!( @@ -315,18 +336,53 @@ async fn test_multiple_maintainers_all_reprocessed() { ]) .sign_with_keys(maintainer_keys) .unwrap(); - send_to_relay(&relay_a, &announcement).await.unwrap(); + // Use push_unique_git_data_to_relay so each maintainer gets a distinct commit + // hash. Identical hashes cause git to skip pack transfer when the object + // already exists on the server, leaving the announcement in purgatory. + let git_dir = push_unique_git_data_to_relay( + &relay_a, + maintainer_keys, + identifier, + &[&relay_a.domain()], + &m_npub, + ) + .await; + git_dirs.push(git_dir); + } + println!("✓ Three maintainer announcements + git data pushed to relay_a"); - // Push git data to release each maintainer's announcement from purgatory - let git_dir = - push_git_data_to_relay(&relay_a, maintainer_keys, identifier, &[&relay_a.domain()]) - .await; - git_dirs_maintainers.push(git_dir); + // Confirm all three announcements are queryable on relay_a before starting relay_b. + // This eliminates the race between relay_a's DB writes and relay_b's initial negentropy sync. 
+ for (name, keys) in [ + ("maintainer1", &maintainer1_keys), + ("maintainer2", &maintainer2_keys), + ("maintainer3", &maintainer3_keys), + ] { + let filter = Filter::new() + .kind(Kind::GitRepoAnnouncement) + .author(keys.public_key()) + .identifier(identifier); + let found = + wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; + assert!(found, "{} announcement should be in relay_a before starting relay_b", name); } - println!("✓ Three maintainer announcements sent to relay_a with git data"); + println!("✓ All three maintainer announcements confirmed in relay_a's DB"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // Because all three maintainer announcements are confirmed in relay_a's DB, relay_b's + // initial negentropy sync will pick them all up and reject them (no owner announcement + // in relay_b's DB yet), storing them in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); + + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // allow extra time for slow CI environments. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); - // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) + // Step 3: Send owner announcement to relay_b → goes to purgatory. 
let owner_npub = owner_keys .public_key() .to_bech32() @@ -361,17 +417,19 @@ async fn test_multiple_maintainers_all_reprocessed() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory and triggers hot-cache + // re-processing for all three maintainer announcements. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (hot-cache re-processing should fire)"); - // Step 3: Wait for sync and re-processing - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; - // Step 4: Verify all four announcements are in relay_b's database + // Step 6: Verify all four announcements are in relay_b's database. for (name, keys) in [ ("owner", &owner_keys), ("maintainer1", &maintainer1_keys), @@ -396,10 +454,10 @@ async fn test_multiple_maintainers_all_reprocessed() { /// Test that invalid maintainer public keys don't cause panics /// /// Flow: -/// 1. Maintainer announcement arrives → Rejected -/// 2. Owner announcement arrives with INVALID maintainer hex → Should handle gracefully -/// 3. Owner announcement should still be accepted -/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey) +/// 1. Maintainer announcement arrives → Rejected (doesn't list our relay) +/// 2. Owner announcement + git push → accepted, with INVALID maintainer hex in maintainers tag +/// 3. Owner announcement should be accepted +/// 4. 
Maintainer announcement should NOT be re-processed (invalid pubkey can't be parsed) #[tokio::test] async fn test_invalid_maintainer_pubkey_handled_gracefully() { let relay = TestRelay::start().await; @@ -410,8 +468,12 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { let identifier = "invalid-maintainer-repo"; + // Create client using TestClient helper + let client = TestClient::new(relay.url(), owner_keys.clone()) + .await + .expect("Failed to connect to relay"); + // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) - // This one uses example.com clone URL - it goes to purgatory on relay, never promoted let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -428,12 +490,13 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .sign_with_keys(&maintainer_keys) .unwrap(); - // Send maintainer announcement - expect it to be rejected (purgatory / policy) - send_to_relay(&relay, &maintainer_announcement).await.ok(); + // Send maintainer announcement - expect it to be rejected + let _ = client.send_event(&maintainer_announcement).await; tokio::time::sleep(Duration::from_millis(200)).await; - // Step 2: Set up owner announcement with INVALID maintainer hex and git data - // Use HTTP clone URL to relay's git endpoint so it can be released from purgatory + // Step 2: Send owner announcement with INVALID maintainer hex, then push git data. + // The announcement goes to purgatory first; the git push promotes it. + // The invalid maintainer hex should be handled gracefully (no panic). 
let owner_npub = owner_keys .public_key() .to_bech32() @@ -461,13 +524,8 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .unwrap(); send_to_relay(&relay, &owner_announcement).await.unwrap(); - - // Push git data to relay → releases owner announcement from purgatory let _git_dir = push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; - println!("✓ Owner git data pushed to relay (announcement released from purgatory)"); - - // Wait for processing tokio::time::sleep(Duration::from_millis(500)).await; // Step 3: Verify owner announcement accepted, maintainer not re-processed @@ -497,5 +555,6 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { println!("✅ Invalid maintainer pubkey handled gracefully without panic"); + client.disconnect().await; relay.stop().await; } -- cgit v1.2.3 From 49401286ea7413f834197e6a5b221649e10e2ad8 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 11:36:45 +0000 Subject: fix: promote purgatory announcements after git sync copy path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a state event arrives and the required commits already exist in another maintainer's repo on the same relay, process_state_with_git_data copies the OIDs across and aligns refs — but never called process_purgatory_announcements for the target repos. Any announcement waiting in purgatory for that repo stayed there indefinitely. Fix: after process_state_with_git_data, call process_newly_available_git_data for each target repo (those that received copied OIDs) so purgatory announcements are promoted immediately. 
--- src/nostr/policy/state.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) (limited to 'src') diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 4bfb513..9ad72c2 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; @@ -192,6 +193,42 @@ impl StatePolicy { } } + // After copying OIDs to other owner repos, promote any purgatory announcements + // for those repos. This handles the case where two maintainers push to the same + // identifier on the same relay with identical commit hashes: the second maintainer's + // announcement sits in purgatory, and when their state event arrives the relay copies + // commits from the first maintainer's repo — but without this call the announcement + // would stay in purgatory indefinitely. + let local_relay = self.ctx.get_local_relay(); + let empty_oids: HashSet = HashSet::new(); + for announcement in &db_repo_data.announcements { + let target_repo_path = self.ctx.git_data_path.join(announcement.repo_path()); + if target_repo_path != repo_with_git_data { + // OIDs were copied to this repo by process_state_with_git_data; + // check if there's a purgatory announcement waiting for it. 
+ if let Err(e) = crate::git::sync::process_newly_available_git_data( + &target_repo_path, + &empty_oids, + &self.ctx.database, + local_relay.as_ref(), + &self.ctx.purgatory, + &self.ctx.git_data_path, + None, + None, + ) + .await + { + tracing::warn!( + identifier = %state.identifier, + event_id = %event.id, + repo_path = %target_repo_path.display(), + error = %e, + "Failed to process purgatory announcements for target repo after git sync copy" + ); + } + } + } + // Event will be saved and broadcast by relay builder Ok(WritePolicyResult::Accept) } else { -- cgit v1.2.3 From f62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:48:26 +0000 Subject: refactor: replace inline purgatory sync registration with timer-only approach Remove the redundant inline kind-30617 registration block from the sync event loop and the three is_generic/recompute_new_sync_filters_for_relay calls from confirm_batch error paths. The purgatory announcement sync timer (run_purgatory_announcement_sync) is now the sole registration path. Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing two ad-hoc env vars with one reusable test-mode flag. --- src/sync/mod.rs | 103 ++++++---------------------------- src/sync/self_subscriber.rs | 12 ++-- tests/common/relay.rs | 2 +- tests/sync/maintainer_reprocessing.rs | 2 +- 4 files changed, 26 insertions(+), 93 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -399,21 +399,24 @@ async fn run_daily_timer( /// Background task that periodically syncs purgatory announcements into repo_sync_index. /// -/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there -/// is a `StateOnly` entry in `repo_sync_index`. 
New entries trigger `handle_new_sync_filters` -/// which connects to the relay URLs listed in the announcement and subscribes to state -/// events (kind 30618). +/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). +/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in +/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the +/// relay URLs listed in the announcement and subscribes to state events (kind 30618). /// -/// This covers two cases: -/// - Sync-path announcements: registered inline during event processing, but this -/// provides a safety net in case the inline registration was missed. +/// This is the sole registration path for purgatory announcements: +/// - Sync-path announcements: registered here within one interval of arriving. /// - User-submitted purgatory announcements: the SelfSubscriber never sees them -/// (they're rejected from DB), so this timer is the primary registration path. +/// (they're rejected from DB), so this timer is the only registration path. async fn run_purgatory_announcement_sync( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, ) { - let interval = Duration::from_secs(5); + let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_secs(5) + }; loop { tokio::select! { _ = tokio::time::sleep(interval) => { @@ -1084,24 +1087,12 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. 
- if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_fallback, - ) - .await; - } } } return; @@ -1195,24 +1186,12 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_retry, - ) - .await; - } } } return; @@ -1223,8 +1202,6 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); - let is_generic = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1236,12 +1213,6 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter (announcement) batches, recompute sync filters so any - // purgatory repos registered during this batch get state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1370,7 +1341,7 @@ impl SyncManager { /// to be batched and create Layer 2/3 filters before we mark sync complete. 
/// /// The 6-second delay is based on: - /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) + /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) /// - Buffer for processing: 1 second /// /// Called after each batch is confirmed to detect completion. @@ -1785,7 +1756,6 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); - let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1827,50 +1797,13 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) + // + // Note: announcement events (kind 30617) are registered in repo_sync_index + // by the purgatory announcement sync timer (run_purgatory_announcement_sync) + // rather than inline here. 
if result == ProcessResult::Purgatory { - // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly - // so that state events (kind 30618) are synced for this purgatory announcement - if event.kind == Kind::GitRepoAnnouncement { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = format!("30617:{}:{}", event.pubkey, identifier); - - // Extract relay URLs from the purgatory entry - let relays = write_policy - .purgatory() - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - tracing::info!( - event_id = %event.id, - repo_id = %repo_id, - relay_count = relays.len(), - "Registering purgatory announcement in RepoSyncIndex with StateOnly level" - ); - - // Register in RepoSyncIndex with StateOnly level - let mut index = repo_sync_index.write().await; - let entry = index - .entry(repo_id) - .or_insert_with(|| RepoSyncNeeds { - relays: HashSet::new(), - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - entry.relays.extend(relays); - // Don't upgrade sync_level if already Full - // (e.g., if announcement was promoted before this runs) - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -126,14 +126,14 @@ impl SelfSubscriber { /// Get batch window from environment or use default /// - /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 
+ /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. /// Default: 5000ms (5 seconds) fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse::().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_millis(5000)) + if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_millis(5000) + } } /// Process a relay pool notification diff --git a/tests/common/relay.rs b/tests/common/relay.rs index 0ec9a2e..b1e96cf 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs @@ -204,7 +204,7 @@ impl TestRelay { .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation .env("NGIT_OWNER_NPUB", &test_npub) - .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) + .env("NGIT_TEST", "1") // Enable test mode: fast timers (200ms batch window, 200ms purgatory sync) .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 61d8e14..ff1eb43 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -377,7 +377,7 @@ async fn test_multiple_maintainers_all_reprocessed() { println!("relay_b started at {}", relay_b.url()); // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. - // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we // allow extra time for slow CI environments. 
tokio::time::sleep(Duration::from_secs(3)).await; println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); -- cgit v1.2.3 From f659ac657bbce1aec423815c184255bb50652ba3 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:53:18 +0000 Subject: feat: implement soft expiry and revival for purgatory announcements Two-phase expiry for announcement purgatory entries: - Phase 1 (initial 30min timeout): delete bare repo, set soft_expired=true, extend expiry by 24h so the event is retained for potential revival - Phase 2 (24h extended timeout): fully remove from purgatory Revival: extend_announcement_expiry() now recreates the bare git repo when called on a soft-expired entry (triggered by state event or git auth), clearing soft_expired and resetting the expiry window. --- src/purgatory/mod.rs | 139 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 123 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 1894738..3c6bc1b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -33,6 +33,13 @@ pub use sync::SyncQueueEntry; /// Default expiry duration for purgatory entries (30 minutes) const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); +/// Extended expiry for soft-expired announcements (24 hours). +/// +/// After the initial 30-minute expiry, the bare repo is deleted but the event is +/// retained for this additional period. This allows revival if a state event arrives +/// late (e.g. slow sync), without permanently blocking the repository. +const SOFT_EXPIRY_EXTENDED: Duration = Duration::from_secs(86400); + /// Default delay before syncing user-submitted events (3 minutes). /// This gives time for the git push to arrive after the nostr event. 
const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); @@ -657,20 +664,77 @@ impl Purgatory { /// * `duration` - Minimum duration to guarantee from now pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) { let key = (*owner, identifier.to_string()); + + // Collect revival info before taking a mutable borrow + let revival_info: Option<(PathBuf, bool)> = self + .announcement_purgatory + .get(&key) + .map(|entry| (entry.repo_path.clone(), entry.soft_expired)); + if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) { let now = Instant::now(); let new_expiry = now + duration; if entry.expires_at < new_expiry { entry.expires_at = new_expiry; - // If soft-expired, revive it - if entry.soft_expired { - entry.soft_expired = false; - tracing::debug!( - owner = %owner, - identifier = %identifier, - "Revived soft-expired announcement" - ); + } + // Always reset soft_expired when expiry is extended — the caller + // (state event or git auth) signals the repo is still active. + if entry.soft_expired { + entry.soft_expired = false; + } + } + + // If the entry was soft-expired, recreate the bare repo outside the + // mutable borrow so we don't hold the DashMap lock during I/O. 
+ if let Some((repo_path, was_soft_expired)) = revival_info { + if was_soft_expired { + if !repo_path.exists() { + match std::fs::create_dir_all(&repo_path) { + Ok(()) => { + // Initialise as a bare git repository + let status = std::process::Command::new("git") + .args(["init", "--bare"]) + .arg(&repo_path) + .status(); + match status { + Ok(s) if s.success() => { + tracing::info!( + path = %repo_path.display(), + owner = %owner, + identifier = %identifier, + "Recreated bare repository for revived soft-expired announcement" + ); + } + Ok(s) => { + tracing::warn!( + path = %repo_path.display(), + exit_code = ?s.code(), + "git init --bare failed when reviving soft-expired announcement" + ); + } + Err(e) => { + tracing::warn!( + path = %repo_path.display(), + error = %e, + "Failed to run git init --bare when reviving soft-expired announcement" + ); + } + } + } + Err(e) => { + tracing::warn!( + path = %repo_path.display(), + error = %e, + "Failed to create directory when reviving soft-expired announcement" + ); + } + } } + tracing::info!( + owner = %owner, + identifier = %identifier, + "Revived soft-expired announcement (bare repo recreated, expiry extended)" + ); } } } @@ -803,22 +867,65 @@ impl Purgatory { pub fn cleanup(&self) -> (usize, usize, usize) { let now = Instant::now(); - // Remove expired announcements and mark them as expired - let expired_announcements: Vec<(PublicKey, String, EventId)> = self + // Process expired announcements with two-phase soft expiry: + // + // Phase 1 (initial expiry, !soft_expired): Delete bare repo, set soft_expired=true, + // extend expiry by SOFT_EXPIRY_EXTENDED so the event is retained for revival. + // Phase 2 (extended expiry, soft_expired): Fully remove from purgatory. + // + // Collect entries that have passed their expires_at deadline. 
+ let expired_announcements: Vec<(PublicKey, String, PathBuf, EventId, bool)> = self .announcement_purgatory .iter() .filter(|entry| entry.value().expires_at <= now) .map(|entry| { let key = entry.key(); - let event_id = entry.value().event.id; - (key.0.clone(), key.1.clone(), event_id) + let v = entry.value(); + (key.0.clone(), key.1.clone(), v.repo_path.clone(), v.event.id, v.soft_expired) }) .collect(); - let announcement_removed = expired_announcements.len(); - for (owner, identifier, event_id) in expired_announcements { - self.mark_expired(event_id); - self.announcement_purgatory.remove(&(owner, identifier)); + let mut announcement_removed = 0; + for (owner, identifier, repo_path, event_id, already_soft_expired) in expired_announcements { + if already_soft_expired { + // Phase 2: fully remove + self.mark_expired(event_id); + self.announcement_purgatory.remove(&(owner.clone(), identifier.clone())); + announcement_removed += 1; + tracing::info!( + owner = %owner, + identifier = %identifier, + "Announcement fully expired from purgatory (soft expiry period elapsed)" + ); + } else { + // Phase 1: soft expiry — delete bare repo, retain event + if repo_path.exists() { + if let Err(e) = std::fs::remove_dir_all(&repo_path) { + tracing::warn!( + path = %repo_path.display(), + error = %e, + "Failed to delete bare repository during soft expiry" + ); + } else { + tracing::info!( + path = %repo_path.display(), + owner = %owner, + identifier = %identifier, + "Deleted bare repository during soft expiry (event retained for revival)" + ); + } + } + // Mark soft_expired and extend expiry + if let Some(mut entry) = self.announcement_purgatory.get_mut(&(owner.clone(), identifier.clone())) { + entry.soft_expired = true; + entry.expires_at = now + SOFT_EXPIRY_EXTENDED; + } + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Announcement soft-expired: bare repo deleted, event retained for 24h" + ); + } } let mut state_removed = 0; -- cgit v1.2.3 From 
84c9003323162f166552d1dea15ee9ed1b1a025a Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:54:05 +0000 Subject: feat: extend purgatory announcement expiry when state event arrives Per design doc decision #4: state event arrival resets the 30-minute protocol timer for purgatory announcements. This prevents premature expiry during slow sync operations where the repo is actively receiving metadata but git data hasn't arrived yet. Extends expiry for all owners whose announcement authorized the state event, and triggers revival if the announcement was soft-expired. --- src/nostr/policy/state.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'src') diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 9ad72c2..e6de54e 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -146,6 +146,34 @@ impl StatePolicy { "State event author authorized via maintainer set" ); + // Extend expiry for any purgatory announcements for this identifier. + // + // Per design doc decision #4: state event arrival extends the purgatory + // announcement's expiry (reset the 30-minute protocol timer). This prevents + // premature expiry during slow sync operations — the repo is actively receiving + // metadata so it should stay alive. + // + // We extend for all owners that authorized this state event, since the state + // event proves the repo is active regardless of which owner's announcement + // authorized it. 
+ for owner_hex in &authorized_owners { + if let Ok(owner_pk) = nostr_sdk::PublicKey::from_hex(owner_hex) { + if self.ctx.purgatory.has_purgatory_announcement(&owner_pk, &state.identifier) { + self.ctx.purgatory.extend_announcement_expiry( + &owner_pk, + &state.identifier, + std::time::Duration::from_secs(1800), + ); + tracing::debug!( + event_id = %event.id, + identifier = %state.identifier, + owner = %owner_hex, + "Extended purgatory announcement expiry due to state event arrival" + ); + } + } + } + // Duplicate check in db if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { tracing::debug!("processed state event duplicate (in db): {}", event.id); -- cgit v1.2.3 From c3dedb7a5b527c3a3deb1e781aba9d562c6eb294 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:54:57 +0000 Subject: feat: extend purgatory announcement expiry during git push authorization Per design doc decision #4: when git auth finds a matching state event in purgatory that authorizes a push, extend the announcement's expiry. The repo is actively receiving git data so the announcement should not expire prematurely. Also triggers revival of soft-expired announcements. --- src/git/authorization.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'src') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index 9d53c4f..69a0751 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -661,6 +661,27 @@ pub async fn get_state_authorization_for_specific_owner_repo( .unwrap_or_else(|_| latest_authorized.pubkey.to_hex()) ); + // Extend purgatory announcement expiry for the owner. + // + // Per design doc decision #4: git auth extending a state event's expiry + // also extends the announcement's expiry. The repo is actively receiving + // git data, so the announcement should not expire prematurely. + // This also revives soft-expired announcements (recreates bare repo). 
+ if let Ok(owner_pk) = PublicKey::parse(owner_pubkey) { + if purgatory.has_purgatory_announcement(&owner_pk, identifier) { + purgatory.extend_announcement_expiry( + &owner_pk, + identifier, + std::time::Duration::from_secs(1800), + ); + debug!( + identifier = %identifier, + owner = %owner_pubkey, + "Extended purgatory announcement expiry due to git push authorization" + ); + } + } + return Ok(AuthorizationResult { authorized: true, reason: "Authorized by state event in purgatory".to_string(), -- cgit v1.2.3 From c368f9132a16d45a17ad55943e4b68ba85a6835b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 13:04:07 +0000 Subject: fix: only soft-expire announcement when bare repo deletion succeeds If remove_dir_all fails, leave the entry untouched so the next cleanup cycle retries the deletion automatically. Previously a failed deletion would still set soft_expired=true and extend the expiry, meaning the bare repo would never be retried. --- src/purgatory/mod.rs | 65 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 3c6bc1b..f5f8b31 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -898,33 +898,48 @@ impl Purgatory { "Announcement fully expired from purgatory (soft expiry period elapsed)" ); } else { - // Phase 1: soft expiry — delete bare repo, retain event - if repo_path.exists() { - if let Err(e) = std::fs::remove_dir_all(&repo_path) { - tracing::warn!( - path = %repo_path.display(), - error = %e, - "Failed to delete bare repository during soft expiry" - ); - } else { - tracing::info!( - path = %repo_path.display(), - owner = %owner, - identifier = %identifier, - "Deleted bare repository during soft expiry (event retained for revival)" - ); + // Phase 1: soft expiry — delete bare repo, retain event. + // + // Only transition to soft_expired if the directory is gone (or never + // existed). 
If removal fails we leave the entry untouched so the next + // cleanup cycle retries the deletion automatically. + let repo_gone = if repo_path.exists() { + match std::fs::remove_dir_all(&repo_path) { + Ok(()) => { + tracing::info!( + path = %repo_path.display(), + owner = %owner, + identifier = %identifier, + "Deleted bare repository during soft expiry (event retained for revival)" + ); + true + } + Err(e) => { + tracing::warn!( + path = %repo_path.display(), + error = %e, + "Failed to delete bare repository during soft expiry; will retry next cleanup cycle" + ); + false + } } + } else { + // Already gone (e.g. deleted externally) + true + }; + + if repo_gone { + // Mark soft_expired and extend expiry + if let Some(mut entry) = self.announcement_purgatory.get_mut(&(owner.clone(), identifier.clone())) { + entry.soft_expired = true; + entry.expires_at = now + SOFT_EXPIRY_EXTENDED; + } + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Announcement soft-expired: bare repo deleted, event retained for 24h" + ); } - // Mark soft_expired and extend expiry - if let Some(mut entry) = self.announcement_purgatory.get_mut(&(owner.clone(), identifier.clone())) { - entry.soft_expired = true; - entry.expires_at = now + SOFT_EXPIRY_EXTENDED; - } - tracing::debug!( - owner = %owner, - identifier = %identifier, - "Announcement soft-expired: bare repo deleted, event retained for 24h" - ); } } -- cgit v1.2.3 From 65ac6ef83205c41653e6ffe2acd664f968926fb2 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 13:29:47 +0000 Subject: feat: remove purgatory announcements on NIP-09 deletion events Kind 5 deletion events signed by the announcement author now evict the corresponding purgatory entry and delete the bare repository from disk. 
Both NIP-09 reference styles are supported: - e tag (event ID): matches the purgatory entry whose event ID equals the tag value - a tag (coordinate 30617:<pubkey>:<identifier>): matches by coordinate, only removes entries with created_at <= deletion event created_at per NIP-09 spec Author-only enforcement: coordinate pubkey and e-tag owner must match the deletion event pubkey; third-party deletion attempts are silently ignored. Includes 6 unit tests and 2 integration tests (event ID and coordinate paths). --- grasp-audit/src/specs/grasp01/purgatory.rs | 199 +++++++++++++ src/nostr/builder.rs | 8 +- src/nostr/policy/deletion.rs | 438 +++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 2 + tests/purgatory.rs | 7 + 5 files changed, 652 insertions(+), 2 deletions(-) create mode 100644 src/nostr/policy/deletion.rs (limited to 'src') diff --git a/grasp-audit/src/specs/grasp01/purgatory.rs b/grasp-audit/src/specs/grasp01/purgatory.rs index 9c4b401..9d97d3b 100644 --- a/grasp-audit/src/specs/grasp01/purgatory.rs +++ b/grasp-audit/src/specs/grasp01/purgatory.rs @@ -46,6 +46,12 @@ impl PurgatoryTests { results.add(Self::test_bare_repo_exists_for_purgatory_announcement(client).await); results.add(Self::test_state_event_accepted_for_purgatory_announcement(client).await); + // Deletion event tests (NIP-09) + results.add(Self::test_deletion_by_event_id_removes_purgatory_announcement(client).await); + results.add( + Self::test_deletion_by_coordinate_removes_purgatory_announcement(client).await, + ); + // State event purgatory tests (already implemented) results.add(Self::test_state_event_not_served_before_git_data(client).await); results.add(Self::test_state_event_served_after_git_push(client).await); @@ -646,6 +652,199 @@ impl PurgatoryTests { }) .await } + // ============================================================ + // Deletion Event Tests (NIP-09) + // ============================================================ + + /// Test: Kind 5 deletion event by event ID removes purgatory
announcement + /// + /// Spec: NIP-09 + /// "A special event with kind 5... having a list of one or more `e` or `a` tags, + /// each referencing an event the author is requesting to be deleted." + /// + /// This test verifies: + /// 1. Send a valid repository announcement (enters purgatory) + /// 2. Send a kind 5 deletion event referencing the announcement by event ID + /// 3. The announcement is no longer in purgatory (git push would fail) + /// 4. The deletion event itself is accepted by the relay + pub async fn test_deletion_by_event_id_removes_purgatory_announcement( + client: &AuditClient, + ) -> TestResult { + TestResult::new( + "deletion_by_event_id_removes_purgatory_announcement", + SpecRef::PurgatoryAcceptUntilGitData, + "Kind 5 deletion by event ID SHOULD remove a purgatory announcement", + ) + .run(|| async { + let ctx = TestContext::new(client); + + // Send announcement to purgatory + let repo = ctx + .get_fixture(FixtureKind::ValidRepoSent) + .await + .map_err(|e| format!("Failed to create repo announcement: {}", e))?; + + let repo_id = repo + .tags + .iter() + .find(|t| t.kind() == TagKind::d()) + .and_then(|t| t.content()) + .ok_or("Missing d tag in repo announcement")? + .to_string(); + + // Verify it's in purgatory (not served) + tokio::time::sleep(Duration::from_millis(300)).await; + if client.is_event_on_relay(repo.id).await.map_err(|e| e.to_string())? 
{ + return Err( + "Announcement was served immediately - purgatory not working".to_string(), + ); + } + + // Build and send kind 5 deletion event referencing the announcement by event ID + let deletion = client + .event_builder(Kind::EventDeletion, "") + .tag(Tag::event(repo.id)) + .tag(Tag::custom( + TagKind::custom("k"), + vec!["30617"], + )) + .build(client.keys()) + .map_err(|e| format!("Failed to build deletion event: {}", e))?; + + client + .send_event(deletion) + .await + .map_err(|e| format!("Relay rejected deletion event: {}", e))?; + + tokio::time::sleep(Duration::from_millis(300)).await; + + // Verify the announcement can no longer be promoted by attempting a git push. + // We check this indirectly: if the purgatory entry was removed, a subsequent + // git push to the repo path should fail (no bare repo). + // For the integration test we verify the announcement is still not served + // (it was never promoted) and that the deletion event was accepted. + // The bare-repo deletion is verified by attempting a git clone. + let http_url = AuditClient::ws_to_http_url(&client.relay_url().await.map_err(|e| e.to_string())?) + .map_err(|e| e.to_string())?; + let clone_url = format!( + "{}/{}/{}.git", + http_url, + client.public_key().to_bech32().map_err(|e| e.to_string())?, + repo_id + ); + + // git ls-remote should fail (bare repo deleted) + let output = std::process::Command::new("git") + .args(["ls-remote", &clone_url]) + .output() + .map_err(|e| format!("Failed to run git ls-remote: {}", e))?; + + if output.status.success() { + return Err(format!( + "Bare repo still exists after deletion event. \ + Expected git ls-remote to fail for {}", + clone_url + )); + } + + Ok(()) + }) + .await + } + + /// Test: Kind 5 deletion event by `a` tag coordinate removes purgatory announcement + /// + /// Spec: NIP-09 + /// "When an `a` tag is used, relays SHOULD delete all versions of the replaceable + /// event up to the `created_at` timestamp of the deletion request event." 
+ /// + /// This test verifies: + /// 1. Send a valid repository announcement (enters purgatory) + /// 2. Send a kind 5 deletion event referencing the announcement by coordinate + /// (`30617:<pubkey>:<identifier>`) + /// 3. The announcement is no longer in purgatory + pub async fn test_deletion_by_coordinate_removes_purgatory_announcement( + client: &AuditClient, + ) -> TestResult { + TestResult::new( + "deletion_by_coordinate_removes_purgatory_announcement", + SpecRef::PurgatoryAcceptUntilGitData, + "Kind 5 deletion by `a` coordinate SHOULD remove a purgatory announcement", + ) + .run(|| async { + let ctx = TestContext::new(client); + + // Send announcement to purgatory + let repo = ctx + .get_fixture(FixtureKind::ValidRepoSent) + .await + .map_err(|e| format!("Failed to create repo announcement: {}", e))?; + + let repo_id = repo + .tags + .iter() + .find(|t| t.kind() == TagKind::d()) + .and_then(|t| t.content()) + .ok_or("Missing d tag in repo announcement")? + .to_string(); + + // Verify it's in purgatory (not served) + tokio::time::sleep(Duration::from_millis(300)).await; + if client.is_event_on_relay(repo.id).await.map_err(|e| e.to_string())?
{ + return Err( + "Announcement was served immediately - purgatory not working".to_string(), + ); + } + + // Build coordinate: `30617:<pubkey>:<identifier>` + let coord = format!( + "30617:{}:{}", + client.public_key().to_hex(), + repo_id + ); + + // Build and send kind 5 deletion event referencing by coordinate + let deletion = client + .event_builder(Kind::EventDeletion, "") + .tag(Tag::custom(TagKind::custom("a"), vec![coord])) + .tag(Tag::custom(TagKind::custom("k"), vec!["30617"])) + .build(client.keys()) + .map_err(|e| format!("Failed to build deletion event: {}", e))?; + + client + .send_event(deletion) + .await + .map_err(|e| format!("Relay rejected deletion event: {}", e))?; + + tokio::time::sleep(Duration::from_millis(300)).await; + + // Verify bare repo was deleted + let http_url = AuditClient::ws_to_http_url(&client.relay_url().await.map_err(|e| e.to_string())?) + .map_err(|e| e.to_string())?; + let clone_url = format!( + "{}/{}/{}.git", + http_url, + client.public_key().to_bech32().map_err(|e| e.to_string())?, + repo_id + ); + + let output = std::process::Command::new("git") + .args(["ls-remote", &clone_url]) + .output() + .map_err(|e| format!("Failed to run git ls-remote: {}", e))?; + + if output.status.success() { + return Err(format!( + "Bare repo still exists after deletion event.
\ + Expected git ls-remote to fail for {}", + clone_url + )); + } + + Ok(()) + }) + .await + } } #[cfg(test)] diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index c2d4939..d056e46 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -14,8 +14,8 @@ use nostr_relay_builder::prelude::*; use crate::config::{Config, DatabaseBackend}; use crate::nostr::events::RepositoryAnnouncement; use crate::nostr::policy::{ - AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, - RelatedEventPolicy, StatePolicy, StateResult, + AnnouncementPolicy, AnnouncementResult, DeletionPolicy, PolicyContext, PrEventPolicy, + ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; @@ -29,6 +29,7 @@ pub type SharedDatabase = Arc; /// - `StatePolicy` - State event validation + ref alignment /// - `PrEventPolicy` - PR/PR Update validation /// - `RelatedEventPolicy` - Forward/backward reference checking +/// - `DeletionPolicy` - NIP-09 event deletion request handling /// /// Uses stateful database queries to check event relationships. 
#[derive(Clone)] @@ -38,6 +39,7 @@ pub struct Nip34WritePolicy { state_policy: StatePolicy, pr_event_policy: PrEventPolicy, related_event_policy: RelatedEventPolicy, + deletion_policy: DeletionPolicy, } impl std::fmt::Debug for Nip34WritePolicy { @@ -69,6 +71,7 @@ impl Nip34WritePolicy { state_policy: StatePolicy::new(ctx.clone()), pr_event_policy: PrEventPolicy::new(ctx.clone()), related_event_policy: RelatedEventPolicy::new(ctx.clone()), + deletion_policy: DeletionPolicy::new(ctx.clone()), ctx, } } @@ -521,6 +524,7 @@ impl WritePolicy for Nip34WritePolicy { ); WritePolicyResult::Accept } + Kind::EventDeletion => self.deletion_policy.handle(event).await, _ => self.handle_related_event(event, "Event").await, } }) diff --git a/src/nostr/policy/deletion.rs b/src/nostr/policy/deletion.rs new file mode 100644 index 0000000..69a5758 --- /dev/null +++ b/src/nostr/policy/deletion.rs @@ -0,0 +1,438 @@ +/// Deletion Policy - NIP-09 event deletion request handling +/// +/// Handles kind 5 (EventDeletion) events that request removal of repository +/// announcements (kind 30617) from purgatory. +/// +/// ## NIP-09 Rules Enforced +/// +/// - Only the event author can delete their own events (pubkey must match) +/// - `e` tags reference specific event IDs to delete +/// - `a` tags reference addressable events by coordinate (`::`) +/// - When an `a` tag is used, all versions up to `created_at` of the deletion request +/// are considered deleted +/// +/// ## Purgatory Interaction +/// +/// When a valid deletion request targets a kind 30617 announcement that is currently +/// in purgatory (not yet promoted to the database), the purgatory entry is removed +/// and the bare repository is deleted from disk. 
+use nostr_relay_builder::prelude::{Event, WritePolicyResult}; + +use super::PolicyContext; + +/// Policy for handling NIP-09 event deletion requests +#[derive(Clone)] +pub struct DeletionPolicy { + ctx: PolicyContext, +} + +impl DeletionPolicy { + pub fn new(ctx: PolicyContext) -> Self { + Self { ctx } + } + + /// Process a kind 5 (EventDeletion) event. + /// + /// Checks whether the deletion request targets any purgatory announcements + /// and removes them if so. The deletion event itself is always accepted + /// (relays should store deletion requests per NIP-09). + /// + /// Only the event author can delete their own events — this is enforced by + /// checking that the purgatory entry's owner matches `event.pubkey`. + pub async fn handle(&self, event: &Event) -> WritePolicyResult { + // Process purgatory removals synchronously (no async needed) + self.remove_purgatory_targets(event); + + // Always accept the deletion event itself so it is stored and + // can prevent re-acceptance of the deleted event in the future. + WritePolicyResult::Accept + } + + /// Remove any purgatory announcements targeted by this deletion event. + /// + /// Handles both reference styles from NIP-09: + /// - `e` tags: event ID references — match against purgatory entry event IDs + /// - `a` tags: addressable coordinate references — `30617:<pubkey>:<identifier>` + /// + /// Only removes entries where the purgatory entry's owner matches the deletion + /// event's pubkey (enforces author-only deletion).
+ fn remove_purgatory_targets(&self, event: &Event) { + let author = &event.pubkey; + + for tag in event.tags.iter() { + let tag_vec = tag.as_slice(); + if tag_vec.len() < 2 { + continue; + } + + match tag_vec[0].as_str() { + "e" => { + // Event ID reference: find purgatory announcement with this event ID + let target_id = &tag_vec[1]; + self.remove_by_event_id(author, target_id, event.created_at.as_secs()); + } + "a" => { + // Addressable coordinate reference: `<kind>:<pubkey>:<identifier>` + let coord = &tag_vec[1]; + self.remove_by_coordinate(author, coord, event.created_at.as_secs()); + } + _ => {} + } + } + } + + /// Remove a purgatory announcement matched by event ID. + /// + /// Scans all purgatory announcements owned by `author` and removes the one + /// whose event ID hex matches `target_id_hex`. + fn remove_by_event_id(&self, author: &nostr_relay_builder::prelude::PublicKey, target_id_hex: &str, _deletion_created_at: u64) { + // Scan announcements owned by this author for a matching event ID. + // Using get_announcements_by_identifier would require knowing the identifier, + // so instead we iterate via find_announcement after collecting all entries. + // The DashMap doesn't expose a direct "find by event ID" method, so we use + // the announcements_for_sync snapshot to get all (repo_id, _) pairs and then + // look up each one.
+ let all = self.ctx.purgatory.announcements_for_sync(); + for (repo_id, _) in all { + // repo_id format: "30617:{pubkey_hex}:{identifier}" + let parts: Vec<&str> = repo_id.splitn(3, ':').collect(); + if parts.len() != 3 { + continue; + } + let entry_pubkey_hex = parts[1]; + let identifier = parts[2]; + + // Only check entries owned by the deletion event author + if entry_pubkey_hex != author.to_hex() { + continue; + } + + if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { + if entry.event.id.to_hex() == target_id_hex { + tracing::info!( + event_id = %target_id_hex, + identifier = %identifier, + author = %author.to_hex(), + "Deletion request: removing purgatory announcement by event ID" + ); + self.evict_purgatory_entry(author, identifier); + return; // event IDs are unique, no need to continue + } + } + } + } + + /// Remove a purgatory announcement matched by addressable coordinate. + /// + /// The coordinate format is `<kind>:<pubkey>:<identifier>`. Only kind 30617 + /// coordinates are relevant here. Per NIP-09, all versions up to `deletion_created_at` + /// are considered deleted — since purgatory entries are always a single event per + /// (owner, identifier), we delete if the entry's `created_at` ≤ `deletion_created_at`.
+ fn remove_by_coordinate( + &self, + author: &nostr_relay_builder::prelude::PublicKey, + coordinate: &str, + deletion_created_at: u64, + ) { + // Parse coordinate: `<kind>:<pubkey>:<identifier>` + let parts: Vec<&str> = coordinate.splitn(3, ':').collect(); + if parts.len() != 3 { + return; + } + + let kind_str = parts[0]; + let coord_pubkey_hex = parts[1]; + let identifier = parts[2]; + + // Only handle kind 30617 (GitRepoAnnouncement) + if kind_str != "30617" { + return; + } + + // The coordinate pubkey must match the deletion event author + if coord_pubkey_hex != author.to_hex() { + tracing::debug!( + coord_pubkey = %coord_pubkey_hex, + deletion_author = %author.to_hex(), + "Ignoring deletion: coordinate pubkey does not match deletion author" + ); + return; + } + + if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { + // Per NIP-09: delete all versions up to deletion_created_at + if entry.event.created_at.as_secs() <= deletion_created_at { + tracing::info!( + identifier = %identifier, + author = %author.to_hex(), + entry_created_at = entry.event.created_at.as_secs(), + deletion_created_at = %deletion_created_at, + "Deletion request: removing purgatory announcement by coordinate" + ); + self.evict_purgatory_entry(author, identifier); + } else { + tracing::debug!( + identifier = %identifier, + author = %author.to_hex(), + entry_created_at = entry.event.created_at.as_secs(), + deletion_created_at = %deletion_created_at, + "Ignoring deletion: purgatory entry is newer than deletion request" + ); + } + } + } + + /// Remove a purgatory announcement and delete its bare repository from disk.
+ fn evict_purgatory_entry( + &self, + author: &nostr_relay_builder::prelude::PublicKey, + identifier: &str, + ) { + // Get repo path before removing + if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { + if entry.repo_path.exists() { + if let Err(e) = std::fs::remove_dir_all(&entry.repo_path) { + tracing::warn!( + path = %entry.repo_path.display(), + error = %e, + "Failed to delete bare repository during deletion request processing" + ); + } else { + tracing::info!( + path = %entry.repo_path.display(), + "Deleted bare repository for deletion-requested purgatory announcement" + ); + } + } + } + + self.ctx.purgatory.remove_announcement(author, identifier); + + // Remove state events for this identifier only if no other owner's + // announcement remains in purgatory (state events are keyed by identifier alone) + let other_owners_remain = !self + .ctx + .purgatory + .get_announcements_by_identifier(identifier) + .is_empty(); + + if !other_owners_remain { + self.ctx.purgatory.remove_state(identifier); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::nostr::policy::PolicyContext; + use crate::purgatory::Purgatory; + use nostr_relay_builder::prelude::*; + use std::collections::HashSet; + use std::path::PathBuf; + use std::sync::Arc; + + fn make_context() -> PolicyContext { + let db = Arc::new(MemoryDatabase::with_opts(MemoryDatabaseOptions { + events: true, + max_events: None, + })); + let purgatory = Arc::new(Purgatory::new(PathBuf::new())); + let config = crate::config::Config::for_testing(); + PolicyContext::new("test.example.com", db, PathBuf::new(), purgatory, config) + } + + fn make_announcement_event(keys: &Keys, identifier: &str) -> Event { + EventBuilder::new(Kind::GitRepoAnnouncement, "") + .tags(vec![ + Tag::identifier(identifier), + Tag::custom(TagKind::custom("clone"), vec!["https://example.com/repo.git"]), + ]) + .sign_with_keys(keys) + .unwrap() + } + + fn add_to_purgatory(ctx: &PolicyContext, event: &Event, 
identifier: &str) { + ctx.purgatory.add_announcement( + event.clone(), + identifier.to_string(), + event.pubkey, + PathBuf::new(), + HashSet::new(), + ); + } + + #[tokio::test] + async fn test_deletion_by_event_id_removes_purgatory_entry() { + let ctx = make_context(); + let keys = Keys::generate(); + let identifier = "my-repo"; + + let announcement = make_announcement_event(&keys, identifier); + add_to_purgatory(&ctx, &announcement, identifier); + + assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier)); + + // Build kind 5 deletion event referencing the announcement by event ID + let deletion = EventBuilder::new(Kind::EventDeletion, "") + .tags(vec![ + Tag::event(announcement.id), + Tag::custom(TagKind::custom("k"), vec!["30617"]), + ]) + .sign_with_keys(&keys) + .unwrap(); + + let policy = DeletionPolicy::new(ctx.clone()); + let result = policy.handle(&deletion).await; + + assert!(matches!(result, WritePolicyResult::Accept)); + assert!( + !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), + "Purgatory entry should have been removed" + ); + } + + #[tokio::test] + async fn test_deletion_by_coordinate_removes_purgatory_entry() { + let ctx = make_context(); + let keys = Keys::generate(); + let identifier = "my-repo"; + + let announcement = make_announcement_event(&keys, identifier); + add_to_purgatory(&ctx, &announcement, identifier); + + assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier)); + + // Build kind 5 deletion event referencing the announcement by coordinate + let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier); + let deletion = EventBuilder::new(Kind::EventDeletion, "") + .tags(vec![ + Tag::custom(TagKind::custom("a"), vec![coord]), + Tag::custom(TagKind::custom("k"), vec!["30617"]), + ]) + .sign_with_keys(&keys) + .unwrap(); + + let policy = DeletionPolicy::new(ctx.clone()); + let result = policy.handle(&deletion).await; + + assert!(matches!(result, 
WritePolicyResult::Accept)); + assert!( + !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), + "Purgatory entry should have been removed" + ); + } + + #[tokio::test] + async fn test_deletion_by_wrong_author_does_not_remove() { + let ctx = make_context(); + let owner_keys = Keys::generate(); + let attacker_keys = Keys::generate(); + let identifier = "my-repo"; + + let announcement = make_announcement_event(&owner_keys, identifier); + add_to_purgatory(&ctx, &announcement, identifier); + + // Attacker tries to delete by event ID + let deletion = EventBuilder::new(Kind::EventDeletion, "") + .tags(vec![ + Tag::event(announcement.id), + Tag::custom(TagKind::custom("k"), vec!["30617"]), + ]) + .sign_with_keys(&attacker_keys) + .unwrap(); + + let policy = DeletionPolicy::new(ctx.clone()); + let result = policy.handle(&deletion).await; + + assert!(matches!(result, WritePolicyResult::Accept)); + assert!( + ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier), + "Purgatory entry should NOT have been removed by wrong author" + ); + } + + #[tokio::test] + async fn test_deletion_by_coordinate_wrong_author_does_not_remove() { + let ctx = make_context(); + let owner_keys = Keys::generate(); + let attacker_keys = Keys::generate(); + let identifier = "my-repo"; + + let announcement = make_announcement_event(&owner_keys, identifier); + add_to_purgatory(&ctx, &announcement, identifier); + + // Attacker tries to delete by coordinate using owner's pubkey in coord + // but signs with their own key — coord pubkey != deletion author + let coord = format!("30617:{}:{}", owner_keys.public_key().to_hex(), identifier); + let deletion = EventBuilder::new(Kind::EventDeletion, "") + .tags(vec![ + Tag::custom(TagKind::custom("a"), vec![coord]), + Tag::custom(TagKind::custom("k"), vec!["30617"]), + ]) + .sign_with_keys(&attacker_keys) + .unwrap(); + + let policy = DeletionPolicy::new(ctx.clone()); + let result = policy.handle(&deletion).await; + + 
assert!(matches!(result, WritePolicyResult::Accept)); + assert!( + ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier), + "Purgatory entry should NOT have been removed by wrong author" + ); + } + + #[tokio::test] + async fn test_deletion_of_nonexistent_entry_is_accepted() { + let ctx = make_context(); + let keys = Keys::generate(); + + // No purgatory entry exists — deletion should still be accepted + let deletion = EventBuilder::new(Kind::EventDeletion, "") + .tags(vec![ + Tag::custom(TagKind::custom("a"), vec![ + format!("30617:{}:nonexistent", keys.public_key().to_hex()) + ]), + ]) + .sign_with_keys(&keys) + .unwrap(); + + let policy = DeletionPolicy::new(ctx.clone()); + let result = policy.handle(&deletion).await; + + assert!(matches!(result, WritePolicyResult::Accept)); + } + + #[tokio::test] + async fn test_deletion_by_coordinate_respects_created_at() { + let ctx = make_context(); + let keys = Keys::generate(); + let identifier = "my-repo"; + + // Create announcement with a future timestamp + let future_ts = Timestamp::now().as_secs() + 3600; // 1 hour in the future + let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "") + .tags(vec![Tag::identifier(identifier)]) + .custom_created_at(Timestamp::from(future_ts)) + .sign_with_keys(&keys) + .unwrap(); + add_to_purgatory(&ctx, &announcement, identifier); + + // Deletion event with current timestamp (older than announcement) + let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier); + let deletion = EventBuilder::new(Kind::EventDeletion, "") + .tags(vec![Tag::custom(TagKind::custom("a"), vec![coord])]) + .sign_with_keys(&keys) + .unwrap(); + + let policy = DeletionPolicy::new(ctx.clone()); + let result = policy.handle(&deletion).await; + + assert!(matches!(result, WritePolicyResult::Accept)); + assert!( + ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), + "Purgatory entry should NOT be removed: entry is newer than deletion 
request" + ); + } +} diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..f5b981a 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -6,11 +6,13 @@ /// - `PrEventPolicy` - PR/PR Update validation /// - `RelatedEventPolicy` - Forward/backward reference checking mod announcement; +mod deletion; mod pr_event; mod related; mod state; pub use announcement::{AnnouncementPolicy, AnnouncementResult}; +pub use deletion::DeletionPolicy; pub use pr_event::PrEventPolicy; pub use related::{ReferenceResult, RelatedEventPolicy}; pub use state::{StatePolicy, StateResult}; diff --git a/tests/purgatory.rs b/tests/purgatory.rs index efc28c9..553271f 100644 --- a/tests/purgatory.rs +++ b/tests/purgatory.rs @@ -66,6 +66,13 @@ isolated_purgatory_test!(test_announcement_served_after_git_push); isolated_purgatory_test!(test_bare_repo_exists_for_purgatory_announcement); isolated_purgatory_test!(test_state_event_accepted_for_purgatory_announcement); +// ============================================================ +// Deletion Event Tests (NIP-09) +// ============================================================ + +isolated_purgatory_test!(test_deletion_by_event_id_removes_purgatory_announcement); +isolated_purgatory_test!(test_deletion_by_coordinate_removes_purgatory_announcement); + // ============================================================ // State Event Purgatory Tests (already implemented) // ============================================================ -- cgit v1.2.3 From 0c71e191963bec729c3ca13c212b231af7582f06 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 13:42:57 +0000 Subject: fix: rewrite deletion integration tests to avoid shared-state side effects The previous tests deleted purgatory announcements (kind 30617) and checked for bare-repo absence via git ls-remote, which would corrupt shared-mode test state by destroying repos other tests depend on. 
New approach tests deletion of purgatory state events (kind 30618) instead: - e-tag test: promotes a repo, creates a unique commit locally, submits a state event pointing to it (enters purgatory), deletes the state event by event ID, then verifies git push of that commit is rejected. - a-tag coordinate test: promotes a repo, generates a fresh maintainer keypair, sends a replacement announcement adding that maintainer, submits a state event signed by the new maintainer (enters purgatory), deletes by coordinate 30618:<pubkey>:<identifier>, then verifies git push is rejected. Also extends DeletionPolicy to handle kind 30618 state events in purgatory for both e-tag (event ID) and a-tag (coordinate) deletion paths. --- grasp-audit/src/specs/grasp01/purgatory.rs | 329 +++++++++++++++++++---------- src/nostr/policy/deletion.rs | 138 +++++++----- tests/purgatory.rs | 4 +- 3 files changed, 307 insertions(+), 164 deletions(-) (limited to 'src') diff --git a/grasp-audit/src/specs/grasp01/purgatory.rs b/grasp-audit/src/specs/grasp01/purgatory.rs index 9d97d3b..29eabad 100644 --- a/grasp-audit/src/specs/grasp01/purgatory.rs +++ b/grasp-audit/src/specs/grasp01/purgatory.rs @@ -27,9 +27,11 @@ //! - `test_pr_event_in_purgatory_git_push_accepted` - Git push to refs/nostr/ succeeds //!
- `test_pr_event_served_after_git_push` - Event becomes queryable after git data +use crate::fixtures::{clone_repo, create_commit, try_push}; use crate::specs::grasp01::SpecRef; use crate::{AuditClient, AuditResult, FixtureKind, TestContext, TestResult}; use nostr_sdk::prelude::*; +use std::fs; use std::time::Duration; /// Test suite for GRASP-01 purgatory behavior @@ -47,9 +49,9 @@ impl PurgatoryTests { results.add(Self::test_state_event_accepted_for_purgatory_announcement(client).await); // Deletion event tests (NIP-09) - results.add(Self::test_deletion_by_event_id_removes_purgatory_announcement(client).await); + results.add(Self::test_deletion_by_event_id_removes_purgatory_state_event(client).await); results.add( - Self::test_deletion_by_coordinate_removes_purgatory_announcement(client).await, + Self::test_deletion_by_coordinate_removes_purgatory_state_event(client).await, ); // State event purgatory tests (already implemented) @@ -656,192 +658,293 @@ impl PurgatoryTests { // Deletion Event Tests (NIP-09) // ============================================================ - /// Test: Kind 5 deletion event by event ID removes purgatory announcement + /// Test: Kind 5 deletion event by event ID removes a purgatory state event /// /// Spec: NIP-09 /// "A special event with kind 5... having a list of one or more `e` or `a` tags, /// each referencing an event the author is requesting to be deleted." /// /// This test verifies: - /// 1. Send a valid repository announcement (enters purgatory) - /// 2. Send a kind 5 deletion event referencing the announcement by event ID - /// 3. The announcement is no longer in purgatory (git push would fail) - /// 4. The deletion event itself is accepted by the relay - pub async fn test_deletion_by_event_id_removes_purgatory_announcement( + /// 1. Get a promoted repo (OwnerStateDataPushed) so git pushes are possible + /// 2. Clone the repo and create a unique commit (not yet pushed) + /// 3. 
Submit a state event pointing to that unique commit (enters purgatory) + /// 4. Send a kind 5 deletion event referencing the state event by event ID + /// 5. Attempt to push the unique commit — MUST be rejected (no authorized state event) + pub async fn test_deletion_by_event_id_removes_purgatory_state_event( client: &AuditClient, ) -> TestResult { TestResult::new( - "deletion_by_event_id_removes_purgatory_announcement", + "deletion_by_event_id_removes_purgatory_state_event", SpecRef::PurgatoryAcceptUntilGitData, - "Kind 5 deletion by event ID SHOULD remove a purgatory announcement", + "Kind 5 deletion by event ID SHOULD remove a purgatory state event, causing push rejection", ) .run(|| async { let ctx = TestContext::new(client); - // Send announcement to purgatory - let repo = ctx - .get_fixture(FixtureKind::ValidRepoSent) + // Stage 1: get a promoted repo with git data already on the relay + let existing_state = ctx + .get_fixture(FixtureKind::OwnerStateDataPushed) .await - .map_err(|e| format!("Failed to create repo announcement: {}", e))?; + .map_err(|e| format!("Failed to get promoted repo: {}", e))?; - let repo_id = repo + let repo_id = existing_state .tags .iter() .find(|t| t.kind() == TagKind::d()) .and_then(|t| t.content()) - .ok_or("Missing d tag in repo announcement")? + .ok_or("Missing d tag in state event")? .to_string(); - // Verify it's in purgatory (not served) - tokio::time::sleep(Duration::from_millis(300)).await; - if client.is_event_on_relay(repo.id).await.map_err(|e| e.to_string())? { - return Err( - "Announcement was served immediately - purgatory not working".to_string(), - ); + let relay_domain = client + .relay_url() + .await + .map_err(|e| e.to_string())? 
+ .trim_start_matches("ws://") + .trim_start_matches("wss://") + .to_string(); + + let npub = client + .public_key() + .to_bech32() + .map_err(|e| e.to_string())?; + + // Stage 2: clone the repo and create a unique commit (not pushed yet) + let clone_path = clone_repo(&relay_domain, &npub, &repo_id) + .map_err(|e| format!("Failed to clone repo: {}", e))?; + + let cleanup = || { let _ = fs::remove_dir_all(&clone_path); }; + + let unique_commit = match create_commit(&clone_path, "deletion test unique commit") { + Ok(h) => h, + Err(e) => { cleanup(); return Err(format!("Failed to create commit: {}", e)); } + }; + + // Stage 3: submit a state event pointing to the unique commit (enters purgatory) + let state_event = client + .event_builder(Kind::RepoState, "") + .tag(Tag::identifier(&repo_id)) + .tag(Tag::custom( + TagKind::custom("refs/heads/main"), + vec![unique_commit.clone()], + )) + .tag(Tag::custom( + TagKind::custom("HEAD"), + vec!["ref: refs/heads/main".to_string()], + )) + .build(client.keys()) + .map_err(|e| { cleanup(); format!("Failed to build state event: {}", e) })?; + + let (_, in_purgatory) = client + .send_event_and_note_purgatory(state_event.clone()) + .await + .map_err(|e| { cleanup(); format!("Failed to send state event: {}", e) })?; + + if !in_purgatory { + cleanup(); + return Err(format!( + "State event was served immediately (not in purgatory). 
\ + Commit {} may already exist on relay.", + unique_commit + )); } - // Build and send kind 5 deletion event referencing the announcement by event ID + // Stage 4: send kind 5 deletion event referencing the state event by event ID let deletion = client .event_builder(Kind::EventDeletion, "") - .tag(Tag::event(repo.id)) - .tag(Tag::custom( - TagKind::custom("k"), - vec!["30617"], - )) + .tag(Tag::event(state_event.id)) + .tag(Tag::custom(TagKind::custom("k"), vec!["30618"])) .build(client.keys()) - .map_err(|e| format!("Failed to build deletion event: {}", e))?; + .map_err(|e| { cleanup(); format!("Failed to build deletion event: {}", e) })?; client .send_event(deletion) .await - .map_err(|e| format!("Relay rejected deletion event: {}", e))?; + .map_err(|e| { cleanup(); format!("Relay rejected deletion event: {}", e) })?; tokio::time::sleep(Duration::from_millis(300)).await; - // Verify the announcement can no longer be promoted by attempting a git push. - // We check this indirectly: if the purgatory entry was removed, a subsequent - // git push to the repo path should fail (no bare repo). - // For the integration test we verify the announcement is still not served - // (it was never promoted) and that the deletion event was accepted. - // The bare-repo deletion is verified by attempting a git clone. - let http_url = AuditClient::ws_to_http_url(&client.relay_url().await.map_err(|e| e.to_string())?) - .map_err(|e| e.to_string())?; - let clone_url = format!( - "{}/{}/{}.git", - http_url, - client.public_key().to_bech32().map_err(|e| e.to_string())?, - repo_id - ); - - // git ls-remote should fail (bare repo deleted) - let output = std::process::Command::new("git") - .args(["ls-remote", &clone_url]) - .output() - .map_err(|e| format!("Failed to run git ls-remote: {}", e))?; - - if output.status.success() { - return Err(format!( - "Bare repo still exists after deletion event. 
\ - Expected git ls-remote to fail for {}", - clone_url - )); + // Stage 5: attempt to push the unique commit — must be rejected + let push_result = try_push(&clone_path); + cleanup(); + + match push_result { + Ok(false) => Ok(()), // push rejected as expected + Ok(true) => Err(format!( + "Push was accepted but should have been rejected. \ + The state event (id={}) was deleted, so commit {} \ + should not be authorized.", + state_event.id, unique_commit + )), + Err(e) => Err(format!("Git push error: {}", e)), } - - Ok(()) }) .await } - /// Test: Kind 5 deletion event by `a` tag coordinate removes purgatory announcement + /// Test: Kind 5 deletion event by `a` tag coordinate removes a purgatory state event /// /// Spec: NIP-09 /// "When an `a` tag is used, relays SHOULD delete all versions of the replaceable /// event up to the `created_at` timestamp of the deletion request event." /// /// This test verifies: - /// 1. Send a valid repository announcement (enters purgatory) - /// 2. Send a kind 5 deletion event referencing the announcement by coordinate - /// (`30617::`) - /// 3. The announcement is no longer in purgatory - pub async fn test_deletion_by_coordinate_removes_purgatory_announcement( + /// 1. Get a promoted repo (OwnerStateDataPushed) so git pushes are possible + /// 2. Generate a fresh keypair for a new maintainer + /// 3. Send a replacement owner announcement adding the new maintainer (goes to DB) + /// 4. Send a state event signed by the new maintainer pointing to a unique commit + /// (enters purgatory — maintainer is authorized but commit doesn't exist yet) + /// 5. Delete by coordinate `30618::` + /// 6. 
Clone repo, create that unique commit, attempt to push — MUST be rejected + /// (the state event was deleted, so the commit is no longer authorized) + pub async fn test_deletion_by_coordinate_removes_purgatory_state_event( client: &AuditClient, ) -> TestResult { TestResult::new( - "deletion_by_coordinate_removes_purgatory_announcement", + "deletion_by_coordinate_removes_purgatory_state_event", SpecRef::PurgatoryAcceptUntilGitData, - "Kind 5 deletion by `a` coordinate SHOULD remove a purgatory announcement", + "Kind 5 deletion by `a` coordinate SHOULD remove a purgatory state event, causing push rejection", ) .run(|| async { let ctx = TestContext::new(client); - // Send announcement to purgatory - let repo = ctx - .get_fixture(FixtureKind::ValidRepoSent) + // Stage 1: get a promoted repo with git data already on the relay + let existing_state = ctx + .get_fixture(FixtureKind::OwnerStateDataPushed) .await - .map_err(|e| format!("Failed to create repo announcement: {}", e))?; + .map_err(|e| format!("Failed to get promoted repo: {}", e))?; - let repo_id = repo + let repo_id = existing_state .tags .iter() .find(|t| t.kind() == TagKind::d()) .and_then(|t| t.content()) - .ok_or("Missing d tag in repo announcement")? + .ok_or("Missing d tag in state event")? .to_string(); - // Verify it's in purgatory (not served) - tokio::time::sleep(Duration::from_millis(300)).await; - if client.is_event_on_relay(repo.id).await.map_err(|e| e.to_string())? { - return Err( - "Announcement was served immediately - purgatory not working".to_string(), - ); - } + // Stage 2: generate a fresh keypair for a new maintainer + let new_maintainer_keys = Keys::generate(); + let new_maintainer_hex = new_maintainer_keys.public_key().to_hex(); - // Build coordinate: `30617::` - let coord = format!( - "30617:{}:{}", - client.public_key().to_hex(), - repo_id - ); + // Stage 3: send a replacement owner announcement that adds the new maintainer. 
+ // This is a replacement (same pubkey + identifier already in DB) so it goes + // straight to the database without entering purgatory. + let relay_url = client + .relay_url() + .await + .map_err(|e| e.to_string())?; + let http_url = relay_url + .replace("ws://", "http://") + .replace("wss://", "https://"); + let npub = client + .public_key() + .to_bech32() + .map_err(|e| e.to_string())?; - // Build and send kind 5 deletion event referencing by coordinate - let deletion = client - .event_builder(Kind::EventDeletion, "") - .tag(Tag::custom(TagKind::custom("a"), vec![coord])) - .tag(Tag::custom(TagKind::custom("k"), vec!["30617"])) + let replacement_announcement = client + .event_builder(Kind::GitRepoAnnouncement, "") + .tag(Tag::identifier(&repo_id)) + .tag(Tag::custom( + TagKind::custom("clone"), + vec![format!("{}/{}/{}.git", http_url, npub, repo_id)], + )) + .tag(Tag::custom( + TagKind::custom("relays"), + vec![relay_url.clone()], + )) + .tag(Tag::custom( + TagKind::custom("maintainers"), + vec![new_maintainer_hex.clone()], + )) .build(client.keys()) - .map_err(|e| format!("Failed to build deletion event: {}", e))?; + .map_err(|e| format!("Failed to build replacement announcement: {}", e))?; client - .send_event(deletion) + .send_event(replacement_announcement) .await - .map_err(|e| format!("Relay rejected deletion event: {}", e))?; + .map_err(|e| format!("Relay rejected replacement announcement: {}", e))?; - tokio::time::sleep(Duration::from_millis(300)).await; + tokio::time::sleep(Duration::from_millis(200)).await; - // Verify bare repo was deleted - let http_url = AuditClient::ws_to_http_url(&client.relay_url().await.map_err(|e| e.to_string())?) 
- .map_err(|e| e.to_string())?; - let clone_url = format!( - "{}/{}/{}.git", - http_url, - client.public_key().to_bech32().map_err(|e| e.to_string())?, - repo_id - ); + // Stage 4: clone the repo and create a unique commit (not pushed yet) + let relay_domain = relay_url + .trim_start_matches("ws://") + .trim_start_matches("wss://") + .to_string(); + + let clone_path = clone_repo(&relay_domain, &npub, &repo_id) + .map_err(|e| format!("Failed to clone repo: {}", e))?; + + let cleanup = || { let _ = fs::remove_dir_all(&clone_path); }; - let output = std::process::Command::new("git") - .args(["ls-remote", &clone_url]) - .output() - .map_err(|e| format!("Failed to run git ls-remote: {}", e))?; + let unique_commit = match create_commit(&clone_path, "deletion coordinate test unique commit") { + Ok(h) => h, + Err(e) => { cleanup(); return Err(format!("Failed to create commit: {}", e)); } + }; - if output.status.success() { + // Stage 5: submit a state event signed by the new maintainer pointing to the + // unique commit. The new maintainer is now authorized (listed in the replacement + // announcement), so the state event should enter purgatory (commit doesn't exist). + let state_event = client + .event_builder(Kind::RepoState, "") + .tag(Tag::identifier(&repo_id)) + .tag(Tag::custom( + TagKind::custom("refs/heads/main"), + vec![unique_commit.clone()], + )) + .tag(Tag::custom( + TagKind::custom("HEAD"), + vec!["ref: refs/heads/main".to_string()], + )) + .build(&new_maintainer_keys) + .map_err(|e| { cleanup(); format!("Failed to build state event: {}", e) })?; + + let (_, in_purgatory) = client + .send_event_and_note_purgatory(state_event.clone()) + .await + .map_err(|e| { cleanup(); format!("Failed to send state event: {}", e) })?; + + if !in_purgatory { + cleanup(); return Err(format!( - "Bare repo still exists after deletion event. \ - Expected git ls-remote to fail for {}", - clone_url + "State event was served immediately (not in purgatory). 
\ + Commit {} may already exist on relay.", + unique_commit )); } - Ok(()) + // Stage 6: send kind 5 deletion event signed by the new maintainer, + // referencing their state event by coordinate `30618:<pubkey>:<identifier>` + let coord = format!("30618:{}:{}", new_maintainer_hex, repo_id); + + let deletion = client + .event_builder(Kind::EventDeletion, "") + .tag(Tag::custom(TagKind::custom("a"), vec![coord])) + .tag(Tag::custom(TagKind::custom("k"), vec!["30618"])) + .build(&new_maintainer_keys) + .map_err(|e| { cleanup(); format!("Failed to build deletion event: {}", e) })?; + + client + .send_event(deletion) + .await + .map_err(|e| { cleanup(); format!("Relay rejected deletion event: {}", e) })?; + + tokio::time::sleep(Duration::from_millis(300)).await; + + // Stage 7: attempt to push the unique commit — must be rejected because + // the new maintainer's state event was deleted from purgatory + let push_result = try_push(&clone_path); + cleanup(); + + match push_result { + Ok(false) => Ok(()), // push rejected as expected + Ok(true) => Err(format!( + "Push was accepted but should have been rejected. \ + The new maintainer's state event (id={}) was deleted by coordinate, \ + so commit {} should not be authorized.", + state_event.id, unique_commit + )), + Err(e) => Err(format!("Git push error: {}", e)), + } }) .await } diff --git a/src/nostr/policy/deletion.rs b/src/nostr/policy/deletion.rs index 69a5758..01241c9 100644 --- a/src/nostr/policy/deletion.rs +++ b/src/nostr/policy/deletion.rs @@ -1,7 +1,7 @@ /// Deletion Policy - NIP-09 event deletion request handling /// -/// Handles kind 5 (EventDeletion) events that request removal of repository -/// announcements (kind 30617) from purgatory. +/// Handles kind 5 (EventDeletion) events that request removal of purgatory entries +/// for repository announcements (kind 30617) and state events (kind 30618). 
/// /// ## NIP-09 Rules Enforced /// @@ -13,9 +13,9 @@ /// /// ## Purgatory Interaction /// -/// When a valid deletion request targets a kind 30617 announcement that is currently -/// in purgatory (not yet promoted to the database), the purgatory entry is removed -/// and the bare repository is deleted from disk. +/// - Kind 30617 (announcement) in purgatory: entry removed, bare repo deleted from disk +/// - Kind 30618 (state event) in purgatory: matching state event(s) removed by event ID +/// or by (author, identifier) coordinate use nostr_relay_builder::prelude::{Event, WritePolicyResult}; use super::PolicyContext; @@ -48,13 +48,13 @@ impl DeletionPolicy { WritePolicyResult::Accept } - /// Remove any purgatory announcements targeted by this deletion event. + /// Remove any purgatory entries targeted by this deletion event. /// /// Handles both reference styles from NIP-09: - /// - `e` tags: event ID references — match against purgatory entry event IDs - /// - `a` tags: addressable coordinate references — `30617:<pubkey>:<identifier>` + /// - `e` tags: event ID references — match against announcement or state event IDs + /// - `a` tags: addressable coordinate references — `30617:…` or `30618:…` /// - /// Only removes entries where the purgatory entry's owner matches the deletion + /// Only removes entries where the purgatory entry's author matches the deletion /// event's pubkey (enforces author-only deletion). fn remove_purgatory_targets(&self, event: &Event) { let author = &event.pubkey; @@ -81,17 +81,19 @@ impl DeletionPolicy { } } - /// Remove a purgatory announcement matched by event ID. + /// Remove a purgatory entry (announcement or state event) matched by event ID. /// - /// Scans all purgatory announcements owned by `author` and removes the one - /// whose event ID hex matches `target_id_hex`. 
- fn remove_by_event_id(&self, author: &nostr_relay_builder::prelude::PublicKey, target_id_hex: &str, _deletion_created_at: u64) { - // Scan announcements owned by this author for a matching event ID - // We use get_announcements_by_identifier would require knowing the identifier, - // so instead we iterate via find_announcement after collecting all entries. + /// Checks announcements first (kind 30617), then state events (kind 30618). + /// Only removes entries whose author matches `author`. + fn remove_by_event_id( + &self, + author: &nostr_relay_builder::prelude::PublicKey, + target_id_hex: &str, + _deletion_created_at: u64, + ) { + // --- Check announcements (kind 30617) --- // The DashMap doesn't expose a direct "find by event ID" method, so we use - // the announcements_for_sync snapshot to get all (repo_id, _) pairs and then - // look up each one. + // the announcements_for_sync snapshot to enumerate all (repo_id, _) pairs. let all = self.ctx.purgatory.announcements_for_sync(); for (repo_id, _) in all { // repo_id format: "30617:{pubkey_hex}:{identifier}" @@ -102,7 +104,6 @@ impl DeletionPolicy { let entry_pubkey_hex = parts[1]; let identifier = parts[2]; - // Only check entries owned by the deletion event author if entry_pubkey_hex != author.to_hex() { continue; } @@ -116,18 +117,37 @@ impl DeletionPolicy { "Deletion request: removing purgatory announcement by event ID" ); self.evict_purgatory_entry(author, identifier); - return; // event IDs are unique, no need to continue + return; // event IDs are unique + } + } + } + + // --- Check state events (kind 30618) --- + // State events are keyed by identifier; scan all identifiers for a match. 
+ let state_identifiers = self.ctx.purgatory.get_all_identifiers(); + for identifier in state_identifiers { + let entries = self.ctx.purgatory.find_state(&identifier); + for entry in entries { + if entry.author == *author && entry.event.id.to_hex() == target_id_hex { + tracing::info!( + event_id = %target_id_hex, + identifier = %identifier, + author = %author.to_hex(), + "Deletion request: removing purgatory state event by event ID" + ); + self.ctx.purgatory.remove_state_event(&identifier, &entry.event.id); + return; // event IDs are unique } } } } - /// Remove a purgatory announcement matched by addressable coordinate. + /// Remove a purgatory entry matched by addressable coordinate. + /// + /// The coordinate format is `<kind>:<pubkey>:<identifier>`. + /// Handles kind 30617 (announcements) and kind 30618 (state events). /// - /// The coordinate format is `<kind>:<pubkey>:<identifier>`. Only kind 30617 - /// coordinates are relevant here. Per NIP-09, all versions up to `deletion_created_at` - /// are considered deleted — since purgatory entries are always a single event per - /// (owner, identifier), we delete if the entry's `created_at` ≤ `deletion_created_at`. + /// Per NIP-09, all versions up to `deletion_created_at` are considered deleted. 
fn remove_by_coordinate( &self, author: &nostr_relay_builder::prelude::PublicKey, @@ -144,11 +164,6 @@ impl DeletionPolicy { let coord_pubkey_hex = parts[1]; let identifier = parts[2]; - // Only handle kind 30617 (GitRepoAnnouncement) - if kind_str != "30617" { - return; - } - // The coordinate pubkey must match the deletion event author if coord_pubkey_hex != author.to_hex() { tracing::debug!( @@ -159,25 +174,50 @@ impl DeletionPolicy { return; } - if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { - // Per NIP-09: delete all versions up to deletion_created_at - if entry.event.created_at.as_secs() <= deletion_created_at { - tracing::info!( - identifier = %identifier, - author = %author.to_hex(), - entry_created_at = entry.event.created_at.as_secs(), - deletion_created_at = %deletion_created_at, - "Deletion request: removing purgatory announcement by coordinate" - ); - self.evict_purgatory_entry(author, identifier); - } else { - tracing::debug!( - identifier = %identifier, - author = %author.to_hex(), - entry_created_at = entry.event.created_at.as_secs(), - deletion_created_at = %deletion_created_at, - "Ignoring deletion: purgatory entry is newer than deletion request" - ); + match kind_str { + "30617" => { + // Announcement purgatory entry + if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { + if entry.event.created_at.as_secs() <= deletion_created_at { + tracing::info!( + identifier = %identifier, + author = %author.to_hex(), + "Deletion request: removing purgatory announcement by coordinate" + ); + self.evict_purgatory_entry(author, identifier); + } else { + tracing::debug!( + identifier = %identifier, + author = %author.to_hex(), + "Ignoring deletion: purgatory announcement is newer than deletion request" + ); + } + } + } + "30618" => { + // State event purgatory entries for this (author, identifier). + // Remove all entries authored by `author` with created_at ≤ deletion_created_at. 
+ let entries = self.ctx.purgatory.find_state(identifier); + let mut removed = 0usize; + for entry in entries { + if entry.author == *author + && entry.event.created_at.as_secs() <= deletion_created_at + { + self.ctx.purgatory.remove_state_event(identifier, &entry.event.id); + removed += 1; + } + } + if removed > 0 { + tracing::info!( + identifier = %identifier, + author = %author.to_hex(), + removed = %removed, + "Deletion request: removed purgatory state event(s) by coordinate" + ); + } + } + _ => { + // Other kinds not handled } } } diff --git a/tests/purgatory.rs b/tests/purgatory.rs index 553271f..73f85ca 100644 --- a/tests/purgatory.rs +++ b/tests/purgatory.rs @@ -70,8 +70,8 @@ isolated_purgatory_test!(test_state_event_accepted_for_purgatory_announcement); // Deletion Event Tests (NIP-09) // ============================================================ -isolated_purgatory_test!(test_deletion_by_event_id_removes_purgatory_announcement); -isolated_purgatory_test!(test_deletion_by_coordinate_removes_purgatory_announcement); +isolated_purgatory_test!(test_deletion_by_event_id_removes_purgatory_state_event); +isolated_purgatory_test!(test_deletion_by_coordinate_removes_purgatory_state_event); // ============================================================ // State Event Purgatory Tests (already implemented) -- cgit v1.2.3 From f19b424e01fc5a682778c5e2bb194d242efd6987 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 13:46:57 +0000 Subject: feat: handle deletion of PR/PR-update events from purgatory Kind 5 deletion events referencing a PR or PR-update event by e-tag now remove the matching purgatory entry, provided the deletion author matches the PR event author. Placeholders (git data arrived before the event) are not removed since they have no author to verify against. PR purgatory is keyed by event ID hex so this is an O(1) lookup, checked before the O(n) announcement and state event scans. 
--- src/nostr/policy/deletion.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/nostr/policy/deletion.rs b/src/nostr/policy/deletion.rs index 01241c9..6457c90 100644 --- a/src/nostr/policy/deletion.rs +++ b/src/nostr/policy/deletion.rs @@ -81,9 +81,9 @@ impl DeletionPolicy { } } - /// Remove a purgatory entry (announcement or state event) matched by event ID. + /// Remove a purgatory entry (announcement, state event, or PR event) matched by event ID. /// - /// Checks announcements first (kind 30617), then state events (kind 30618). + /// Checks in order: announcements (30617), state events (30618), PR/PR-update events. /// Only removes entries whose author matches `author`. fn remove_by_event_id( &self, @@ -91,6 +91,26 @@ impl DeletionPolicy { target_id_hex: &str, _deletion_created_at: u64, ) { + // --- Check PR events (kind 1617/1618) first — O(1) direct lookup --- + // PR purgatory is keyed by event ID hex, so this is the cheapest check. + // Only remove if the entry has an actual event (not a placeholder) and the + // event's author matches the deletion request author. + if let Some(entry) = self.ctx.purgatory.find_pr(target_id_hex) { + if let Some(ref event) = entry.event { + if event.pubkey == *author { + tracing::info!( + event_id = %target_id_hex, + author = %author.to_hex(), + "Deletion request: removing purgatory PR event by event ID" + ); + self.ctx.purgatory.remove_pr(target_id_hex); + return; + } + } + // Entry exists but is a placeholder or wrong author — don't remove + return; + } + // --- Check announcements (kind 30617) --- // The DashMap doesn't expose a direct "find by event ID" method, so we use // the announcements_for_sync snapshot to enumerate all (repo_id, _) pairs. 
-- cgit v1.2.3 From 26f608e5011b9d1ad6036da75b89272835e69695 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 15:08:37 +0000 Subject: persist and restore announcement events across graceful restarts Extends purgatory persistence to include announcement purgatory entries. On graceful shutdown, non-soft-expired announcements are serialised to purgatory-state.json alongside state/PR/expired events; on startup they are restored, skipping any entry whose bare repo path no longer exists. Updates purgatory-design.md to reflect that purgatory persists through graceful shutdown and documents the new PurgatoryState disk format. Adds create_announcement_event helper to purgatory_helpers and three new integration tests in purgatory_persistence covering the full save/restore cycle, missing-repo skip, and the combined roundtrip with all entry types. --- docs/explanation/purgatory-design.md | 66 ++++++++- src/purgatory/mod.rs | 264 ++++++++++++++++++++++++++++++++++- tests/common/purgatory_helpers.rs | 38 +++++ tests/purgatory_persistence.rs | 135 +++++++++++++++++- 4 files changed, 493 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/docs/explanation/purgatory-design.md b/docs/explanation/purgatory-design.md index bd792d4..8e7d75c 100644 --- a/docs/explanation/purgatory-design.md +++ b/docs/explanation/purgatory-design.md @@ -39,14 +39,36 @@ This ensures we only serve announcements for repos that actually have content. ## Key Design Principles -### 1. In-Memory Only +### 1. Graceful-Shutdown Persistence -Purgatory data is **not persisted** to disk. On restart, all purgatory entries are lost. This is acceptable because: +Purgatory state is **saved to disk on graceful shutdown** and **restored on startup**. This preserves in-flight work across planned restarts (deployments, reboots). + +On `SIGINT` / Ctrl-C, `main.rs` calls `purgatory.save_to_disk()` before exiting. 
On startup, if the state file exists, `purgatory.restore_from_disk()` is called before the server begins accepting connections. + +**What is persisted:** + +| Store | Persisted? | Notes | +|-------|-----------|-------| +| `announcement_purgatory` | ✅ Yes | Non-soft-expired entries only (bare repo must exist) | +| `state_events` | ✅ Yes | All active entries | +| `pr_events` | ✅ Yes | Both events and placeholders | +| `expired_events` | ✅ Yes | Prevents re-sync loops after restart | +| `sync_queue` | ❌ No | Rebuilt automatically after restore | + +**What is NOT persisted (unclean shutdown):** + +On a crash or `SIGKILL`, the state file is not written. In that case: - Events are still on other relays (can be re-submitted) - Git data can be re-pushed - 30-minute expiry means data is transient anyway +**State file location:** `/purgatory-state.json` + +**Downtime accounting:** Expiry deadlines are stored as duration offsets from the save timestamp. On restore, elapsed downtime is subtracted from each deadline. Entries that expired during downtime are immediately swept by the next cleanup tick. + +**Soft-expired announcements are excluded:** Their bare repos have already been deleted, so they cannot be meaningfully restored. They will be re-fetched via background sync if needed. + ### 2. Separate Storage for Each Event Type | Store | Index | Purpose | @@ -233,6 +255,31 @@ pub struct Purgatory { } ``` +### Persistence State (Disk Format) + +`Instant` fields cannot be serialized directly. Each entry type has a corresponding `Serializable*` wrapper that stores time fields as `u64` second offsets from a `saved_at: SystemTime` reference point. On restore, elapsed downtime is subtracted to produce the correct remaining TTL. 
+ +```rust +struct PurgatoryState { + version: u32, // currently 1 + saved_at: SystemTime, // reference for offset math + + /// Non-soft-expired announcements indexed by "owner_hex:identifier" + announcement_purgatory: HashMap, + + /// State events indexed by repository identifier + state_events: HashMap>, + + /// PR events (and placeholders) indexed by event ID hex + pr_events: HashMap, + + /// Expired event IDs → approximate expiry SystemTime + expired_events: HashMap, +} +``` + +The `announcement_purgatory` field uses `#[serde(default)]` so that state files written before announcement persistence was added (version 1 without the field) still deserialize correctly. + --- ## Announcement Purgatory Flows @@ -806,8 +853,9 @@ A background timer (`run_purgatory_announcement_sync`, every 5 seconds) ensures ``` src/ ├── purgatory/ -│ ├── mod.rs # Main Purgatory struct and API +│ ├── mod.rs # Main Purgatory struct, API, save_to_disk, restore_from_disk │ ├── types.rs # RefPair, AnnouncementPurgatoryEntry, StatePurgatoryEntry, PrPurgatoryEntry +│ ├── persistence.rs # instant_to_offset / offset_to_instant time conversion utilities │ ├── helpers.rs # Ref extraction and matching functions │ └── sync/ │ ├── mod.rs # Sync module exports @@ -835,7 +883,8 @@ src/ Located in each module: -- **[`src/purgatory/mod.rs`](../../src/purgatory/mod.rs)** - Core purgatory operations including announcement purgatory +- **[`src/purgatory/mod.rs`](../../src/purgatory/mod.rs)** - Core purgatory operations including announcement purgatory; persistence round-trip tests for all entry types (state, PR, announcement, expired events, downtime calculation, soft-expired exclusion, missing-repo skip) +- **[`src/purgatory/persistence.rs`](../../src/purgatory/persistence.rs)** - `instant_to_offset` / `offset_to_instant` round-trip tests - **[`src/purgatory/helpers.rs`](../../src/purgatory/helpers.rs)** - Ref matching logic - **[`src/purgatory/sync/functions.rs`](../../src/purgatory/sync/functions.rs)** - 
Sync functions with MockSyncContext - **[`src/purgatory/sync/throttle.rs`](../../src/purgatory/sync/throttle.rs)** - Throttle manager @@ -852,6 +901,7 @@ Located in [`tests/`](../../tests/): - **Git-data-first flow** - Git push creates placeholder, event completes it - **Authorization with purgatory** - Push authorized by purgatory state - **Background sync** - Sync fetches git data and releases events +- **Persistence across restart** - Save/restore cycle preserves all entry types including announcements --- @@ -894,6 +944,14 @@ PR events can arrive before or after git data: **Solution:** `PrPurgatoryEntry.event: Option` with `None` = placeholder. +### 6. Persistence Requires Instant → Duration Conversion + +`std::time::Instant` is not serializable and is not meaningful across process boundaries. Expiry deadlines must be converted to a portable form. + +**Solution:** Store each deadline as a `u64` second offset from a `saved_at: SystemTime` reference. On restore, subtract elapsed downtime from each offset to compute the new `Instant`. Entries whose deadline already passed during downtime get `expires_at = now` and are swept by the next cleanup tick. + +**Soft-expired announcements are excluded from persistence** because their bare repos have been deleted. Restoring them would leave purgatory entries pointing at non-existent repos. They are simply dropped; background sync will re-fetch the announcement event if needed. + --- ## Related Documentation diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index f5f8b31..9a63bf6 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -83,9 +83,35 @@ struct SerializablePrPurgatoryEntry { expires_at_offset_secs: u64, } +/// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. +/// +/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp +/// in `PurgatoryState`, allowing state to be persisted and restored across restarts. 
+/// +/// Note: soft-expired entries (bare repo deleted) are NOT persisted — they have +/// no git repo on disk and would be immediately cleaned up on restore anyway. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SerializableAnnouncementPurgatoryEntry { + /// The nostr announcement event (kind 30617) + event: Event, + /// The repository identifier from the event's 'd' tag + identifier: String, + /// The owner pubkey (event author) + owner: PublicKey, + /// Path to the bare git repository (must exist on disk) + repo_path: PathBuf, + /// Relay URLs from the announcement (for sync registration) + relays: HashSet<String>, + /// Duration offset from saved_at for created_at + created_at_offset_secs: u64, + /// Duration offset from saved_at for expires_at + expires_at_offset_secs: u64, +} + /// Serializable purgatory state for disk persistence. /// /// Contains all purgatory data needed to restore state across restarts: +/// - Announcement events (indexed by (owner, identifier)) — non-soft-expired only /// - State events (indexed by identifier) /// - PR events (indexed by event ID) /// - Expired events (to prevent re-sync loops) @@ -97,6 +123,10 @@ struct PurgatoryState { version: u32, /// When this state was saved to disk saved_at: SystemTime, + /// Announcement events indexed by "owner_hex:identifier" + /// Only non-soft-expired entries are persisted (bare repo must exist). + #[serde(default)] + announcement_purgatory: HashMap<String, SerializableAnnouncementPurgatoryEntry>, /// State events indexed by repository identifier state_events: HashMap>, /// PR events indexed by event ID (hex string) @@ -1114,6 +1144,34 @@ impl Purgatory { let saved_at = SystemTime::now(); let now_instant = Instant::now(); + // Convert announcement_purgatory to serializable format. + // Skip soft-expired entries: their bare repos have been deleted, so they + // cannot be meaningfully restored (the repo path no longer exists on disk). 
+ let mut announcement_purgatory = HashMap::new(); + for entry in self.announcement_purgatory.iter() { + let e = entry.value(); + if e.soft_expired { + continue; + } + let created_offset = + persistence::instant_to_offset(e.created_at, saved_at, now_instant); + let expires_offset = + persistence::instant_to_offset(e.expires_at, saved_at, now_instant); + let key = format!("{}:{}", e.owner.to_hex(), e.identifier); + announcement_purgatory.insert( + key, + SerializableAnnouncementPurgatoryEntry { + event: e.event.clone(), + identifier: e.identifier.clone(), + owner: e.owner, + repo_path: e.repo_path.clone(), + relays: e.relays.clone(), + created_at_offset_secs: created_offset.as_secs(), + expires_at_offset_secs: expires_offset.as_secs(), + }, + ); + } + // Convert state_events to serializable format let mut state_events = HashMap::new(); for entry in self.state_events.iter() { @@ -1176,6 +1234,7 @@ impl Purgatory { let state = PurgatoryState { version: 1, saved_at, + announcement_purgatory, state_events, pr_events, expired_events, @@ -1187,6 +1246,7 @@ impl Purgatory { tracing::info!( path = %path.display(), + announcements = state.announcement_purgatory.len(), state_events = state.state_events.len(), pr_events = state.pr_events.len(), expired_events = state.expired_events.len(), @@ -1234,6 +1294,45 @@ impl Purgatory { let now_instant = Instant::now(); + // Restore announcement_purgatory. + // Skip entries whose bare repo no longer exists on disk — this can happen + // if the repo was deleted externally between save and restore. 
+ for (_key, e) in state.announcement_purgatory { + if !e.repo_path.exists() { + tracing::warn!( + owner = %e.owner, + identifier = %e.identifier, + repo_path = %e.repo_path.display(), + "Skipping announcement restore: bare repo no longer exists" + ); + continue; + } + let created_at = persistence::offset_to_instant( + Duration::from_secs(e.created_at_offset_secs), + state.saved_at, + now_instant, + ); + let expires_at = persistence::offset_to_instant( + Duration::from_secs(e.expires_at_offset_secs), + state.saved_at, + now_instant, + ); + let key = (e.owner, e.identifier.clone()); + self.announcement_purgatory.insert( + key, + AnnouncementPurgatoryEntry { + event: e.event, + identifier: e.identifier, + owner: e.owner, + repo_path: e.repo_path, + relays: e.relays, + created_at, + expires_at, + soft_expired: false, + }, + ); + } + // Restore state_events for (identifier, entries) in state.state_events { let restored_entries: Vec = entries @@ -1301,6 +1400,7 @@ impl Purgatory { tracing::info!( path = %path.display(), + announcements = self.announcement_purgatory.len(), state_events = self.state_events.len(), pr_events = self.pr_events.len(), expired_events = self.expired_events.len(), @@ -2425,6 +2525,141 @@ async fn test_file_cleanup_after_successful_restore() { assert!(!state_file.exists()); } +#[tokio::test] +async fn test_save_and_restore_announcement_events() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + // Create a real bare repo directory so the restore path-existence check passes + let repo_dir = temp_dir.path().join("owner.git"); + std::fs::create_dir_all(&repo_dir).unwrap(); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + let ann_event = EventBuilder::text_note("announcement event") + .sign_with_keys(&keys) + .unwrap(); + let ann_event_id = ann_event.id; + + let mut relays = HashSet::new(); + 
relays.insert("wss://relay.example.com".to_string()); + + purgatory.add_announcement( + ann_event.clone(), + "my-repo".to_string(), + keys.public_key(), + repo_dir.clone(), + relays.clone(), + ); + + // Save to disk + purgatory.save_to_disk(&state_file).unwrap(); + assert!(state_file.exists()); + + // Create new purgatory and restore + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // File should be deleted after restore + assert!(!state_file.exists()); + + // Verify announcement was restored + let (ann_count, _, _) = purgatory2.count(); + assert_eq!(ann_count, 1); + + let restored = purgatory2 + .find_announcement(&keys.public_key(), "my-repo") + .unwrap(); + assert_eq!(restored.event.id, ann_event_id); + assert_eq!(restored.identifier, "my-repo"); + assert_eq!(restored.owner, keys.public_key()); + assert_eq!(restored.repo_path, repo_dir); + assert_eq!(restored.relays, relays); + assert!(!restored.soft_expired); +} + +#[tokio::test] +async fn test_soft_expired_announcements_not_persisted() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + let repo_dir = temp_dir.path().join("owner.git"); + std::fs::create_dir_all(&repo_dir).unwrap(); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + let ann_event = EventBuilder::text_note("announcement event") + .sign_with_keys(&keys) + .unwrap(); + + purgatory.add_announcement( + ann_event.clone(), + "my-repo".to_string(), + keys.public_key(), + repo_dir.clone(), + HashSet::new(), + ); + + // Manually mark as soft-expired (bare repo deleted) + let key = (keys.public_key(), "my-repo".to_string()); + if let Some(mut entry) = purgatory.announcement_purgatory.get_mut(&key) { + entry.soft_expired = true; + } + + // Save to disk — soft-expired entry should be excluded + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore + 
let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + // Soft-expired announcement should NOT be restored + let (ann_count, _, _) = purgatory2.count(); + assert_eq!(ann_count, 0); +} + +#[tokio::test] +async fn test_announcement_with_missing_repo_skipped_on_restore() { + use tempfile::tempdir; + + let temp_dir = tempdir().unwrap(); + let state_file = temp_dir.path().join("purgatory_state.json"); + + // Point to a repo path that does NOT exist + let missing_repo = temp_dir.path().join("nonexistent.git"); + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + let ann_event = EventBuilder::text_note("announcement event") + .sign_with_keys(&keys) + .unwrap(); + + purgatory.add_announcement( + ann_event.clone(), + "my-repo".to_string(), + keys.public_key(), + missing_repo.clone(), + HashSet::new(), + ); + + // Save to disk (repo path is serialized even though it doesn't exist) + purgatory.save_to_disk(&state_file).unwrap(); + + // Create new purgatory and restore — entry should be skipped + let purgatory2 = Purgatory::new(PathBuf::new()); + purgatory2.restore_from_disk(&state_file).unwrap(); + + let (ann_count, _, _) = purgatory2.count(); + assert_eq!(ann_count, 0); +} + #[tokio::test] async fn test_comprehensive_roundtrip() { use nostr_sdk::{Kind, Tag, TagKind}; @@ -2433,10 +2668,27 @@ async fn test_comprehensive_roundtrip() { let temp_dir = tempdir().unwrap(); let state_file = temp_dir.path().join("purgatory_state.json"); + // Create a real bare repo directory for the announcement + let repo_dir = temp_dir.path().join("owner.git"); + std::fs::create_dir_all(&repo_dir).unwrap(); + let purgatory = Purgatory::new(PathBuf::new()); let keys1 = Keys::generate(); let keys2 = Keys::generate(); + // Add announcement + let ann_event = EventBuilder::text_note("announcement") + .sign_with_keys(&keys1) + .unwrap(); + let ann_event_id = ann_event.id; + purgatory.add_announcement( + ann_event, + 
"repo1".to_string(), + keys1.public_key(), + repo_dir.clone(), + HashSet::new(), + ); + // Add multiple state events let state1 = EventBuilder::text_note("state 1") .sign_with_keys(&keys1) @@ -2476,7 +2728,8 @@ async fn test_comprehensive_roundtrip() { purgatory.cleanup(); // Verify initial state - let (_, state_count, pr_count) = purgatory.count(); + let (ann_count, state_count, pr_count) = purgatory.count(); + assert_eq!(ann_count, 1); // announcement assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) assert_eq!(pr_count, 2); // pr-1, pr-2 assert_eq!(purgatory.expired_count(), 1); // expired_event @@ -2489,11 +2742,18 @@ async fn test_comprehensive_roundtrip() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify all data was restored correctly - let (_, state_count2, pr_count2) = purgatory2.count(); + let (ann_count2, state_count2, pr_count2) = purgatory2.count(); + assert_eq!(ann_count2, 1); assert_eq!(state_count2, 2); assert_eq!(pr_count2, 2); assert_eq!(purgatory2.expired_count(), 1); + // Verify announcement + let restored_ann = purgatory2 + .find_announcement(&keys1.public_key(), "repo1") + .unwrap(); + assert_eq!(restored_ann.event.id, ann_event_id); + // Verify state events assert_eq!(purgatory2.find_state("repo1").len(), 1); assert_eq!(purgatory2.find_state("repo2").len(), 1); diff --git a/tests/common/purgatory_helpers.rs b/tests/common/purgatory_helpers.rs index 1d06f22..cfcea1c 100644 --- a/tests/common/purgatory_helpers.rs +++ b/tests/common/purgatory_helpers.rs @@ -338,6 +338,44 @@ pub fn build_repo_coord(keys: &Keys, identifier: &str) -> String { format!("30617:{}:{}", keys.public_key().to_hex(), identifier) } +/// Create a repository announcement event (kind 30617) for purgatory tests. +/// +/// Creates a minimal but valid NIP-34 repository announcement with a `d` tag, +/// optional `clone` URLs, and optional `relays` URLs. 
+/// +/// # Arguments +/// * `keys` - Keys for signing +/// * `identifier` - Repository identifier (d-tag) +/// * `clone_urls` - Clone URLs to include (may be empty) +/// * `relay_urls` - Relay URLs to include (may be empty) +/// +/// # Returns +/// * `Ok(Event)` - Signed announcement event +/// * `Err(String)` - If signing fails +pub fn create_announcement_event( + keys: &Keys, + identifier: &str, + clone_urls: &[&str], + relay_urls: &[&str], +) -> Result<Event, String> { + let mut tags = vec![Tag::identifier(identifier)]; + + if !clone_urls.is_empty() { + let urls: Vec<String> = clone_urls.iter().map(|s| s.to_string()).collect(); + tags.push(Tag::custom(TagKind::custom("clone"), urls)); + } + + if !relay_urls.is_empty() { + let urls: Vec<String> = relay_urls.iter().map(|s| s.to_string()).collect(); + tags.push(Tag::custom(TagKind::custom("relays"), urls)); + } + + EventBuilder::new(Kind::GitRepoAnnouncement, "") + .tags(tags) + .sign_with_keys(keys) + .map_err(|e| format!("Failed to sign announcement event: {}", e)) +} + /// Wait for an event to be served by a relay (not in purgatory). 
/// /// Polls the relay until the event is queryable, indicating it has diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs index 5abbf15..05cb44b 100644 --- a/tests/purgatory_persistence.rs +++ b/tests/purgatory_persistence.rs @@ -31,9 +31,11 @@ mod common; +use common::purgatory_helpers::create_announcement_event; use ngit_grasp::purgatory::Purgatory; use ngit_grasp::sync::rejected_index::{EventType, RejectedEventsIndex, RejectionReason}; use nostr_sdk::prelude::*; +use std::collections::HashSet; use std::time::Duration; /// Helper to create a test event @@ -116,12 +118,31 @@ async fn test_full_purgatory_save_restore_cycle() { // Add a PR placeholder (git-data-first scenario) purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-xyz".to_string()); - // Note: We can't directly test expired events without accessing private fields, - // so we'll focus on testing state and PR events persistence + // Add an announcement to purgatory (requires a real directory for the repo path) + let repo_dir = temp_dir.path().join("repo.git"); + std::fs::create_dir_all(&repo_dir).unwrap(); + let ann_keys = Keys::generate(); + let ann_event = create_announcement_event( + &ann_keys, + "my-repo", + &["http://example.com/my-repo.git"], + &["wss://relay.example.com"], + ) + .unwrap(); + let ann_event_id = ann_event.id; + let mut ann_relays = HashSet::new(); + ann_relays.insert("wss://relay.example.com".to_string()); + purgatory.add_announcement( + ann_event, + "my-repo".to_string(), + ann_keys.public_key(), + repo_dir.clone(), + ann_relays, + ); // Verify initial counts let (announcement_count, state_count, pr_count) = purgatory.count(); - assert_eq!(announcement_count, 0, "Should have 0 announcements"); + assert_eq!(announcement_count, 1, "Should have 1 announcement"); assert_eq!(state_count, 2, "Should have 2 state events"); assert_eq!( pr_count, 3, @@ -144,13 +165,22 @@ async fn test_full_purgatory_save_restore_cycle() { // Verify all data was 
restored let (announcement_count2, state_count2, pr_count2) = purgatory2.count(); - assert_eq!(announcement_count2, 0, "Should have 0 announcements after restore"); + assert_eq!(announcement_count2, 1, "Should have 1 announcement after restore"); assert_eq!(state_count2, 2, "Should have 2 state events after restore"); assert_eq!( pr_count2, 3, "Should have 3 PR events after restore (2 events + 1 placeholder)" ); + // Verify announcement was restored correctly + let restored_ann = purgatory2 + .find_announcement(&ann_keys.public_key(), "my-repo") + .expect("Announcement should be restored"); + assert_eq!(restored_ann.event.id, ann_event_id); + assert_eq!(restored_ann.identifier, "my-repo"); + assert_eq!(restored_ann.repo_path, repo_dir); + assert!(!restored_ann.soft_expired); + // Verify specific state events let repo1_states = purgatory2.find_state("repo1"); assert_eq!(repo1_states.len(), 1); @@ -748,3 +778,100 @@ async fn test_rejected_cache_entries_expired_during_downtime() { assert_eq!(index2.hot_cache_len(), 0); assert_eq!(index2.cold_index_len(), 1); } + +/// Test 18: Announcement events are saved and restored across restarts +#[tokio::test] +async fn test_announcement_save_restore_cycle() { + let temp_dir = tempfile::tempdir().unwrap(); + let git_data_path = temp_dir.path().join("git"); + let state_path = temp_dir.path().join("purgatory.json"); + + // Create a real bare repo directory (restore skips entries whose path is missing) + let repo_dir = temp_dir.path().join("owner.git"); + std::fs::create_dir_all(&repo_dir).unwrap(); + + let purgatory = Purgatory::new(&git_data_path); + let keys = Keys::generate(); + + let ann_event = create_announcement_event( + &keys, + "my-repo", + &["http://example.com/my-repo.git"], + &["wss://relay.example.com"], + ) + .unwrap(); + let ann_event_id = ann_event.id; + + let mut relays = HashSet::new(); + relays.insert("wss://relay.example.com".to_string()); + + purgatory.add_announcement( + ann_event, + "my-repo".to_string(), + 
keys.public_key(), + repo_dir.clone(), + relays.clone(), + ); + + let (ann_count, _, _) = purgatory.count(); + assert_eq!(ann_count, 1); + + // Save to disk + purgatory.save_to_disk(&state_path).unwrap(); + assert!(state_path.exists()); + + // Restore into a fresh purgatory + let purgatory2 = Purgatory::new(&git_data_path); + purgatory2.restore_from_disk(&state_path).unwrap(); + + assert!(!state_path.exists(), "State file should be deleted after restore"); + + let (ann_count2, _, _) = purgatory2.count(); + assert_eq!(ann_count2, 1, "Announcement should be restored"); + + let restored = purgatory2 + .find_announcement(&keys.public_key(), "my-repo") + .expect("Announcement should be findable after restore"); + + assert_eq!(restored.event.id, ann_event_id); + assert_eq!(restored.identifier, "my-repo"); + assert_eq!(restored.owner, keys.public_key()); + assert_eq!(restored.repo_path, repo_dir); + assert_eq!(restored.relays, relays); + assert!(!restored.soft_expired); +} + +/// Test 19: Announcement with missing repo path is skipped on restore +#[tokio::test] +async fn test_announcement_missing_repo_skipped_on_restore() { + let temp_dir = tempfile::tempdir().unwrap(); + let git_data_path = temp_dir.path().join("git"); + let state_path = temp_dir.path().join("purgatory.json"); + + // Point to a path that does NOT exist on disk + let missing_repo = temp_dir.path().join("nonexistent.git"); + + let purgatory = Purgatory::new(&git_data_path); + let keys = Keys::generate(); + + let ann_event = create_announcement_event(&keys, "my-repo", &[], &[]).unwrap(); + + purgatory.add_announcement( + ann_event, + "my-repo".to_string(), + keys.public_key(), + missing_repo, + HashSet::new(), + ); + + purgatory.save_to_disk(&state_path).unwrap(); + + let purgatory2 = Purgatory::new(&git_data_path); + purgatory2.restore_from_disk(&state_path).unwrap(); + + let (ann_count, _, _) = purgatory2.count(); + assert_eq!( + ann_count, 0, + "Announcement with missing repo path must be skipped" + ); 
+} -- cgit v1.2.3