From d76003b629a4a03dba23a8a1c41da6e4ac4c30cf Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:17 +0000 Subject: feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion When git data arrives for a purgatory announcement and promotes it to the database, the relay now: 1. Upgrades the announcement's sync level in RepoSyncIndex from StateOnly to Full (git/sync.rs: process_purgatory_announcements) 2. Sends AddFilters actions to SyncManager for all connected relays, using Full sync filters (Layer 2 #a/#A/#q) to subscribe to PR events (purgatory/sync/context.rs: RealSyncContext.process_newly_available_git_data) 3. For user-submitted purgatory announcements, registers the repo in RepoSyncIndex with StateOnly level and sends AddFilters to SyncManager so it discovers and connects to relays listed in the announcement tags (nostr/builder.rs: handle_announcement AcceptPurgatory path) The RealSyncContext now accepts optional repo_sync_index and sync_action_tx parameters. main.rs wires these up from SyncManager. PolicyContext gains repo_sync_index and sync_action_tx fields for the write policy path. --- src/git/handlers.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/git/handlers.rs') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..129ca2c 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,6 +307,7 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + None, ) .await { -- cgit v1.2.3 From 3d9359d5ac0045fb93fd8732160e0de8413d6881 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:28 +0000 Subject: Revert "feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion" This reverts commit d76003b629a4a03dba23a8a1c41da6e4ac4c30cf. --- src/git/handlers.rs | 1 - src/git/sync.rs | 21 -------- src/main.rs | 20 ------- src/nostr/builder.rs | 118 ------------------------------------------ src/nostr/policy/mod.rs | 37 ------------- src/purgatory/sync/context.rs | 116 ----------------------------------------- 6 files changed, 313 deletions(-) (limited to 'src/git/handlers.rs') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 129ca2c..017eee4 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,7 +307,6 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, - None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index b3fa11a..4b35023 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,7 +44,6 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; -use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. /// @@ -810,7 +809,6 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories -/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -821,7 +819,6 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -851,7 +848,6 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, - repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1288,7 +1284,6 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1343,22 +1338,6 @@ async fn process_purgatory_announcements( } } - // Upgrade sync level to Full in repo_sync_index - if let Some(index) = repo_sync_index { - let mut index = index.write().await; - // Use hex pubkey format to match how repo_sync_index keys are built - // (sync/mod.rs uses event.pubkey which is hex, not bech32) - let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); - if let Some(entry) = index.get_mut(&repo_id) { - entry.sync_level = SyncLevel::Full; - debug!( - identifier = %identifier, - repo_id = %repo_id, - "Upgraded sync level to Full after announcement promotion" - ); - } - } - result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index 3ff30fb..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,24 +132,6 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Get a reference to the repo sync index for upgrading sync levels on promotion - let repo_sync_index = sync_manager.repo_sync_index(); - - // Set the repo sync index on the write policy so user-submitted purgatory - // announcements can trigger relay discovery (connect to relays in announcement tags) - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index.clone()); - - // Get the action sender BEFORE consuming sync_manager with spawn - let action_tx = sync_manager.action_tx(); - - // Set the sync action sender so the write policy can trigger relay connections - // when user-submitted purgatory announcements are registered with StateOnly level - if let Some(tx) = action_tx.clone() { - relay_with_db.write_policy.set_sync_action_tx(tx); - } - tokio::spawn(async move { sync_manager.run().await; }); @@ -202,8 +184,6 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), - Some(repo_sync_index), - action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 4c66f6d..aff12a6 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,24 +98,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - /// - /// When a user submits an announcement that goes to purgatory (no git data yet), - /// the relay needs to discover and connect to relays listed in the announcement's - /// `relays` and `clone` tags. This index is updated when the announcement is accepted - /// into purgatory, triggering the sync system to connect and sync state events. - pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - /// - /// This allows the write policy to notify the SyncManager when user-submitted - /// purgatory announcements need relay discovery (triggering new connections). - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - self.ctx.set_sync_action_tx(tx); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -164,106 +146,6 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); - - // Register in repo_sync_index with StateOnly level so the sync - // system discovers and connects to relays listed in this announcement. - // This is needed for user-submitted announcements (not via sync path) - // to trigger relay discovery and state event sync. - if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = - format!("30617:{}:{}", event.pubkey, identifier); - - // Get relay URLs stored in purgatory for this announcement - let relays = self - .ctx - .purgatory - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - if !relays.is_empty() { - use crate::sync::{ - AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, - }; - - // Update repo_sync_index with StateOnly for this repo - let new_repos = { - let mut index = repo_sync_index.write().await; - let entry = - index.entry(repo_id.clone()).or_insert_with(|| { - RepoSyncNeeds { - relays: std::collections::HashSet::new(), - root_events: std::collections::HashSet::new(), - sync_level: SyncLevel::StateOnly, - } - }); - entry.relays.extend(relays.iter().cloned()); - // Don't upgrade if already Full - tracing::info!( - repo_id = %repo_id, - relay_count = entry.relays.len(), - "Registered user-submitted purgatory announcement in \ - RepoSyncIndex with StateOnly level for relay discovery" - ); - // Return cloned relays for AddFilters - relays.clone() - }; - - // Send AddFilters to SyncManager so it connects to these relays - if let Some(tx) = self.ctx.get_sync_action_tx() { - // Build state-only filters for this repo - let state_only_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &std::collections::HashSet::new(), - &state_only_repos, - &std::collections::HashSet::new(), - None, - ); - - for relay_url in new_repos { - // Skip our own domain - if relay_url.contains(&self.ctx.domain) { - continue; - } - let action = AddFilters { - relay_url: relay_url.clone(), - items: PendingItems { - repos: state_only_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters: filters.clone(), - }; - if let Err(e) = tx.send(action).await { - tracing::warn!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters action for \ - user-submitted purgatory announcement" - ); - } else { - tracing::info!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager for \ - user-submitted purgatory announcement relay" - ); - } - } - } - } - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 78a09fc..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,16 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Optional repo sync index for triggering relay discovery when announcements - /// go to purgatory via user submission (not via the sync path). - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub repo_sync_index: Arc>>, - /// Optional sender for AddFilters actions to SyncManager. - /// Used to trigger relay discovery when user-submitted purgatory announcements - /// are registered with StateOnly sync level. - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub sync_action_tx: - Arc>>>, } impl PolicyContext { @@ -62,8 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), - sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -81,28 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it's been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - let mut guard = self.sync_action_tx.write().unwrap(); - *guard = Some(tx); - } - - /// Get a clone of the sync action sender if it's been set. - pub fn get_sync_action_tx(&self) -> Option> { - let guard = self.sync_action_tx.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 4dbb402..3568e89 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,7 +193,6 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; -use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -222,13 +221,6 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, - - /// Optional repo sync index for upgrading sync level on promotion - repo_sync_index: Option, - - /// Optional sender for AddFilters actions to SyncManager. - /// Used after announcement promotion to trigger PR event subscription on connected relays. - sync_action_tx: Option>, } impl RealSyncContext { @@ -241,9 +233,6 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains - /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion - /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion - #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -251,8 +240,6 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, - repo_sync_index: Option, - sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -261,23 +248,9 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, - repo_sync_index, - sync_action_tx, } } - /// Set the sync action sender for triggering filter recomputation after announcement promotion. - /// - /// When an announcement is promoted from purgatory to Full sync level, the SyncManager - /// needs to subscribe to PR events for that repo on all connected relays. This sender - /// is used to trigger that subscription. - pub fn set_sync_action_tx( - &mut self, - tx: tokio::sync::mpsc::Sender, - ) { - self.sync_action_tx = Some(tx); - } - /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -509,98 +482,9 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, - self.repo_sync_index.clone(), ) .await?; - // If announcements were promoted (now Full sync level), notify SyncManager to - // recompute filters so PR event subscriptions are created on connected relays. - if result.announcements_released > 0 { - if let (Some(ref tx), Some(ref repo_sync_index)) = - (&self.sync_action_tx, &self.repo_sync_index) - { - let index = repo_sync_index.read().await; - for (repo_id, needs) in index.iter() { - if needs.sync_level == crate::sync::SyncLevel::Full - && !needs.root_events.is_empty() - { - // Send AddFilters for Full repos with root events - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &needs.root_events, - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: needs.root_events.clone(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters after announcement promotion" - ); - } else { - debug!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager after announcement promotion" - ); - } - } - } else if needs.sync_level == crate::sync::SyncLevel::Full { - // Even without root_events, send empty repo filter to ensure - // Layer 2 subscriptions (PR events) are set up - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &std::collections::HashSet::new(), - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters (no root_events) after announcement promotion" - ); - } - } - } - } - } - } - // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3 From 70749ea9df1f6061c332112c617b615f91d79d48 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 11:17:10 +0000 Subject: fix: re-process hot-cache maintainer announcements after git push promotion When an owner announcement is promoted from purgatory via a git push, any maintainer announcements sitting in the rejected_events_index hot cache were never re-processed. The invalidate_and_get call only existed in SyncManager::process_event_static (the nostr sync path); the git push promotion path (http -> handlers -> git::sync) had no access to the rejected_events_index at all. Thread rejected_events_index and write_policy through the git push path: - process_purgatory_announcements: after saving the promoted announcement, parse its maintainers tag and call invalidate_and_get() for each, then re-process any returned hot-cache events via admit_event + save - process_newly_available_git_data: accept optional write_policy and rejected_events_index, pass them through to process_purgatory_announcements - handle_receive_pack: accept Arc and Arc, pass them to process_newly_available_git_data - HttpService / run_server: carry the two new fields, clone into each handle_receive_pack call - main.rs: obtain rejected_events_index from sync_manager before moving it into its task; wrap write_policy in Arc for the HTTP server - RealSyncContext::process_newly_available_git_data: pass None for both new params (purgatory sync path already handles this via SyncManager::process_event_static) Also rewrite the maintainer_reprocessing integration tests to correctly exercise the hot-cache path now that announcements require git data before being released from purgatory: - Start relay_b with relay_a as bootstrap so its SyncManager syncs maintainer announcements via negentropy before the owner git push - Use push_unique_git_data_to_relay (new helper) to give each maintainer a distinct commit hash, preventing git from skipping pack transfer - Make wait_for_event_on_relay poll in a retry loop so transient timing gaps between DB write and query do not cause false negatives --- src/git/handlers.rs | 7 +- src/git/sync.rs | 113 +++++++++++++++- src/http/mod.rs | 22 +++- src/main.rs | 7 + src/purgatory/sync/context.rs | 6 +- tests/sync/maintainer_reprocessing.rs | 235 +++++++++++++++++++++------------- 6 files changed, 298 insertions(+), 92 deletions(-) (limited to 'src/git/handlers.rs') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..13d6ba0 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess; use crate::git::authorization::{authorize_push, parse_pushed_refs}; use crate::git::sync::process_newly_available_git_data; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// Handle GET /info/refs?service=git-{upload,receive}-pack /// @@ -195,6 +196,8 @@ pub async fn handle_receive_pack( purgatory: Arc, git_data_path: &str, git_protocol: Option<&str>, + write_policy: Arc, + rejected_events_index: Arc, ) -> Result>, GitError> { debug!("Handling receive-pack for {:?}", repo_path); @@ -307,6 +310,8 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + Some(&write_policy), + Some(&rejected_events_index), ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..8401736 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -32,6 +32,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::process::Command; +use std::sync::Arc; use tracing::{debug, info, warn}; use nostr_sdk::Event; @@ -41,9 +42,10 @@ use crate::git::authorization::{ RepositoryData, }; use crate::git::{self, oid_exists}; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::rejected_index::RejectedEventsIndex; /// Result of processing newly available git data. /// @@ -819,6 +821,8 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +852,8 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + write_policy, + rejected_events_index, ) .await; result.merge(announcement_result); @@ -1277,6 +1283,10 @@ async fn process_purgatory_pr_events( /// /// When git data arrives for a repository, any announcements in purgatory /// for that repository should be promoted to the database and served to clients. +/// +/// When `write_policy` and `rejected_events_index` are provided (git push path), +/// any maintainer announcements sitting in the hot cache are re-processed immediately +/// after the owner announcement is promoted, so they don't wait for the next sync cycle. async fn process_purgatory_announcements( identifier: &str, source_repo_path: &Path, @@ -1284,6 +1294,8 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1339,6 +1351,105 @@ async fn process_purgatory_announcements( } result.announcements_released += 1; + + // Re-process any maintainer announcements sitting in the hot cache. + // + // When an owner announcement is promoted from purgatory via a git push, + // maintainer announcements that arrived earlier (via relay sync) may have + // been rejected and stored in the hot cache because the owner announcement + // didn't exist in the DB yet. Now that the owner announcement is saved, + // we must invalidate and re-process those cached events immediately. + // + // This only applies on the git push path (write_policy + rejected_events_index + // are Some). The purgatory sync path already handles this via + // SyncManager::process_event_static. + if let (Some(wp), Some(rei), Some(relay)) = + (write_policy, rejected_events_index, local_relay) + { + use crate::nostr::events::RepositoryAnnouncement; + use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { + if !announcement.maintainers.is_empty() { + debug!( + identifier = %identifier, + event_id = %event.id, + maintainer_count = announcement.maintainers.len(), + "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements" + ); + + for maintainer_hex in &announcement.maintainers { + match nostr_sdk::PublicKey::from_hex(maintainer_hex) { + Ok(maintainer_pubkey) => { + let (removed, hot_events) = rei.invalidate_and_get( + &maintainer_pubkey, + &announcement.identifier, + Some(crate::sync::rejected_index::EventType::Announcement), + ); + + if removed > 0 { + info!( + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + removed_from_cold_index = removed, + hot_cache_events = hot_events.len(), + "Invalidated rejected maintainer announcements after git push promotion" + ); + } + + // Re-process events from hot cache + let dummy_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 0, + ); + for hot_event in hot_events { + info!( + event_id = %hot_event.id, + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + "Re-processing maintainer announcement from hot cache after git push promotion" + ); + match wp.admit_event(&hot_event, &dummy_addr).await { + WritePolicyResult::Accept => { + match database.save_event(&hot_event).await { + Ok(_) => { + relay.notify_event(hot_event.clone()); + info!( + event_id = %hot_event.id, + "Maintainer announcement accepted and saved on re-processing" + ); + } + Err(e) => { + warn!( + event_id = %hot_event.id, + error = %e, + "Failed to save re-processed maintainer announcement" + ); + } + } + } + _ => { + warn!( + event_id = %hot_event.id, + "Maintainer announcement still rejected on re-processing" + ); + } + } + } + } + Err(e) => { + warn!( + maintainer_hex = %maintainer_hex, + error = %e, + "Invalid maintainer public key in promoted announcement" + ); + } + } + } + } + } + } } Err(e) => { warn!( diff --git a/src/http/mod.rs b/src/http/mod.rs index ffb1562..cfd7c52 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -26,8 +26,9 @@ use tokio::net::TcpListener; use crate::config::Config; use crate::git; use crate::metrics::Metrics; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// CORS headers required by GRASP-01 specification (lines 40-47) const CORS_ALLOW_ORIGIN: &str = "*"; @@ -97,6 +98,10 @@ struct HttpService { metrics: Option>, /// Purgatory for event/git coordination purgatory: Arc, + /// Write policy for re-processing hot-cache events after git push promotion + write_policy: Arc, + /// Rejected events index for hot-cache re-processing after git push promotion + rejected_events_index: Arc, } impl HttpService { @@ -107,6 +112,8 @@ impl HttpService { database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> Self { Self { relay, @@ -115,6 +122,8 @@ impl HttpService { database, metrics, purgatory, + write_policy, + rejected_events_index, } } } @@ -132,6 +141,8 @@ impl Service> for HttpService { let git_data_path = self.config.effective_git_data_path(); let database = self.database.clone(); let purgatory = self.purgatory.clone(); + let write_policy = self.write_policy.clone(); + let rejected_events_index = self.rejected_events_index.clone(); // Handle OPTIONS preflight requests (CORS) // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content @@ -293,6 +304,8 @@ impl Service> for HttpService { purgatory.clone(), &git_data_path, git_protocol.as_deref(), + write_policy.clone(), + rejected_events_index.clone(), ) .await; @@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String { /// * `relay` - The LocalRelay for WebSocket connections /// * `database` - The database for direct queries (e.g., push authorization) /// * `metrics` - Optional metrics for Prometheus endpoint +/// * `purgatory` - Purgatory for event/git coordination +/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion +/// * `rejected_events_index` - Rejected events index for hot-cache re-processing pub async fn run_server( config: Config, relay: LocalRelay, database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> anyhow::Result<()> { let bind_addr: SocketAddr = config.bind_address.parse()?; @@ -582,6 +600,8 @@ pub async fn run_server( database.clone(), metrics.clone(), purgatory.clone(), + write_policy.clone(), + rejected_events_index.clone(), ); tokio::spawn(async move { diff --git a/src/main.rs b/src/main.rs index ab6ede7..6769cf3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,7 +130,9 @@ async fn main() -> Result<()> { } // Get a reference to the rejected events index for shutdown persistence + // and for the HTTP server's git push path (hot-cache re-processing) let shutdown_rejected_index = sync_manager.rejected_events_index(); + let http_rejected_index = shutdown_rejected_index.clone(); tokio::spawn(async move { sync_manager.run().await; @@ -206,6 +208,9 @@ async fn main() -> Result<()> { // Start HTTP server with integrated relay and database info!("Starting HTTP server on {}", config.bind_address); + // Wrap write_policy in Arc for sharing between HTTP server connections + let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); + // Run server until shutdown signal, then cleanup tokio::select! { result = http::run_server( @@ -214,6 +219,8 @@ async fn main() -> Result<()> { relay_with_db.database, metrics, purgatory, + http_write_policy, + http_rejected_index, ) => { result? } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..ece8cd6 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -474,7 +474,9 @@ impl SyncContext for RealSyncContext { source_repo_path: &Path, new_oids: &HashSet, ) -> Result { - // Delegate to the unified function from git::sync + // Delegate to the unified function from git::sync. + // Pass None for write_policy and rejected_events_index: the purgatory sync path + // already handles hot-cache re-processing via SyncManager::process_event_static. let result = crate::git::sync::process_newly_available_git_data( source_repo_path, new_oids, @@ -482,6 +484,8 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + None, + None, ) .await?; diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 266a437..61d8e14 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -2,51 +2,61 @@ //! //! Tests the two-tier rejected events index and immediate re-processing of //! maintainer announcements when owner announcements are accepted. +//! +//! ## Test design +//! +//! Announcements now require git data before they are released from purgatory and +//! served to other relays. The hot-cache re-processing path we want to exercise is: +//! +//! relay_b syncs maintainer announcement from relay_a +//! → write policy rejects it (no owner announcement in DB yet) +//! → event stored in hot cache +//! owner git push to relay_b promotes owner announcement from purgatory +//! → our new code calls rejected_events_index.invalidate_and_get() +//! → maintainer announcement re-processed and accepted +//! +//! To guarantee the maintainer announcements arrive at relay_b *before* the owner +//! git push, relay_b is started with relay_a as its bootstrap relay. That way +//! relay_b's SyncManager connects to relay_a immediately and syncs whatever is +//! already in relay_a's DB. We push the maintainer git data first (so the +//! announcements are in relay_a's DB), wait briefly for the sync round-trip, then +//! send the owner announcement + git push. use std::time::Duration; use nostr_sdk::prelude::*; -use crate::common::{ - sync_helpers::*, - TestRelay, -}; +use crate::common::{sync_helpers::*, TestRelay}; -/// Test that maintainer announcements are re-processed immediately when owner announcement accepted +/// Test that a maintainer announcement is re-processed immediately when the owner +/// announcement is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + maintainer) -/// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement +/// 1. relay_a: Maintainer sends announcement + git data → accepted into relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs maintainer announcement +/// → rejected by write policy (no owner in DB) → stored in hot cache +/// 3. relay_b: Owner sends announcement → purgatory (no git data yet) +/// 4. relay_b: Owner git push → owner announcement promoted from purgatory +/// → hot-cache re-processing fires → maintainer announcement accepted /// 5. Both announcements should be in relay_b's database -/// -/// Expected time: <5 seconds (vs 24 hours without hot cache) #[tokio::test] async fn test_maintainer_announcement_reprocessed_immediately() { // Start relay_a (where maintainer announcement will be sent) let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer_keys = Keys::generate(); - let identifier = "test-repo"; - let start = std::time::Instant::now(); - - // Step 1: Send maintainer announcement to relay_a (will be rejected by relay_b - doesn't list relay_b) - // Use HTTP clone URL pointing to relay_a's git endpoint so it can be released from purgatory + // Step 1: Send maintainer announcement to relay_a then push git data so it lands in + // relay_a's DB. The announcement lists relay_a only (not relay_b), so relay_b's write + // policy will reject it when it arrives via sync. let maintainer_npub = maintainer_keys .public_key() .to_bech32() .expect("Failed to get npub"); - let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -60,27 +70,33 @@ async fn test_maintainer_announcement_reprocessed_immediately() { identifier )], ), - Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), + Tag::custom( + TagKind::custom("relays"), + vec![relay_a.url().to_string()], + ), ]) .sign_with_keys(&maintainer_keys) .unwrap(); + send_to_relay(&relay_a, &maintainer_announcement).await.unwrap(); + let _git_dir_maintainer = + push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]) + .await; + println!("✓ Maintainer announcement + git data pushed to relay_a"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // relay_b's initial negentropy sync will pick up the maintainer announcement and reject it + // (no owner announcement in relay_b's DB yet), storing it in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); - send_to_relay(&relay_a, &maintainer_announcement) - .await - .unwrap(); - println!("✓ Maintainer announcement sent to relay_a"); - - // Push git data for maintainer's repo to relay_a → releases maintainer announcement from purgatory - let _git_dir_maintainer = push_git_data_to_relay( - &relay_a, - &maintainer_keys, - identifier, - &[&relay_a.domain()], - ) - .await; - println!("✓ Maintainer git data pushed to relay_a (announcement released from purgatory)"); - - // Step 2: Set up owner announcement on relay_b (lists relay_a + maintainer) with git data + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcement should be in hot cache)"); + + let start = std::time::Instant::now(); + + // Step 3: Send owner announcement to relay_b → goes to purgatory (no git data yet). + // The announcement lists relay_a + relay_b and names the maintainer. let owner_npub = owner_keys .public_key() .to_bech32() @@ -111,19 +127,21 @@ async fn test_maintainer_announcement_reprocessed_immediately() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner's repo to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory, which triggers hot-cache + // re-processing of the maintainer announcement via our new code path. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)"); - // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; let elapsed = start.elapsed(); - // Step 4: Verify both announcements are in relay_b's database + // Step 6: Verify both announcements are in relay_b's database. let owner_filter = Filter::new() .kind(Kind::GitRepoAnnouncement) .author(owner_keys.public_key()) @@ -145,7 +163,6 @@ async fn test_maintainer_announcement_reprocessed_immediately() { "Maintainer announcement should be re-processed and accepted in relay_b" ); - // Step 5: Verify it happened quickly (not 24 hours!) assert!( elapsed.as_secs() < 15, "Re-processing should happen in <15 seconds, took {:?}", @@ -258,13 +275,16 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { relay.stop().await; } -/// Test multiple maintainers are all re-processed when owner announcement accepted +/// Test that all maintainer announcements are re-processed when the owner announcement +/// is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers) -/// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements +/// 1. relay_a: Three maintainers send announcements + git data → in relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs all three maintainer +/// announcements → all rejected (no owner in DB) → all in hot cache +/// 3. relay_b: Owner sends announcement → purgatory +/// 4. relay_b: Owner git push → owner promoted → hot-cache re-processing fires for +/// all three maintainers /// 5. All four announcements should be in relay_b's database #[tokio::test] async fn test_multiple_maintainers_all_reprocessed() { @@ -272,21 +292,23 @@ async fn test_multiple_maintainers_all_reprocessed() { let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer1_keys = Keys::generate(); let maintainer2_keys = Keys::generate(); let maintainer3_keys = Keys::generate(); - let identifier = "multi-maintainer-repo"; + // Use a unique identifier per test run to avoid cross-test interference when + // tests run in parallel (each test gets its own namespace on relay_a). + let identifier = &format!( + "multi-maintainer-repo-{}", + owner_keys.public_key().to_hex()[..8].to_string() + ); - // Step 1: Send three maintainer announcements to relay_a with git data - // (purgatory requires git data before announcements are accepted) - let mut git_dirs_maintainers = Vec::new(); + // Step 1: Send each maintainer announcement to relay_a then push git data so all three + // land in relay_a's DB. Each announcement lists relay_a only, so relay_b will reject + // them when syncing (no owner announcement in relay_b's DB yet). + let mut git_dirs = Vec::new(); for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] .iter() .enumerate() @@ -295,13 +317,12 @@ async fn test_multiple_maintainers_all_reprocessed() { .public_key() .to_bech32() .expect("Failed to get npub"); - let announcement = EventBuilder::new( Kind::GitRepoAnnouncement, format!("Maintainer {} repository", idx + 1), ) .tags(vec![ - Tag::identifier(identifier), + Tag::identifier(identifier.as_str()), Tag::custom( TagKind::custom("clone"), vec![format!( @@ -315,18 +336,53 @@ async fn test_multiple_maintainers_all_reprocessed() { ]) .sign_with_keys(maintainer_keys) .unwrap(); - send_to_relay(&relay_a, &announcement).await.unwrap(); + // Use push_unique_git_data_to_relay so each maintainer gets a distinct commit + // hash. Identical hashes cause git to skip pack transfer when the object + // already exists on the server, leaving the announcement in purgatory. + let git_dir = push_unique_git_data_to_relay( + &relay_a, + maintainer_keys, + identifier, + &[&relay_a.domain()], + &m_npub, + ) + .await; + git_dirs.push(git_dir); + } + println!("✓ Three maintainer announcements + git data pushed to relay_a"); - // Push git data to release each maintainer's announcement from purgatory - let git_dir = - push_git_data_to_relay(&relay_a, maintainer_keys, identifier, &[&relay_a.domain()]) - .await; - git_dirs_maintainers.push(git_dir); + // Confirm all three announcements are queryable on relay_a before starting relay_b. + // This eliminates the race between relay_a's DB writes and relay_b's initial negentropy sync. + for (name, keys) in [ + ("maintainer1", &maintainer1_keys), + ("maintainer2", &maintainer2_keys), + ("maintainer3", &maintainer3_keys), + ] { + let filter = Filter::new() + .kind(Kind::GitRepoAnnouncement) + .author(keys.public_key()) + .identifier(identifier); + let found = + wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; + assert!(found, "{} announcement should be in relay_a before starting relay_b", name); } - println!("✓ Three maintainer announcements sent to relay_a with git data"); + println!("✓ All three maintainer announcements confirmed in relay_a's DB"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // Because all three maintainer announcements are confirmed in relay_a's DB, relay_b's + // initial negentropy sync will pick them all up and reject them (no owner announcement + // in relay_b's DB yet), storing them in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); + + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // allow extra time for slow CI environments. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); - // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) + // Step 3: Send owner announcement to relay_b → goes to purgatory. let owner_npub = owner_keys .public_key() .to_bech32() @@ -361,17 +417,19 @@ async fn test_multiple_maintainers_all_reprocessed() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory and triggers hot-cache + // re-processing for all three maintainer announcements. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (hot-cache re-processing should fire)"); - // Step 3: Wait for sync and re-processing - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; - // Step 4: Verify all four announcements are in relay_b's database + // Step 6: Verify all four announcements are in relay_b's database. for (name, keys) in [ ("owner", &owner_keys), ("maintainer1", &maintainer1_keys), @@ -396,10 +454,10 @@ async fn test_multiple_maintainers_all_reprocessed() { /// Test that invalid maintainer public keys don't cause panics /// /// Flow: -/// 1. Maintainer announcement arrives → Rejected -/// 2. Owner announcement arrives with INVALID maintainer hex → Should handle gracefully -/// 3. Owner announcement should still be accepted -/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey) +/// 1. Maintainer announcement arrives → Rejected (doesn't list our relay) +/// 2. Owner announcement + git push → accepted, with INVALID maintainer hex in maintainers tag +/// 3. Owner announcement should be accepted +/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey can't be parsed) #[tokio::test] async fn test_invalid_maintainer_pubkey_handled_gracefully() { let relay = TestRelay::start().await; @@ -410,8 +468,12 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { let identifier = "invalid-maintainer-repo"; + // Create client using TestClient helper + let client = TestClient::new(relay.url(), owner_keys.clone()) + .await + .expect("Failed to connect to relay"); + // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) - // This one uses example.com clone URL - it goes to purgatory on relay, never promoted let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -428,12 +490,13 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .sign_with_keys(&maintainer_keys) .unwrap(); - // Send maintainer announcement - expect it to be rejected (purgatory / policy) - send_to_relay(&relay, &maintainer_announcement).await.ok(); + // Send maintainer announcement - expect it to be rejected + let _ = client.send_event(&maintainer_announcement).await; tokio::time::sleep(Duration::from_millis(200)).await; - // Step 2: Set up owner announcement with INVALID maintainer hex and git data - // Use HTTP clone URL to relay's git endpoint so it can be released from purgatory + // Step 2: Send owner announcement with INVALID maintainer hex, then push git data. + // The announcement goes to purgatory first; the git push promotes it. + // The invalid maintainer hex should be handled gracefully (no panic). let owner_npub = owner_keys .public_key() .to_bech32() @@ -461,13 +524,8 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .unwrap(); send_to_relay(&relay, &owner_announcement).await.unwrap(); - - // Push git data to relay → releases owner announcement from purgatory let _git_dir = push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; - println!("✓ Owner git data pushed to relay (announcement released from purgatory)"); - - // Wait for processing tokio::time::sleep(Duration::from_millis(500)).await; // Step 3: Verify owner announcement accepted, maintainer not re-processed @@ -497,5 +555,6 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { println!("✅ Invalid maintainer pubkey handled gracefully without panic"); + client.disconnect().await; relay.stop().await; } -- cgit v1.2.3