From 1d09e4bdea7e328cf2740818df9df660c5532a99 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 13:24:46 +0000 Subject: feat: implement announcement purgatory core (breaks archive sync test) Route new announcements to purgatory instead of accepting immediately. Announcements are promoted to the database when git data arrives, ensuring we only serve announcements for repos with actual content. Implemented: - AnnouncementPurgatoryEntry type and DashMap store - Route new announcements to purgatory (replacement announcements skip) - Promote announcements on git data arrival (process_purgatory_announcements) - Authorization checks purgatory announcements (fetch_repository_data_with_purgatory) - State policy uses purgatory announcements for maintainer validation - Cleanup task handles announcement expiry - Updated count()/cleanup() to 3-tuples Known broken: - test_archive_read_only_creates_bare_repo fails: sync module does not treat purgatory announcements as confirmed repos, so per-repo sync (state events, PRs) is never triggered for purgatory announcements - Announcement persistence (save/restore) not implemented - SyncLevel (StateOnly vs Full) not implemented - Soft expiry two-phase not implemented - Expiry extension on state event / git auth not wired up --- src/git/authorization.rs | 38 +++++++++++++++- src/git/sync.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 145 insertions(+), 3 deletions(-) (limited to 'src/git') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index e174b51..9d53c4f 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -287,6 +287,39 @@ pub async fn fetch_repository_data( }) } +/// Fetch repository data including announcements from purgatory +/// +/// This combines database announcements with purgatory announcements, +/// which is needed for authorization when the announcement hasn't been +/// promoted yet (no git data has arrived). +pub async fn fetch_repository_data_with_purgatory( + database: &SharedDatabase, + purgatory: &crate::purgatory::Purgatory, + identifier: &str, +) -> Result { + // First, fetch from database + let mut repo_data = fetch_repository_data(database, identifier).await?; + + // Then, add announcements from purgatory + let purgatory_announcements = purgatory.get_announcements_by_identifier(identifier); + let purgatory_count = purgatory_announcements.len(); + + for entry in purgatory_announcements { + if let Ok(announcement) = RepositoryAnnouncement::from_event(entry.event) { + repo_data.announcements.push(announcement); + } + } + + debug!( + "Fetched repository data with purgatory: {} announcements ({} from purgatory), {} states", + repo_data.announcements.len(), + purgatory_count, + repo_data.states.len() + ); + + Ok(repo_data) +} + pub fn pubkey_authorised_for_repo_owners( pubkey: &PublicKey, db_repo_data: &RepositoryData, @@ -539,8 +572,9 @@ pub async fn get_state_authorization_for_specific_owner_repo( use crate::git::list_refs; use crate::purgatory::RefUpdate; - // Fetch announcements only - we don't need database states - let repo_data = fetch_repository_data(database, identifier).await?; + // Fetch announcements from database AND purgatory - needed for authorization + // when the announcement hasn't been promoted yet (no git data has arrived) + let repo_data = fetch_repository_data_with_purgatory(database, purgatory, identifier).await?; if repo_data.announcements.is_empty() { return Ok(AuthorizationResult::denied( diff --git a/src/git/sync.rs b/src/git/sync.rs index e8e9655..13f30b6 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -51,6 +51,8 @@ use crate::purgatory::{can_apply_state, Purgatory}; /// or from purgatory sync fetching OIDs from remote servers). #[derive(Debug, Default, Clone)] pub struct ProcessResult { + /// Number of announcements released from purgatory + pub announcements_released: usize, /// Number of state events released from purgatory pub states_released: usize, /// Number of PR events released from purgatory @@ -70,11 +72,12 @@ pub struct ProcessResult { impl ProcessResult { /// Check if any events were released pub fn released_any(&self) -> bool { - self.states_released > 0 || self.prs_released > 0 + self.announcements_released > 0 || self.states_released > 0 || self.prs_released > 0 } /// Merge another ProcessResult into this one pub fn merge(&mut self, other: ProcessResult) { + self.announcements_released += other.announcements_released; self.states_released += other.states_released; self.prs_released += other.prs_released; self.repos_synced += other.repos_synced; @@ -836,6 +839,18 @@ pub async fn process_newly_available_git_data( "Processing newly available git data" ); + // Process announcements from purgatory + let announcement_result = process_purgatory_announcements( + &identifier, + source_repo_path, + database, + local_relay, + purgatory, + git_data_path, + ) + .await; + result.merge(announcement_result); + // Process state events from purgatory let state_result = process_purgatory_state_events( &identifier, @@ -863,6 +878,7 @@ pub async fn process_newly_available_git_data( if result.released_any() { info!( identifier = %identifier, + announcements_released = result.announcements_released, states_released = result.states_released, prs_released = result.prs_released, repos_synced = result.repos_synced, @@ -1250,6 +1266,90 @@ async fn process_purgatory_pr_events( result } +/// Process announcements from purgatory that can now be promoted. +/// +/// When git data arrives for a repository, any announcements in purgatory +/// for that repository should be promoted to the database and served to clients. +async fn process_purgatory_announcements( + identifier: &str, + source_repo_path: &Path, + database: &SharedDatabase, + local_relay: Option<&nostr_relay_builder::LocalRelay>, + purgatory: &Purgatory, + git_data_path: &Path, +) -> ProcessResult { + let mut result = ProcessResult::default(); + + // Extract owner pubkey from the source repo path + let owner_pubkey = match extract_owner_from_repo_path(source_repo_path, git_data_path) { + Some(npub) => npub, + None => { + debug!( + identifier = %identifier, + "Could not extract owner from repo path" + ); + return result; + } + }; + + // Parse the npub back to PublicKey + let owner = match nostr_sdk::PublicKey::parse(&owner_pubkey) { + Ok(pk) => pk, + Err(e) => { + warn!( + identifier = %identifier, + owner_pubkey = %owner_pubkey, + error = %e, + "Failed to parse owner pubkey" + ); + result.errors.push(format!("Failed to parse owner pubkey: {}", e)); + return result; + } + }; + + // Check if there's an announcement in purgatory for this owner and identifier + let announcement_event = purgatory.promote_announcement(&owner, identifier); + + if let Some(event) = announcement_event { + // Save to database + match database.save_event(&event).await { + Ok(_) => { + info!( + identifier = %identifier, + event_id = %event.id, + "Promoted announcement from purgatory to database" + ); + + // Notify WebSocket subscribers + if let Some(relay) = local_relay { + if relay.notify_event(event.clone()) { + debug!( + identifier = %identifier, + event_id = %event.id, + "Broadcast announcement event to WebSocket listeners" + ); + } + } + + result.announcements_released += 1; + } + Err(e) => { + warn!( + identifier = %identifier, + event_id = %event.id, + error = %e, + "Failed to save announcement to database" + ); + result + .errors + .push(format!("Failed to save announcement: {}", e)); + } + } + } + + result +} + /// Extract owner pubkey from a repository path. /// /// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. @@ -1271,6 +1371,7 @@ mod tests { #[test] fn test_process_result_default() { let result = ProcessResult::default(); + assert_eq!(result.announcements_released, 0); assert_eq!(result.states_released, 0); assert_eq!(result.prs_released, 0); assert_eq!(result.repos_synced, 0); @@ -1282,6 +1383,10 @@ mod tests { let mut result = ProcessResult::default(); assert!(!result.released_any()); + result.announcements_released = 1; + assert!(result.released_any()); + + result.announcements_released = 0; result.states_released = 1; assert!(result.released_any()); @@ -1293,6 +1398,7 @@ mod tests { #[test] fn test_process_result_merge() { let mut result1 = ProcessResult { + announcements_released: 0, states_released: 1, prs_released: 2, repos_synced: 3, @@ -1303,6 +1409,7 @@ mod tests { }; let result2 = ProcessResult { + announcements_released: 5, states_released: 10, prs_released: 20, repos_synced: 30, @@ -1314,6 +1421,7 @@ mod tests { result1.merge(result2); + assert_eq!(result1.announcements_released, 5); assert_eq!(result1.states_released, 11); assert_eq!(result1.prs_released, 22); assert_eq!(result1.repos_synced, 33); -- cgit v1.2.3 From efbbcc49ae8e8f598a24c939b35ad9cda0541663 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 17 Feb 2026 10:58:01 +0000 Subject: fix: include purgatory announcements in state event authorization When processing state events from purgatory, we need to check authorization against announcements that may still be in purgatory (not yet promoted to the database). Previously, process_purgatory_state_events() used fetch_repository_data() which only queries the database. This caused authorization failures when: 1. Git data arrives 2. Announcement is promoted from purgatory to database 3. State events are processed from purgatory 4. But db_repo_data was fetched BEFORE the announcement promotion Now uses fetch_repository_data_with_purgatory() to include both database and purgatory announcements, ensuring authorization works correctly regardless of promotion timing. --- src/git/sync.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/git') diff --git a/src/git/sync.rs b/src/git/sync.rs index 13f30b6..a0b7c47 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -37,7 +37,8 @@ use tracing::{debug, info, warn}; use nostr_sdk::Event; use crate::git::authorization::{ - collect_authorized_maintainers, fetch_repository_data, RepositoryData, + collect_authorized_maintainers, fetch_repository_data, fetch_repository_data_with_purgatory, + RepositoryData, }; use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; @@ -923,7 +924,10 @@ async fn process_purgatory_state_events( ); // Fetch repository data once for all state events - let mut db_repo_data = match fetch_repository_data(database, identifier).await { + // IMPORTANT: Use fetch_repository_data_with_purgatory to include announcements + // that may still be in purgatory (not yet promoted). This ensures authorization + // works correctly even if the announcement promotion happens in the same batch. + let mut db_repo_data = match fetch_repository_data_with_purgatory(database, purgatory, identifier).await { Ok(data) => data, Err(e) => { warn!( -- cgit v1.2.3 From cad58fccae7ed84bb033e56de0f1323b714a854d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 17 Feb 2026 11:43:40 +0000 Subject: docs: clarify why fetch_repository_data excludes purgatory Add comments explaining that PR event processing (both incoming and purgatory) should only use database announcements, not purgatory ones. This is intentional because: - Incoming PR events should only be accepted for validated announcements - Purgatory PR events should only be released when announcement is promoted - This prevents accepting PR events for announcements that fail validation Differs from state event processing which uses fetch_repository_data_with_purgatory because state events check authorization without releasing from purgatory. --- src/git/sync.rs | 3 +++ src/nostr/policy/pr_event.rs | 8 ++++++++ 2 files changed, 11 insertions(+) (limited to 'src/git') diff --git a/src/git/sync.rs b/src/git/sync.rs index a0b7c47..4b35023 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -1171,6 +1171,9 @@ async fn process_purgatory_pr_events( ); // Fetch repository data for syncing + // NOTE: Only fetch from database, NOT purgatory. PR events should only be + // released from purgatory when the announcement has been promoted (validated). + // This ensures we don't accept PR events for announcements that fail validation. let db_repo_data = match fetch_repository_data(database, identifier).await { Ok(data) => data, Err(e) => { diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index 00e09c3..072e445 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -127,6 +127,10 @@ impl PrEventPolicy { .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; // Fetch repository data + // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should + // only be accepted for announcements that have been promoted (validated). + // If the announcement is still in purgatory, the PR event should also go + // to purgatory and wait for the announcement to be promoted. let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; // Extract owner pubkey from source repo path @@ -203,6 +207,10 @@ impl PrEventPolicy { let identifier = parts[2]; // 2. Fetch repo data + // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should + // only be accepted for announcements that have been promoted (validated). + // If the announcement is still in purgatory, the PR event should also go + // to purgatory and wait for the announcement to be promoted. let db_repo_data = fetch_repository_data(&self.ctx.database, identifier).await?; // 3. Extract list of maintainers from "a 30617::" tags -- cgit v1.2.3 From d76003b629a4a03dba23a8a1c41da6e4ac4c30cf Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:17 +0000 Subject: feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion When git data arrives for a purgatory announcement and promotes it to the database, the relay now: 1. Upgrades the announcement's sync level in RepoSyncIndex from StateOnly to Full (git/sync.rs: process_purgatory_announcements) 2. Sends AddFilters actions to SyncManager for all connected relays, using Full sync filters (Layer 2 #a/#A/#q) to subscribe to PR events (purgatory/sync/context.rs: RealSyncContext.process_newly_available_git_data) 3. For user-submitted purgatory announcements, registers the repo in RepoSyncIndex with StateOnly level and sends AddFilters to SyncManager so it discovers and connects to relays listed in the announcement tags (nostr/builder.rs: handle_announcement AcceptPurgatory path) The RealSyncContext now accepts optional repo_sync_index and sync_action_tx parameters. main.rs wires these up from SyncManager. PolicyContext gains repo_sync_index and sync_action_tx fields for the write policy path. --- src/git/handlers.rs | 1 + src/git/sync.rs | 21 ++++++++ src/main.rs | 20 +++++++ src/nostr/builder.rs | 118 ++++++++++++++++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 37 +++++++++++++ src/purgatory/sync/context.rs | 116 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 313 insertions(+) (limited to 'src/git') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..129ca2c 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,6 +307,7 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..b3fa11a 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,6 +44,7 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. /// @@ -809,6 +810,7 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories +/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -819,6 +821,7 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +851,7 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1284,6 +1288,7 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1338,6 +1343,22 @@ async fn process_purgatory_announcements( } } + // Upgrade sync level to Full in repo_sync_index + if let Some(index) = repo_sync_index { + let mut index = index.write().await; + // Use hex pubkey format to match how repo_sync_index keys are built + // (sync/mod.rs uses event.pubkey which is hex, not bech32) + let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); + if let Some(entry) = index.get_mut(&repo_id) { + entry.sync_level = SyncLevel::Full; + debug!( + identifier = %identifier, + repo_id = %repo_id, + "Upgraded sync level to Full after announcement promotion" + ); + } + } + result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index ab6ede7..3ff30fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,24 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); + // Get a reference to the repo sync index for upgrading sync levels on promotion + let repo_sync_index = sync_manager.repo_sync_index(); + + // Set the repo sync index on the write policy so user-submitted purgatory + // announcements can trigger relay discovery (connect to relays in announcement tags) + relay_with_db + .write_policy + .set_repo_sync_index(repo_sync_index.clone()); + + // Get the action sender BEFORE consuming sync_manager with spawn + let action_tx = sync_manager.action_tx(); + + // Set the sync action sender so the write policy can trigger relay connections + // when user-submitted purgatory announcements are registered with StateOnly level + if let Some(tx) = action_tx.clone() { + relay_with_db.write_policy.set_sync_action_tx(tx); + } + tokio::spawn(async move { sync_manager.run().await; }); @@ -184,6 +202,8 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), + Some(repo_sync_index), + action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..4c66f6d 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,6 +98,24 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + /// + /// When a user submits an announcement that goes to purgatory (no git data yet), + /// the relay needs to discover and connect to relays listed in the announcement's + /// `relays` and `clone` tags. This index is updated when the announcement is accepted + /// into purgatory, triggering the sync system to connect and sync state events. + pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + /// + /// This allows the write policy to notify the SyncManager when user-submitted + /// purgatory announcements need relay discovery (triggering new connections). + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + self.ctx.set_sync_action_tx(tx); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +164,106 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register in repo_sync_index with StateOnly level so the sync + // system discovers and connects to relays listed in this announcement. + // This is needed for user-submitted announcements (not via sync path) + // to trigger relay discovery and state event sync. + if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = + format!("30617:{}:{}", event.pubkey, identifier); + + // Get relay URLs stored in purgatory for this announcement + let relays = self + .ctx + .purgatory + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + if !relays.is_empty() { + use crate::sync::{ + AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, + }; + + // Update repo_sync_index with StateOnly for this repo + let new_repos = { + let mut index = repo_sync_index.write().await; + let entry = + index.entry(repo_id.clone()).or_insert_with(|| { + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + entry.relays.extend(relays.iter().cloned()); + // Don't upgrade if already Full + tracing::info!( + repo_id = %repo_id, + relay_count = entry.relays.len(), + "Registered user-submitted purgatory announcement in \ + RepoSyncIndex with StateOnly level for relay discovery" + ); + // Return cloned relays for AddFilters + relays.clone() + }; + + // Send AddFilters to SyncManager so it connects to these relays + if let Some(tx) = self.ctx.get_sync_action_tx() { + // Build state-only filters for this repo + let state_only_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &std::collections::HashSet::new(), + &state_only_repos, + &std::collections::HashSet::new(), + None, + ); + + for relay_url in new_repos { + // Skip our own domain + if relay_url.contains(&self.ctx.domain) { + continue; + } + let action = AddFilters { + relay_url: relay_url.clone(), + items: PendingItems { + repos: state_only_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters: filters.clone(), + }; + if let Err(e) = tx.send(action).await { + tracing::warn!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters action for \ + user-submitted purgatory announcement" + ); + } else { + tracing::info!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager for \ + user-submitted purgatory announcement relay" + ); + } + } + } + } + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..78a09fc 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; +use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -34,6 +35,16 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, + /// Optional repo sync index for triggering relay discovery when announcements + /// go to purgatory via user submission (not via the sync path). + /// Wrapped in Arc for interior mutability (PolicyContext is Clone). + pub repo_sync_index: Arc>>, + /// Optional sender for AddFilters actions to SyncManager. + /// Used to trigger relay discovery when user-submitted purgatory announcements + /// are registered with StateOnly sync level. + /// Wrapped in Arc for interior mutability (PolicyContext is Clone). + pub sync_action_tx: + Arc>>>, } impl PolicyContext { @@ -51,6 +62,8 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, + repo_sync_index: Arc::new(std::sync::RwLock::new(None)), + sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -68,4 +81,28 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } + + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + let mut guard = self.repo_sync_index.write().unwrap(); + *guard = Some(index); + } + + /// Get a clone of the repo sync index if it's been set. + pub fn get_repo_sync_index(&self) -> Option { + let guard = self.repo_sync_index.read().unwrap(); + guard.clone() + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + let mut guard = self.sync_action_tx.write().unwrap(); + *guard = Some(tx); + } + + /// Get a clone of the sync action sender if it's been set. + pub fn get_sync_action_tx(&self) -> Option> { + let guard = self.sync_action_tx.read().unwrap(); + guard.clone() + } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..4dbb402 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,6 +193,7 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; +use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -221,6 +222,13 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, + + /// Optional repo sync index for upgrading sync level on promotion + repo_sync_index: Option, + + /// Optional sender for AddFilters actions to SyncManager. + /// Used after announcement promotion to trigger PR event subscription on connected relays. + sync_action_tx: Option>, } impl RealSyncContext { @@ -233,6 +241,9 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains + /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion + /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion + #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -240,6 +251,8 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, + repo_sync_index: Option, + sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -248,9 +261,23 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, + repo_sync_index, + sync_action_tx, } } + /// Set the sync action sender for triggering filter recomputation after announcement promotion. + /// + /// When an announcement is promoted from purgatory to Full sync level, the SyncManager + /// needs to subscribe to PR events for that repo on all connected relays. This sender + /// is used to trigger that subscription. + pub fn set_sync_action_tx( + &mut self, + tx: tokio::sync::mpsc::Sender, + ) { + self.sync_action_tx = Some(tx); + } + /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -482,9 +509,98 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + self.repo_sync_index.clone(), ) .await?; + // If announcements were promoted (now Full sync level), notify SyncManager to + // recompute filters so PR event subscriptions are created on connected relays. + if result.announcements_released > 0 { + if let (Some(ref tx), Some(ref repo_sync_index)) = + (&self.sync_action_tx, &self.repo_sync_index) + { + let index = repo_sync_index.read().await; + for (repo_id, needs) in index.iter() { + if needs.sync_level == crate::sync::SyncLevel::Full + && !needs.root_events.is_empty() + { + // Send AddFilters for Full repos with root events + for relay_url in &needs.relays { + if let Some(ref domain) = self.our_domain_value { + if relay_url.contains(domain.as_str()) { + continue; + } + } + let full_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &full_repos, + &std::collections::HashSet::new(), + &needs.root_events, + None, + ); + let action = crate::sync::AddFilters { + relay_url: relay_url.clone(), + items: crate::sync::PendingItems { + repos: full_repos.clone(), + root_events: needs.root_events.clone(), + }, + filters, + }; + if let Err(e) = tx.send(action).await { + debug!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters after announcement promotion" + ); + } else { + debug!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager after announcement promotion" + ); + } + } + } else if needs.sync_level == crate::sync::SyncLevel::Full { + // Even without root_events, send empty repo filter to ensure + // Layer 2 subscriptions (PR events) are set up + for relay_url in &needs.relays { + if let Some(ref domain) = self.our_domain_value { + if relay_url.contains(domain.as_str()) { + continue; + } + } + let full_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &full_repos, + &std::collections::HashSet::new(), + &std::collections::HashSet::new(), + None, + ); + let action = crate::sync::AddFilters { + relay_url: relay_url.clone(), + items: crate::sync::PendingItems { + repos: full_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters, + }; + if let Err(e) = tx.send(action).await { + debug!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters (no root_events) after announcement promotion" + ); + } + } + } + } + } + } + // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3 From 3d9359d5ac0045fb93fd8732160e0de8413d6881 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:28 +0000 Subject: Revert "feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion" This reverts commit d76003b629a4a03dba23a8a1c41da6e4ac4c30cf. --- src/git/handlers.rs | 1 - src/git/sync.rs | 21 -------- src/main.rs | 20 ------- src/nostr/builder.rs | 118 ------------------------------------------ src/nostr/policy/mod.rs | 37 ------------- src/purgatory/sync/context.rs | 116 ----------------------------------------- 6 files changed, 313 deletions(-) (limited to 'src/git') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 129ca2c..017eee4 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,7 +307,6 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, - None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index b3fa11a..4b35023 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,7 +44,6 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; -use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. /// @@ -810,7 +809,6 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories -/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -821,7 +819,6 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -851,7 +848,6 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, - repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1288,7 +1284,6 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1343,22 +1338,6 @@ async fn process_purgatory_announcements( } } - // Upgrade sync level to Full in repo_sync_index - if let Some(index) = repo_sync_index { - let mut index = index.write().await; - // Use hex pubkey format to match how repo_sync_index keys are built - // (sync/mod.rs uses event.pubkey which is hex, not bech32) - let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); - if let Some(entry) = index.get_mut(&repo_id) { - entry.sync_level = SyncLevel::Full; - debug!( - identifier = %identifier, - repo_id = %repo_id, - "Upgraded sync level to Full after announcement promotion" - ); - } - } - result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index 3ff30fb..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,24 +132,6 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Get a reference to the repo sync index for upgrading sync levels on promotion - let repo_sync_index = sync_manager.repo_sync_index(); - - // Set the repo sync index on the write policy so user-submitted purgatory - // announcements can trigger relay discovery (connect to relays in announcement tags) - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index.clone()); - - // Get the action sender BEFORE consuming sync_manager with spawn - let action_tx = sync_manager.action_tx(); - - // Set the sync action sender so the write policy can trigger relay connections - // when user-submitted purgatory announcements are registered with StateOnly level - if let Some(tx) = action_tx.clone() { - relay_with_db.write_policy.set_sync_action_tx(tx); - } - tokio::spawn(async move { sync_manager.run().await; }); @@ -202,8 +184,6 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), - Some(repo_sync_index), - action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 4c66f6d..aff12a6 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,24 +98,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - /// - /// When a user submits an announcement that goes to purgatory (no git data yet), - /// the relay needs to discover and connect to relays listed in the announcement's - /// `relays` and `clone` tags. This index is updated when the announcement is accepted - /// into purgatory, triggering the sync system to connect and sync state events. - pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - /// - /// This allows the write policy to notify the SyncManager when user-submitted - /// purgatory announcements need relay discovery (triggering new connections). - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - self.ctx.set_sync_action_tx(tx); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -164,106 +146,6 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); - - // Register in repo_sync_index with StateOnly level so the sync - // system discovers and connects to relays listed in this announcement. - // This is needed for user-submitted announcements (not via sync path) - // to trigger relay discovery and state event sync. - if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = - format!("30617:{}:{}", event.pubkey, identifier); - - // Get relay URLs stored in purgatory for this announcement - let relays = self - .ctx - .purgatory - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - if !relays.is_empty() { - use crate::sync::{ - AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, - }; - - // Update repo_sync_index with StateOnly for this repo - let new_repos = { - let mut index = repo_sync_index.write().await; - let entry = - index.entry(repo_id.clone()).or_insert_with(|| { - RepoSyncNeeds { - relays: std::collections::HashSet::new(), - root_events: std::collections::HashSet::new(), - sync_level: SyncLevel::StateOnly, - } - }); - entry.relays.extend(relays.iter().cloned()); - // Don't upgrade if already Full - tracing::info!( - repo_id = %repo_id, - relay_count = entry.relays.len(), - "Registered user-submitted purgatory announcement in \ - RepoSyncIndex with StateOnly level for relay discovery" - ); - // Return cloned relays for AddFilters - relays.clone() - }; - - // Send AddFilters to SyncManager so it connects to these relays - if let Some(tx) = self.ctx.get_sync_action_tx() { - // Build state-only filters for this repo - let state_only_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &std::collections::HashSet::new(), - &state_only_repos, - &std::collections::HashSet::new(), - None, - ); - - for relay_url in new_repos { - // Skip our own domain - if relay_url.contains(&self.ctx.domain) { - continue; - } - let action = AddFilters { - relay_url: relay_url.clone(), - items: PendingItems { - repos: state_only_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters: filters.clone(), - }; - if let Err(e) = tx.send(action).await { - tracing::warn!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters action for \ - user-submitted purgatory announcement" - ); - } else { - tracing::info!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager for \ - user-submitted purgatory announcement relay" - ); - } - } - } - } - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 78a09fc..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,16 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Optional repo sync index for triggering relay discovery when announcements - /// go to purgatory via user submission (not via the sync path). - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub repo_sync_index: Arc>>, - /// Optional sender for AddFilters actions to SyncManager. - /// Used to trigger relay discovery when user-submitted purgatory announcements - /// are registered with StateOnly sync level. - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub sync_action_tx: - Arc>>>, } impl PolicyContext { @@ -62,8 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), - sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -81,28 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it's been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - let mut guard = self.sync_action_tx.write().unwrap(); - *guard = Some(tx); - } - - /// Get a clone of the sync action sender if it's been set. - pub fn get_sync_action_tx(&self) -> Option> { - let guard = self.sync_action_tx.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 4dbb402..3568e89 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,7 +193,6 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; -use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -222,13 +221,6 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, - - /// Optional repo sync index for upgrading sync level on promotion - repo_sync_index: Option, - - /// Optional sender for AddFilters actions to SyncManager. - /// Used after announcement promotion to trigger PR event subscription on connected relays. - sync_action_tx: Option>, } impl RealSyncContext { @@ -241,9 +233,6 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains - /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion - /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion - #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -251,8 +240,6 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, - repo_sync_index: Option, - sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -261,23 +248,9 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, - repo_sync_index, - sync_action_tx, } } - /// Set the sync action sender for triggering filter recomputation after announcement promotion. - /// - /// When an announcement is promoted from purgatory to Full sync level, the SyncManager - /// needs to subscribe to PR events for that repo on all connected relays. This sender - /// is used to trigger that subscription. - pub fn set_sync_action_tx( - &mut self, - tx: tokio::sync::mpsc::Sender, - ) { - self.sync_action_tx = Some(tx); - } - /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -509,98 +482,9 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, - self.repo_sync_index.clone(), ) .await?; - // If announcements were promoted (now Full sync level), notify SyncManager to - // recompute filters so PR event subscriptions are created on connected relays. - if result.announcements_released > 0 { - if let (Some(ref tx), Some(ref repo_sync_index)) = - (&self.sync_action_tx, &self.repo_sync_index) - { - let index = repo_sync_index.read().await; - for (repo_id, needs) in index.iter() { - if needs.sync_level == crate::sync::SyncLevel::Full - && !needs.root_events.is_empty() - { - // Send AddFilters for Full repos with root events - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &needs.root_events, - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: needs.root_events.clone(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters after announcement promotion" - ); - } else { - debug!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager after announcement promotion" - ); - } - } - } else if needs.sync_level == crate::sync::SyncLevel::Full { - // Even without root_events, send empty repo filter to ensure - // Layer 2 subscriptions (PR events) are set up - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &std::collections::HashSet::new(), - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters (no root_events) after announcement promotion" - ); - } - } - } - } - } - } - // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3 From 70749ea9df1f6061c332112c617b615f91d79d48 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 11:17:10 +0000 Subject: fix: re-process hot-cache maintainer announcements after git push promotion When an owner announcement is promoted from purgatory via a git push, any maintainer announcements sitting in the rejected_events_index hot cache were never re-processed. The invalidate_and_get call only existed in SyncManager::process_event_static (the nostr sync path); the git push promotion path (http -> handlers -> git::sync) had no access to the rejected_events_index at all. Thread rejected_events_index and write_policy through the git push path: - process_purgatory_announcements: after saving the promoted announcement, parse its maintainers tag and call invalidate_and_get() for each, then re-process any returned hot-cache events via admit_event + save - process_newly_available_git_data: accept optional write_policy and rejected_events_index, pass them through to process_purgatory_announcements - handle_receive_pack: accept Arc and Arc, pass them to process_newly_available_git_data - HttpService / run_server: carry the two new fields, clone into each handle_receive_pack call - main.rs: obtain rejected_events_index from sync_manager before moving it into its task; wrap write_policy in Arc for the HTTP server - RealSyncContext::process_newly_available_git_data: pass None for both new params (purgatory sync path already handles this via SyncManager::process_event_static) Also rewrite the maintainer_reprocessing integration tests to correctly exercise the hot-cache path now that announcements require git data before being released from purgatory: - Start relay_b with relay_a as bootstrap so its SyncManager syncs maintainer announcements via negentropy before the owner git push - Use push_unique_git_data_to_relay (new helper) to give each maintainer a distinct commit hash, preventing git from skipping pack transfer - Make wait_for_event_on_relay poll in a retry loop so transient timing gaps between DB write and query do not cause false negatives --- src/git/handlers.rs | 7 +- src/git/sync.rs | 113 +++++++++++++++- src/http/mod.rs | 22 +++- src/main.rs | 7 + src/purgatory/sync/context.rs | 6 +- tests/sync/maintainer_reprocessing.rs | 235 +++++++++++++++++++++------------- 6 files changed, 298 insertions(+), 92 deletions(-) (limited to 'src/git') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..13d6ba0 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess; use crate::git::authorization::{authorize_push, parse_pushed_refs}; use crate::git::sync::process_newly_available_git_data; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// Handle GET /info/refs?service=git-{upload,receive}-pack /// @@ -195,6 +196,8 @@ pub async fn handle_receive_pack( purgatory: Arc, git_data_path: &str, git_protocol: Option<&str>, + write_policy: Arc, + rejected_events_index: Arc, ) -> Result>, GitError> { debug!("Handling receive-pack for {:?}", repo_path); @@ -307,6 +310,8 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + Some(&write_policy), + Some(&rejected_events_index), ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..8401736 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -32,6 +32,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::process::Command; +use std::sync::Arc; use tracing::{debug, info, warn}; use nostr_sdk::Event; @@ -41,9 +42,10 @@ use crate::git::authorization::{ RepositoryData, }; use crate::git::{self, oid_exists}; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::rejected_index::RejectedEventsIndex; /// Result of processing newly available git data. /// @@ -819,6 +821,8 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +852,8 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + write_policy, + rejected_events_index, ) .await; result.merge(announcement_result); @@ -1277,6 +1283,10 @@ async fn process_purgatory_pr_events( /// /// When git data arrives for a repository, any announcements in purgatory /// for that repository should be promoted to the database and served to clients. +/// +/// When `write_policy` and `rejected_events_index` are provided (git push path), +/// any maintainer announcements sitting in the hot cache are re-processed immediately +/// after the owner announcement is promoted, so they don't wait for the next sync cycle. async fn process_purgatory_announcements( identifier: &str, source_repo_path: &Path, @@ -1284,6 +1294,8 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1339,6 +1351,105 @@ async fn process_purgatory_announcements( } result.announcements_released += 1; + + // Re-process any maintainer announcements sitting in the hot cache. + // + // When an owner announcement is promoted from purgatory via a git push, + // maintainer announcements that arrived earlier (via relay sync) may have + // been rejected and stored in the hot cache because the owner announcement + // didn't exist in the DB yet. Now that the owner announcement is saved, + // we must invalidate and re-process those cached events immediately. + // + // This only applies on the git push path (write_policy + rejected_events_index + // are Some). The purgatory sync path already handles this via + // SyncManager::process_event_static. + if let (Some(wp), Some(rei), Some(relay)) = + (write_policy, rejected_events_index, local_relay) + { + use crate::nostr::events::RepositoryAnnouncement; + use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { + if !announcement.maintainers.is_empty() { + debug!( + identifier = %identifier, + event_id = %event.id, + maintainer_count = announcement.maintainers.len(), + "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements" + ); + + for maintainer_hex in &announcement.maintainers { + match nostr_sdk::PublicKey::from_hex(maintainer_hex) { + Ok(maintainer_pubkey) => { + let (removed, hot_events) = rei.invalidate_and_get( + &maintainer_pubkey, + &announcement.identifier, + Some(crate::sync::rejected_index::EventType::Announcement), + ); + + if removed > 0 { + info!( + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + removed_from_cold_index = removed, + hot_cache_events = hot_events.len(), + "Invalidated rejected maintainer announcements after git push promotion" + ); + } + + // Re-process events from hot cache + let dummy_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 0, + ); + for hot_event in hot_events { + info!( + event_id = %hot_event.id, + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + "Re-processing maintainer announcement from hot cache after git push promotion" + ); + match wp.admit_event(&hot_event, &dummy_addr).await { + WritePolicyResult::Accept => { + match database.save_event(&hot_event).await { + Ok(_) => { + relay.notify_event(hot_event.clone()); + info!( + event_id = %hot_event.id, + "Maintainer announcement accepted and saved on re-processing" + ); + } + Err(e) => { + warn!( + event_id = %hot_event.id, + error = %e, + "Failed to save re-processed maintainer announcement" + ); + } + } + } + _ => { + warn!( + event_id = %hot_event.id, + "Maintainer announcement still rejected on re-processing" + ); + } + } + } + } + Err(e) => { + warn!( + maintainer_hex = %maintainer_hex, + error = %e, + "Invalid maintainer public key in promoted announcement" + ); + } + } + } + } + } + } } Err(e) => { warn!( diff --git a/src/http/mod.rs b/src/http/mod.rs index ffb1562..cfd7c52 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -26,8 +26,9 @@ use tokio::net::TcpListener; use crate::config::Config; use crate::git; use crate::metrics::Metrics; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// CORS headers required by GRASP-01 specification (lines 40-47) const CORS_ALLOW_ORIGIN: &str = "*"; @@ -97,6 +98,10 @@ struct HttpService { metrics: Option>, /// Purgatory for event/git coordination purgatory: Arc, + /// Write policy for re-processing hot-cache events after git push promotion + write_policy: Arc, + /// Rejected events index for hot-cache re-processing after git push promotion + rejected_events_index: Arc, } impl HttpService { @@ -107,6 +112,8 @@ impl HttpService { database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> Self { Self { relay, @@ -115,6 +122,8 @@ impl HttpService { database, metrics, purgatory, + write_policy, + rejected_events_index, } } } @@ -132,6 +141,8 @@ impl Service> for HttpService { let git_data_path = self.config.effective_git_data_path(); let database = self.database.clone(); let purgatory = self.purgatory.clone(); + let write_policy = self.write_policy.clone(); + let rejected_events_index = self.rejected_events_index.clone(); // Handle OPTIONS preflight requests (CORS) // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content @@ -293,6 +304,8 @@ impl Service> for HttpService { purgatory.clone(), &git_data_path, git_protocol.as_deref(), + write_policy.clone(), + rejected_events_index.clone(), ) .await; @@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String { /// * `relay` - The LocalRelay for WebSocket connections /// * `database` - The database for direct queries (e.g., push authorization) /// * `metrics` - Optional metrics for Prometheus endpoint +/// * `purgatory` - Purgatory for event/git coordination +/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion +/// * `rejected_events_index` - Rejected events index for hot-cache re-processing pub async fn run_server( config: Config, relay: LocalRelay, database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> anyhow::Result<()> { let bind_addr: SocketAddr = config.bind_address.parse()?; @@ -582,6 +600,8 @@ pub async fn run_server( database.clone(), metrics.clone(), purgatory.clone(), + write_policy.clone(), + rejected_events_index.clone(), ); tokio::spawn(async move { diff --git a/src/main.rs b/src/main.rs index ab6ede7..6769cf3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,7 +130,9 @@ async fn main() -> Result<()> { } // Get a reference to the rejected events index for shutdown persistence + // and for the HTTP server's git push path (hot-cache re-processing) let shutdown_rejected_index = sync_manager.rejected_events_index(); + let http_rejected_index = shutdown_rejected_index.clone(); tokio::spawn(async move { sync_manager.run().await; @@ -206,6 +208,9 @@ async fn main() -> Result<()> { // Start HTTP server with integrated relay and database info!("Starting HTTP server on {}", config.bind_address); + // Wrap write_policy in Arc for sharing between HTTP server connections + let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); + // Run server until shutdown signal, then cleanup tokio::select! { result = http::run_server( @@ -214,6 +219,8 @@ async fn main() -> Result<()> { relay_with_db.database, metrics, purgatory, + http_write_policy, + http_rejected_index, ) => { result? } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..ece8cd6 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -474,7 +474,9 @@ impl SyncContext for RealSyncContext { source_repo_path: &Path, new_oids: &HashSet, ) -> Result { - // Delegate to the unified function from git::sync + // Delegate to the unified function from git::sync. + // Pass None for write_policy and rejected_events_index: the purgatory sync path + // already handles hot-cache re-processing via SyncManager::process_event_static. let result = crate::git::sync::process_newly_available_git_data( source_repo_path, new_oids, @@ -482,6 +484,8 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + None, + None, ) .await?; diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 266a437..61d8e14 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -2,51 +2,61 @@ //! //! Tests the two-tier rejected events index and immediate re-processing of //! maintainer announcements when owner announcements are accepted. +//! +//! ## Test design +//! +//! Announcements now require git data before they are released from purgatory and +//! served to other relays. The hot-cache re-processing path we want to exercise is: +//! +//! relay_b syncs maintainer announcement from relay_a +//! → write policy rejects it (no owner announcement in DB yet) +//! → event stored in hot cache +//! owner git push to relay_b promotes owner announcement from purgatory +//! → our new code calls rejected_events_index.invalidate_and_get() +//! → maintainer announcement re-processed and accepted +//! +//! To guarantee the maintainer announcements arrive at relay_b *before* the owner +//! git push, relay_b is started with relay_a as its bootstrap relay. That way +//! relay_b's SyncManager connects to relay_a immediately and syncs whatever is +//! already in relay_a's DB. We push the maintainer git data first (so the +//! announcements are in relay_a's DB), wait briefly for the sync round-trip, then +//! send the owner announcement + git push. use std::time::Duration; use nostr_sdk::prelude::*; -use crate::common::{ - sync_helpers::*, - TestRelay, -}; +use crate::common::{sync_helpers::*, TestRelay}; -/// Test that maintainer announcements are re-processed immediately when owner announcement accepted +/// Test that a maintainer announcement is re-processed immediately when the owner +/// announcement is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + maintainer) -/// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement +/// 1. relay_a: Maintainer sends announcement + git data → accepted into relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs maintainer announcement +/// → rejected by write policy (no owner in DB) → stored in hot cache +/// 3. relay_b: Owner sends announcement → purgatory (no git data yet) +/// 4. relay_b: Owner git push → owner announcement promoted from purgatory +/// → hot-cache re-processing fires → maintainer announcement accepted /// 5. Both announcements should be in relay_b's database -/// -/// Expected time: <5 seconds (vs 24 hours without hot cache) #[tokio::test] async fn test_maintainer_announcement_reprocessed_immediately() { // Start relay_a (where maintainer announcement will be sent) let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer_keys = Keys::generate(); - let identifier = "test-repo"; - let start = std::time::Instant::now(); - - // Step 1: Send maintainer announcement to relay_a (will be rejected by relay_b - doesn't list relay_b) - // Use HTTP clone URL pointing to relay_a's git endpoint so it can be released from purgatory + // Step 1: Send maintainer announcement to relay_a then push git data so it lands in + // relay_a's DB. The announcement lists relay_a only (not relay_b), so relay_b's write + // policy will reject it when it arrives via sync. let maintainer_npub = maintainer_keys .public_key() .to_bech32() .expect("Failed to get npub"); - let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -60,27 +70,33 @@ async fn test_maintainer_announcement_reprocessed_immediately() { identifier )], ), - Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), + Tag::custom( + TagKind::custom("relays"), + vec![relay_a.url().to_string()], + ), ]) .sign_with_keys(&maintainer_keys) .unwrap(); + send_to_relay(&relay_a, &maintainer_announcement).await.unwrap(); + let _git_dir_maintainer = + push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]) + .await; + println!("✓ Maintainer announcement + git data pushed to relay_a"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // relay_b's initial negentropy sync will pick up the maintainer announcement and reject it + // (no owner announcement in relay_b's DB yet), storing it in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); - send_to_relay(&relay_a, &maintainer_announcement) - .await - .unwrap(); - println!("✓ Maintainer announcement sent to relay_a"); - - // Push git data for maintainer's repo to relay_a → releases maintainer announcement from purgatory - let _git_dir_maintainer = push_git_data_to_relay( - &relay_a, - &maintainer_keys, - identifier, - &[&relay_a.domain()], - ) - .await; - println!("✓ Maintainer git data pushed to relay_a (announcement released from purgatory)"); - - // Step 2: Set up owner announcement on relay_b (lists relay_a + maintainer) with git data + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcement should be in hot cache)"); + + let start = std::time::Instant::now(); + + // Step 3: Send owner announcement to relay_b → goes to purgatory (no git data yet). + // The announcement lists relay_a + relay_b and names the maintainer. let owner_npub = owner_keys .public_key() .to_bech32() @@ -111,19 +127,21 @@ async fn test_maintainer_announcement_reprocessed_immediately() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner's repo to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory, which triggers hot-cache + // re-processing of the maintainer announcement via our new code path. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)"); - // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; let elapsed = start.elapsed(); - // Step 4: Verify both announcements are in relay_b's database + // Step 6: Verify both announcements are in relay_b's database. let owner_filter = Filter::new() .kind(Kind::GitRepoAnnouncement) .author(owner_keys.public_key()) @@ -145,7 +163,6 @@ async fn test_maintainer_announcement_reprocessed_immediately() { "Maintainer announcement should be re-processed and accepted in relay_b" ); - // Step 5: Verify it happened quickly (not 24 hours!) assert!( elapsed.as_secs() < 15, "Re-processing should happen in <15 seconds, took {:?}", @@ -258,13 +275,16 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { relay.stop().await; } -/// Test multiple maintainers are all re-processed when owner announcement accepted +/// Test that all maintainer announcements are re-processed when the owner announcement +/// is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers) -/// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements +/// 1. relay_a: Three maintainers send announcements + git data → in relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs all three maintainer +/// announcements → all rejected (no owner in DB) → all in hot cache +/// 3. relay_b: Owner sends announcement → purgatory +/// 4. relay_b: Owner git push → owner promoted → hot-cache re-processing fires for +/// all three maintainers /// 5. All four announcements should be in relay_b's database #[tokio::test] async fn test_multiple_maintainers_all_reprocessed() { @@ -272,21 +292,23 @@ async fn test_multiple_maintainers_all_reprocessed() { let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer1_keys = Keys::generate(); let maintainer2_keys = Keys::generate(); let maintainer3_keys = Keys::generate(); - let identifier = "multi-maintainer-repo"; + // Use a unique identifier per test run to avoid cross-test interference when + // tests run in parallel (each test gets its own namespace on relay_a). + let identifier = &format!( + "multi-maintainer-repo-{}", + owner_keys.public_key().to_hex()[..8].to_string() + ); - // Step 1: Send three maintainer announcements to relay_a with git data - // (purgatory requires git data before announcements are accepted) - let mut git_dirs_maintainers = Vec::new(); + // Step 1: Send each maintainer announcement to relay_a then push git data so all three + // land in relay_a's DB. Each announcement lists relay_a only, so relay_b will reject + // them when syncing (no owner announcement in relay_b's DB yet). + let mut git_dirs = Vec::new(); for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] .iter() .enumerate() @@ -295,13 +317,12 @@ async fn test_multiple_maintainers_all_reprocessed() { .public_key() .to_bech32() .expect("Failed to get npub"); - let announcement = EventBuilder::new( Kind::GitRepoAnnouncement, format!("Maintainer {} repository", idx + 1), ) .tags(vec![ - Tag::identifier(identifier), + Tag::identifier(identifier.as_str()), Tag::custom( TagKind::custom("clone"), vec![format!( @@ -315,18 +336,53 @@ async fn test_multiple_maintainers_all_reprocessed() { ]) .sign_with_keys(maintainer_keys) .unwrap(); - send_to_relay(&relay_a, &announcement).await.unwrap(); + // Use push_unique_git_data_to_relay so each maintainer gets a distinct commit + // hash. Identical hashes cause git to skip pack transfer when the object + // already exists on the server, leaving the announcement in purgatory. + let git_dir = push_unique_git_data_to_relay( + &relay_a, + maintainer_keys, + identifier, + &[&relay_a.domain()], + &m_npub, + ) + .await; + git_dirs.push(git_dir); + } + println!("✓ Three maintainer announcements + git data pushed to relay_a"); - // Push git data to release each maintainer's announcement from purgatory - let git_dir = - push_git_data_to_relay(&relay_a, maintainer_keys, identifier, &[&relay_a.domain()]) - .await; - git_dirs_maintainers.push(git_dir); + // Confirm all three announcements are queryable on relay_a before starting relay_b. + // This eliminates the race between relay_a's DB writes and relay_b's initial negentropy sync. + for (name, keys) in [ + ("maintainer1", &maintainer1_keys), + ("maintainer2", &maintainer2_keys), + ("maintainer3", &maintainer3_keys), + ] { + let filter = Filter::new() + .kind(Kind::GitRepoAnnouncement) + .author(keys.public_key()) + .identifier(identifier); + let found = + wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; + assert!(found, "{} announcement should be in relay_a before starting relay_b", name); } - println!("✓ Three maintainer announcements sent to relay_a with git data"); + println!("✓ All three maintainer announcements confirmed in relay_a's DB"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // Because all three maintainer announcements are confirmed in relay_a's DB, relay_b's + // initial negentropy sync will pick them all up and reject them (no owner announcement + // in relay_b's DB yet), storing them in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); + + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // allow extra time for slow CI environments. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); - // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) + // Step 3: Send owner announcement to relay_b → goes to purgatory. let owner_npub = owner_keys .public_key() .to_bech32() @@ -361,17 +417,19 @@ async fn test_multiple_maintainers_all_reprocessed() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory and triggers hot-cache + // re-processing for all three maintainer announcements. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (hot-cache re-processing should fire)"); - // Step 3: Wait for sync and re-processing - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; - // Step 4: Verify all four announcements are in relay_b's database + // Step 6: Verify all four announcements are in relay_b's database. for (name, keys) in [ ("owner", &owner_keys), ("maintainer1", &maintainer1_keys), @@ -396,10 +454,10 @@ async fn test_multiple_maintainers_all_reprocessed() { /// Test that invalid maintainer public keys don't cause panics /// /// Flow: -/// 1. Maintainer announcement arrives → Rejected -/// 2. Owner announcement arrives with INVALID maintainer hex → Should handle gracefully -/// 3. Owner announcement should still be accepted -/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey) +/// 1. Maintainer announcement arrives → Rejected (doesn't list our relay) +/// 2. Owner announcement + git push → accepted, with INVALID maintainer hex in maintainers tag +/// 3. Owner announcement should be accepted +/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey can't be parsed) #[tokio::test] async fn test_invalid_maintainer_pubkey_handled_gracefully() { let relay = TestRelay::start().await; @@ -410,8 +468,12 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { let identifier = "invalid-maintainer-repo"; + // Create client using TestClient helper + let client = TestClient::new(relay.url(), owner_keys.clone()) + .await + .expect("Failed to connect to relay"); + // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) - // This one uses example.com clone URL - it goes to purgatory on relay, never promoted let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -428,12 +490,13 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .sign_with_keys(&maintainer_keys) .unwrap(); - // Send maintainer announcement - expect it to be rejected (purgatory / policy) - send_to_relay(&relay, &maintainer_announcement).await.ok(); + // Send maintainer announcement - expect it to be rejected + let _ = client.send_event(&maintainer_announcement).await; tokio::time::sleep(Duration::from_millis(200)).await; - // Step 2: Set up owner announcement with INVALID maintainer hex and git data - // Use HTTP clone URL to relay's git endpoint so it can be released from purgatory + // Step 2: Send owner announcement with INVALID maintainer hex, then push git data. + // The announcement goes to purgatory first; the git push promotes it. + // The invalid maintainer hex should be handled gracefully (no panic). let owner_npub = owner_keys .public_key() .to_bech32() @@ -461,13 +524,8 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .unwrap(); send_to_relay(&relay, &owner_announcement).await.unwrap(); - - // Push git data to relay → releases owner announcement from purgatory let _git_dir = push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; - println!("✓ Owner git data pushed to relay (announcement released from purgatory)"); - - // Wait for processing tokio::time::sleep(Duration::from_millis(500)).await; // Step 3: Verify owner announcement accepted, maintainer not re-processed @@ -497,5 +555,6 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { println!("✅ Invalid maintainer pubkey handled gracefully without panic"); + client.disconnect().await; relay.stop().await; } -- cgit v1.2.3 From c3dedb7a5b527c3a3deb1e781aba9d562c6eb294 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:54:57 +0000 Subject: feat: extend purgatory announcement expiry during git push authorization Per design doc decision #4: when git auth finds a matching state event in purgatory that authorizes a push, extend the announcement's expiry. The repo is actively receiving git data so the announcement should not expire prematurely. Also triggers revival of soft-expired announcements. --- src/git/authorization.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'src/git') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index 9d53c4f..69a0751 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -661,6 +661,27 @@ pub async fn get_state_authorization_for_specific_owner_repo( .unwrap_or_else(|_| latest_authorized.pubkey.to_hex()) ); + // Extend purgatory announcement expiry for the owner. + // + // Per design doc decision #4: git auth extending a state event's expiry + // also extends the announcement's expiry. The repo is actively receiving + // git data, so the announcement should not expire prematurely. + // This also revives soft-expired announcements (recreates bare repo). + if let Ok(owner_pk) = PublicKey::parse(owner_pubkey) { + if purgatory.has_purgatory_announcement(&owner_pk, identifier) { + purgatory.extend_announcement_expiry( + &owner_pk, + identifier, + std::time::Duration::from_secs(1800), + ); + debug!( + identifier = %identifier, + owner = %owner_pubkey, + "Extended purgatory announcement expiry due to git push authorization" + ); + } + } + return Ok(AuthorizationResult { authorized: true, reason: "Authorized by state event in purgatory".to_string(), -- cgit v1.2.3