From 3d9359d5ac0045fb93fd8732160e0de8413d6881 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:28 +0000 Subject: Revert "feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion" This reverts commit d76003b629a4a03dba23a8a1c41da6e4ac4c30cf. --- src/git/handlers.rs | 1 - src/git/sync.rs | 21 -------- src/main.rs | 20 ------- src/nostr/builder.rs | 118 ------------------------------------------ src/nostr/policy/mod.rs | 37 ------------- src/purgatory/sync/context.rs | 116 ----------------------------------------- 6 files changed, 313 deletions(-) diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 129ca2c..017eee4 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -307,7 +307,6 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, - None, ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index b3fa11a..4b35023 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -44,7 +44,6 @@ use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; -use crate::sync::{RepoSyncIndex, SyncLevel}; /// Result of processing newly available git data. /// @@ -810,7 +809,6 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) /// * `purgatory` - Purgatory instance to check for satisfiable events /// * `git_data_path` - Base path for git repositories -/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion /// /// # Returns /// A `ProcessResult` describing what was processed @@ -821,7 +819,6 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -851,7 +848,6 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, - repo_sync_index.as_ref(), ) .await; result.merge(announcement_result); @@ -1288,7 +1284,6 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, - repo_sync_index: Option<&RepoSyncIndex>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1343,22 +1338,6 @@ async fn process_purgatory_announcements( } } - // Upgrade sync level to Full in repo_sync_index - if let Some(index) = repo_sync_index { - let mut index = index.write().await; - // Use hex pubkey format to match how repo_sync_index keys are built - // (sync/mod.rs uses event.pubkey which is hex, not bech32) - let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); - if let Some(entry) = index.get_mut(&repo_id) { - entry.sync_level = SyncLevel::Full; - debug!( - identifier = %identifier, - repo_id = %repo_id, - "Upgraded sync level to Full after announcement promotion" - ); - } - } - result.announcements_released += 1; } Err(e) => { diff --git a/src/main.rs b/src/main.rs index 3ff30fb..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,24 +132,6 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Get a reference to the repo sync index for upgrading sync levels on promotion - let repo_sync_index = sync_manager.repo_sync_index(); - - // Set the repo sync index on the write policy so user-submitted purgatory - // announcements can trigger relay discovery (connect to relays in announcement tags) - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index.clone()); - - // Get the action sender BEFORE consuming sync_manager with spawn - let action_tx = sync_manager.action_tx(); - - // Set the sync action sender so the write policy can trigger relay connections - // when user-submitted purgatory announcements are registered with StateOnly level - if let Some(tx) = action_tx.clone() { - relay_with_db.write_policy.set_sync_action_tx(tx); - } - tokio::spawn(async move { sync_manager.run().await; }); @@ -202,8 +184,6 @@ async fn main() -> Result<()> { Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), - Some(repo_sync_index), - action_tx, )); // Create throttle manager for rate limiting remote git servers diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 4c66f6d..aff12a6 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -98,24 +98,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - /// - /// When a user submits an announcement that goes to purgatory (no git data yet), - /// the relay needs to discover and connect to relays listed in the announcement's - /// `relays` and `clone` tags. This index is updated when the announcement is accepted - /// into purgatory, triggering the sync system to connect and sync state events. - pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - /// - /// This allows the write policy to notify the SyncManager when user-submitted - /// purgatory announcements need relay discovery (triggering new connections). - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - self.ctx.set_sync_action_tx(tx); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -164,106 +146,6 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); - - // Register in repo_sync_index with StateOnly level so the sync - // system discovers and connects to relays listed in this announcement. - // This is needed for user-submitted announcements (not via sync path) - // to trigger relay discovery and state event sync. - if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = - format!("30617:{}:{}", event.pubkey, identifier); - - // Get relay URLs stored in purgatory for this announcement - let relays = self - .ctx - .purgatory - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - if !relays.is_empty() { - use crate::sync::{ - AddFilters, PendingItems, RepoSyncNeeds, SyncLevel, - }; - - // Update repo_sync_index with StateOnly for this repo - let new_repos = { - let mut index = repo_sync_index.write().await; - let entry = - index.entry(repo_id.clone()).or_insert_with(|| { - RepoSyncNeeds { - relays: std::collections::HashSet::new(), - root_events: std::collections::HashSet::new(), - sync_level: SyncLevel::StateOnly, - } - }); - entry.relays.extend(relays.iter().cloned()); - // Don't upgrade if already Full - tracing::info!( - repo_id = %repo_id, - relay_count = entry.relays.len(), - "Registered user-submitted purgatory announcement in \ - RepoSyncIndex with StateOnly level for relay discovery" - ); - // Return cloned relays for AddFilters - relays.clone() - }; - - // Send AddFilters to SyncManager so it connects to these relays - if let Some(tx) = self.ctx.get_sync_action_tx() { - // Build state-only filters for this repo - let state_only_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &std::collections::HashSet::new(), - &state_only_repos, - &std::collections::HashSet::new(), - None, - ); - - for relay_url in new_repos { - // Skip our own domain - if relay_url.contains(&self.ctx.domain) { - continue; - } - let action = AddFilters { - relay_url: relay_url.clone(), - items: PendingItems { - repos: state_only_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters: filters.clone(), - }; - if let Err(e) = tx.send(action).await { - tracing::warn!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters action for \ - user-submitted purgatory announcement" - ); - } else { - tracing::info!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager for \ - user-submitted purgatory announcement relay" - ); - } - } - } - } - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 78a09fc..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use crate::sync::{AddFilters, RepoSyncIndex}; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,16 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Optional repo sync index for triggering relay discovery when announcements - /// go to purgatory via user submission (not via the sync path). - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub repo_sync_index: Arc>>, - /// Optional sender for AddFilters actions to SyncManager. - /// Used to trigger relay discovery when user-submitted purgatory announcements - /// are registered with StateOnly sync level. - /// Wrapped in Arc for interior mutability (PolicyContext is Clone). - pub sync_action_tx: - Arc>>>, } impl PolicyContext { @@ -62,8 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), - sync_action_tx: Arc::new(std::sync::RwLock::new(None)), } } @@ -81,28 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index for relay discovery from user-submitted purgatory announcements. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it's been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } - - /// Set the sync action sender for sending AddFilters actions to SyncManager. - pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender) { - let mut guard = self.sync_action_tx.write().unwrap(); - *guard = Some(tx); - } - - /// Get a clone of the sync action sender if it's been set. - pub fn get_sync_action_tx(&self) -> Option> { - let guard = self.sync_action_tx.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 4dbb402..3568e89 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -193,7 +193,6 @@ use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; use crate::purgatory::Purgatory; use crate::sync::naughty_list::NaughtyListTracker; -use crate::sync::RepoSyncIndex; use super::functions::extract_domain; @@ -222,13 +221,6 @@ pub struct RealSyncContext { /// Naughty list tracker for git remote domains with persistent errors git_naughty_list: Arc, - - /// Optional repo sync index for upgrading sync level on promotion - repo_sync_index: Option, - - /// Optional sender for AddFilters actions to SyncManager. - /// Used after announcement promotion to trigger PR event subscription on connected relays. - sync_action_tx: Option>, } impl RealSyncContext { @@ -241,9 +233,6 @@ impl RealSyncContext { /// * `our_domain` - Our domain to exclude from clone URLs /// * `local_relay` - Local relay for WebSocket notifications /// * `git_naughty_list` - Naughty list tracker for git remote domains - /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion - /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion - #[allow(clippy::too_many_arguments)] pub fn new( purgatory: Arc, database: SharedDatabase, @@ -251,8 +240,6 @@ impl RealSyncContext { our_domain: Option, local_relay: Option, git_naughty_list: Arc, - repo_sync_index: Option, - sync_action_tx: Option>, ) -> Self { Self { purgatory, @@ -261,23 +248,9 @@ impl RealSyncContext { our_domain_value: our_domain, local_relay, git_naughty_list, - repo_sync_index, - sync_action_tx, } } - /// Set the sync action sender for triggering filter recomputation after announcement promotion. - /// - /// When an announcement is promoted from purgatory to Full sync level, the SyncManager - /// needs to subscribe to PR events for that repo on all connected relays. This sender - /// is used to trigger that subscription. - pub fn set_sync_action_tx( - &mut self, - tx: tokio::sync::mpsc::Sender, - ) { - self.sync_action_tx = Some(tx); - } - /// Get reference to the git naughty list tracker pub fn git_naughty_list(&self) -> &Arc { &self.git_naughty_list @@ -509,98 +482,9 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, - self.repo_sync_index.clone(), ) .await?; - // If announcements were promoted (now Full sync level), notify SyncManager to - // recompute filters so PR event subscriptions are created on connected relays. - if result.announcements_released > 0 { - if let (Some(ref tx), Some(ref repo_sync_index)) = - (&self.sync_action_tx, &self.repo_sync_index) - { - let index = repo_sync_index.read().await; - for (repo_id, needs) in index.iter() { - if needs.sync_level == crate::sync::SyncLevel::Full - && !needs.root_events.is_empty() - { - // Send AddFilters for Full repos with root events - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &needs.root_events, - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: needs.root_events.clone(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters after announcement promotion" - ); - } else { - debug!( - relay = %relay_url, - repo_id = %repo_id, - "Sent AddFilters to SyncManager after announcement promotion" - ); - } - } - } else if needs.sync_level == crate::sync::SyncLevel::Full { - // Even without root_events, send empty repo filter to ensure - // Layer 2 subscriptions (PR events) are set up - for relay_url in &needs.relays { - if let Some(ref domain) = self.our_domain_value { - if relay_url.contains(domain.as_str()) { - continue; - } - } - let full_repos: std::collections::HashSet = - std::iter::once(repo_id.clone()).collect(); - let filters = - crate::sync::filters::build_sync_level_aware_filters( - &full_repos, - &std::collections::HashSet::new(), - &std::collections::HashSet::new(), - None, - ); - let action = crate::sync::AddFilters { - relay_url: relay_url.clone(), - items: crate::sync::PendingItems { - repos: full_repos.clone(), - root_events: std::collections::HashSet::new(), - }, - filters, - }; - if let Err(e) = tx.send(action).await { - debug!( - relay = %relay_url, - error = %e, - "Failed to send AddFilters (no root_events) after announcement promotion" - ); - } - } - } - } - } - } - // Convert from git::sync::ProcessResult to our ProcessResult Ok(ProcessResult { states_released: result.states_released, -- cgit v1.2.3