From d76003b629a4a03dba23a8a1c41da6e4ac4c30cf Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:17 +0000 Subject: feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion When git data arrives for a purgatory announcement and promotes it to the database, the relay now: 1. Upgrades the announcement's sync level in RepoSyncIndex from StateOnly to Full (git/sync.rs: process_purgatory_announcements) 2. Sends AddFilters actions to SyncManager for all connected relays, using Full sync filters (Layer 2 #a/#A/#q) to subscribe to PR events (purgatory/sync/context.rs: RealSyncContext.process_newly_available_git_data) 3. For user-submitted purgatory announcements, registers the repo in RepoSyncIndex with StateOnly level and sends AddFilters to SyncManager so it discovers and connects to relays listed in the announcement tags (nostr/builder.rs: handle_announcement AcceptPurgatory path) The RealSyncContext now accepts optional repo_sync_index and sync_action_tx parameters. main.rs wires these up from SyncManager. PolicyContext gains repo_sync_index and sync_action_tx fields for the write policy path. --- src/nostr/builder.rs | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) (limited to 'src/nostr/builder.rs') diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..4c66f6d 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,6 +98,24 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. + /// + /// When a user submits an announcement that goes to purgatory (no git data yet), + /// the relay needs to discover and connect to relays listed in the announcement's + /// `relays` and `clone` tags. This index is updated when the announcement is accepted + /// into purgatory, triggering the sync system to connect and sync state events. + pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + + /// Set the sync action sender for sending AddFilters actions to SyncManager. + /// + /// This allows the write policy to notify the SyncManager when user-submitted + /// purgatory announcements need relay discovery (triggering new connections). + pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { + self.ctx.set_sync_action_tx(tx); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +164,106 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register in repo_sync_index with StateOnly level so the sync + // system discovers and connects to relays listed in this announcement. + // This is needed for user-submitted announcements (not via sync path) + // to trigger relay discovery and state event sync. + if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = + format!("30617:{}:{}", event.pubkey, identifier); + + // Get relay URLs stored in purgatory for this announcement + let relays = self + .ctx + .purgatory + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + if !relays.is_empty() { + use crate::sync::{ + AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, + }; + + // Update repo_sync_index with StateOnly for this repo + let new_repos = { + let mut index = repo_sync_index.write().await; + let entry = + index.entry(repo_id.clone()).or_insert_with(|| { + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + entry.relays.extend(relays.iter().cloned()); + // Don't upgrade if already Full + tracing::info!( + repo_id = %repo_id, + relay_count = entry.relays.len(), + "Registered user-submitted purgatory announcement in \ + RepoSyncIndex with StateOnly level for relay discovery" + ); + // Return cloned relays for AddFilters + relays.clone() + }; + + // Send AddFilters to SyncManager so it connects to these relays + if let Some(tx) = self.ctx.get_sync_action_tx() { + // Build state-only filters for this repo + let state_only_repos: std::collections::HashSet = + std::iter::once(repo_id.clone()).collect(); + let filters = + crate::sync::filters::build_sync_level_aware_filters( + &std::collections::HashSet::new(), + &state_only_repos, + &std::collections::HashSet::new(), + None, + ); + + for relay_url in new_repos { + // Skip our own domain + if relay_url.contains(&self.ctx.domain) { + continue; + } + let action = AddFilters { + relay_url: relay_url.clone(), + items: PendingItems { + repos: state_only_repos.clone(), + root_events: std::collections::HashSet::new(), + }, + filters: filters.clone(), + }; + if let Err(e) = tx.send(action).await { + tracing::warn!( + relay = %relay_url, + error = %e, + "Failed to send AddFilters action for \ + user-submitted purgatory announcement" + ); + } else { + tracing::info!( + relay = %relay_url, + repo_id = %repo_id, + "Sent AddFilters to SyncManager for \ + user-submitted purgatory announcement relay" + ); + } + } + } + } + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), -- cgit v1.2.3