From d76003b629a4a03dba23a8a1c41da6e4ac4c30cf Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:17 +0000 Subject: feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion When git data arrives for a purgatory announcement and promotes it to the database, the relay now: 1. Upgrades the announcement's sync level in RepoSyncIndex from StateOnly to Full (git/sync.rs: process_purgatory_announcements) 2. Sends AddFilters actions to SyncManager for all connected relays, using Full sync filters (Layer 2 #a/#A/#q) to subscribe to PR events (purgatory/sync/context.rs: RealSyncContext.process_newly_available_git_data) 3. For user-submitted purgatory announcements, registers the repo in RepoSyncIndex with StateOnly level and sends AddFilters to SyncManager so it discovers and connects to relays listed in the announcement tags (nostr/builder.rs: handle_announcement AcceptPurgatory path) The RealSyncContext now accepts optional repo_sync_index and sync_action_tx parameters. main.rs wires these up from SyncManager. PolicyContext gains repo_sync_index and sync_action_tx fields for the write policy path. --- src/git/handlers.rs | 1 + src/git/sync.rs | 21 ++++++++ src/main.rs | 20 +++++++ src/nostr/builder.rs | 118 ++++++++++++++++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 37 +++++++++++++ src/purgatory/sync/context.rs | 116 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 313 insertions(+) (limited to 'src') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..129ca2c 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,6 +307,7 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..b3fa11a 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,6 +44,7 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. /// @@ -809,6 +810,7 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories +/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -819,6 +821,7 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +851,7 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1284,6 +1288,7 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1338,6 +1343,22 @@ async fn process_purgatory_announcements( } } + // Upgrade sync level to Full in repo_sync_index + if let Some(index) = repo_sync_index { + let mut index = index.write().await; + // Use hex pubkey format to match how repo_sync_index keys are built + // (sync/mod.rs uses event.pubkey which is hex, not bech32) + let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); + if let Some(entry) = index.get_mut(&repo_id) { + entry.sync_level = SyncLevel::Full; + debug!( + identifier = %identifier, + repo_id = %repo_id, + "Upgraded sync level to Full after announcement promotion" + ); + } + } + result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index ab6ede7..3ff30fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,24 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); + // Get a reference to the repo sync index for upgrading sync levels on promotion + let repo_sync_index = sync_manager.repo_sync_index(); + + // Set the repo sync index on the write policy so user-submitted purgatory + // announcements can trigger relay discovery (connect to relays in announcement tags) + relay_with_db + .write_policy + .set_repo_sync_index(repo_sync_index.clone()); + + // Get the action sender BEFORE consuming sync_manager with spawn + let action_tx = sync_manager.action_tx(); + + // Set the sync action sender so the write policy can trigger relay connections + // when user-submitted purgatory announcements are registered with StateOnly level + if let Some(tx) = action_tx.clone() { + relay_with_db.write_policy.set_sync_action_tx(tx); + } + tokio::spawn(async move { sync_manager.run().await; }); @@ -184,6 +202,8 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), + Some(repo_sync_index), + action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..4c66f6d 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,6 +98,24 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + /// + /// When a user submits an announcement that goes to purgatory (no git data yet), + /// the relay needs to discover and connect to relays listed in the announcement's + /// `relays` and `clone` tags. This index is updated when the announcement is accepted + /// into purgatory, triggering the sync system to connect and sync state events. + pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + /// + /// This allows the write policy to notify the SyncManager when user-submitted + /// purgatory announcements need relay discovery (triggering new connections). + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + self.ctx.set_sync_action_tx(tx); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +164,106 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register in repo_sync_index with StateOnly level so the sync + // system discovers and connects to relays listed in this announcement. + // This is needed for user-submitted announcements (not via sync path) + // to trigger relay discovery and state event sync. + if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = + format!("30617:{}:{}", event.pubkey, identifier); + + // Get relay URLs stored in purgatory for this announcement + let relays = self + .ctx + .purgatory + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + if !relays.is_empty() { + use crate::sync::{ + AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, + }; + + // Update repo_sync_index with StateOnly for this repo + let new_repos = { + let mut index = repo_sync_index.write().await; + let entry = + index.entry(repo_id.clone()).or_insert_with(|| { + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + entry.relays.extend(relays.iter().cloned()); + // Don't upgrade if already Full + tracing::info!( + repo_id = %repo_id, + relay_count = entry.relays.len(), + "Registered user-submitted purgatory announcement in \ + RepoSyncIndex with StateOnly level for relay discovery" + ); + // Return cloned relays for AddFilters + relays.clone() + }; + + // Send AddFilters to SyncManager so it connects to these relays + if let Some(tx) = self.ctx.get_sync_action_tx() { + // Build state-only filters for this repo + let state_only_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &std::collections::HashSet::new(), + &state_only_repos, + &std::collections::HashSet::new(), + None, + ); + + for relay_url in new_repos { + // Skip our own domain + if relay_url.contains(&self.ctx.domain) { + continue; + } + let action = AddFilters { + relay_url: relay_url.clone(), + items: PendingItems { + repos: state_only_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters: filters.clone(), + }; + if let Err(e) = tx.send(action).await { + tracing::warn!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters action for \ + user-submitted purgatory announcement" + ); + } else { + tracing::info!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager for \ + user-submitted purgatory announcement relay" + ); + } + } + } + } + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..78a09fc 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; +use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -34,6 +35,16 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, + /// Optional repo sync index for triggering relay discovery when announcements + /// go to purgatory via user submission (not via the sync path). + /// Wrapped in Arc for interior mutability (PolicyContext is Clone). + pub repo_sync_index: Arc>>, + /// Optional sender for AddFilters actions to SyncManager. + /// Used to trigger relay discovery when user-submitted purgatory announcements + /// are registered with StateOnly sync level. + /// Wrapped in Arc for interior mutability (PolicyContext is Clone). + pub sync_action_tx: + Arc>>>, } impl PolicyContext { @@ -51,6 +62,8 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, + repo_sync_index: Arc::new(std::sync::RwLock::new(None)), + sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -68,4 +81,28 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } + + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + let mut guard = self.repo_sync_index.write().unwrap(); + *guard = Some(index); + } + + /// Get a clone of the repo sync index if it's been set. + pub fn get_repo_sync_index(&self) -> Option { + let guard = self.repo_sync_index.read().unwrap(); + guard.clone() + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + let mut guard = self.sync_action_tx.write().unwrap(); + *guard = Some(tx); + } + + /// Get a clone of the sync action sender if it's been set. + pub fn get_sync_action_tx(&self) -> Option> { + let guard = self.sync_action_tx.read().unwrap(); + guard.clone() + } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..4dbb402 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,6 +193,7 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; +use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -221,6 +222,13 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, + + /// Optional repo sync index for upgrading sync level on promotion + repo_sync_index: Option, + + /// Optional sender for AddFilters actions to SyncManager. + /// Used after announcement promotion to trigger PR event subscription on connected relays. + sync_action_tx: Option>, } impl RealSyncContext { @@ -233,6 +241,9 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains + /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion + /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion + #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -240,6 +251,8 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, + repo_sync_index: Option, + sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -248,9 +261,23 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, + repo_sync_index, + sync_action_tx, } } + /// Set the sync action sender for triggering filter recomputation after announcement promotion. + /// + /// When an announcement is promoted from purgatory to Full sync level, the SyncManager + /// needs to subscribe to PR events for that repo on all connected relays. This sender + /// is used to trigger that subscription. + pub fn set_sync_action_tx( + &mut self, + tx: tokio::sync::mpsc::Sender, + ) { + self.sync_action_tx = Some(tx); + } + /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -482,9 +509,98 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + self.repo_sync_index.clone(), ) .await?; + // If announcements were promoted (now Full sync level), notify SyncManager to + // recompute filters so PR event subscriptions are created on connected relays. + if result.announcements_released > 0 { + if let (Some(ref tx), Some(ref repo_sync_index)) = + (&self.sync_action_tx, &self.repo_sync_index) + { + let index = repo_sync_index.read().await; + for (repo_id, needs) in index.iter() { + if needs.sync_level == crate::sync::SyncLevel::Full + && !needs.root_events.is_empty() + { + // Send AddFilters for Full repos with root events + for relay_url in &needs.relays { + if let Some(ref domain) = self.our_domain_value { + if relay_url.contains(domain.as_str()) { + continue; + } + } + let full_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &full_repos, + &std::collections::HashSet::new(), + &needs.root_events, + None, + ); + let action = crate::sync::AddFilters { + relay_url: relay_url.clone(), + items: crate::sync::PendingItems { + repos: full_repos.clone(), + root_events: needs.root_events.clone(), + }, + filters, + }; + if let Err(e) = tx.send(action).await { + debug!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters after announcement promotion" + ); + } else { + debug!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager after announcement promotion" + ); + } + } + } else if needs.sync_level == crate::sync::SyncLevel::Full { + // Even without root_events, send empty repo filter to ensure + // Layer 2 subscriptions (PR events) are set up + for relay_url in &needs.relays { + if let Some(ref domain) = self.our_domain_value { + if relay_url.contains(domain.as_str()) { + continue; + } + } + let full_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &full_repos, + &std::collections::HashSet::new(), + &std::collections::HashSet::new(), + None, + ); + let action = crate::sync::AddFilters { + relay_url: relay_url.clone(), + items: crate::sync::PendingItems { + repos: full_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters, + }; + if let Err(e) = tx.send(action).await { + debug!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters (no root_events) after announcement promotion" + ); + } + } + } + } + } + } + // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3