From ee113a654e2971a6ebdb07398cc5638dbe59b48c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 20:32:13 +0000 Subject: fix: replace repo_sync_index wiring with purgatory announcement sync timer Instead of threading repo_sync_index through PolicyContext/builder.rs/main.rs to handle user-submitted purgatory announcements, add a simple background timer (run_purgatory_announcement_sync, every 5s) that scans the purgatory for announcement entries and registers them in repo_sync_index as StateOnly. This is simpler and covers both flows: - Sync-path announcements: inline registration still happens during event processing (sync/mod.rs:1839+), timer provides a safety net - User-submitted announcements: SelfSubscriber never sees them (rejected from DB), timer is the primary registration path The timer calls sync_purgatory_announcements_to_index() which: 1. Snapshots purgatory via new announcements_for_sync() public method 2. Or_inserts StateOnly entries (never downgrades Full entries) 3. Detects newly added relay URLs and calls handle_new_sync_filters to connect and subscribe - fixing the failing test that expected relay discovery from a user-submitted purgatory announcement Removes: repo_sync_index field from PolicyContext, set/get_repo_sync_index methods, set_repo_sync_index on Nip34WritePolicy, wiring in main.rs, and the inline AcceptPurgatory registration block in builder.rs. --- src/main.rs | 7 --- src/nostr/builder.rs | 54 +-------------------- src/nostr/policy/mod.rs | 19 -------- src/purgatory/mod.rs | 17 +++++++ src/sync/mod.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++---- 5 files changed, 134 insertions(+), 88 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index ebe05a3..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,13 +132,6 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Wire repo_sync_index into write policy so user-submitted purgatory announcements - // get registered for state event sync immediately (Fix 3). - let repo_sync_index = sync_manager.repo_sync_index(); - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index); - tokio::spawn(async move { sync_manager.run().await; }); diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 8d1e461..c2d4939 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -17,7 +17,7 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; -use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; + /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -99,14 +99,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index so that user-submitted purgatory announcements can - /// be registered for state event sync immediately. - /// - /// This must be called after SyncManager is created. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -156,50 +148,6 @@ impl Nip34WritePolicy { event_id_str ); - // Register repo in repo_sync_index with StateOnly level so that - // state event sync starts promptly via the next batch EOSE recompute. - // This handles user-submitted purgatory announcements - the SelfSubscriber - // only sees DB events, so it won't pick these up automatically. - if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Ok(announcement) = - RepositoryAnnouncement::from_event(event.clone()) - { - use std::collections::HashSet; - let repo_id = format!( - "30617:{}:{}", - event.pubkey, - announcement.identifier - ); - - // Extract relay URLs from the announcement event tags - let relays: HashSet = event - .tags - .iter() - .flat_map(|tag| { - let tag_vec = tag.as_slice(); - if !tag_vec.is_empty() && tag_vec[0] == "relays" { - tag_vec[1..].iter().map(|s| s.to_string()).collect::>() - } else { - vec![] - } - }) - .collect(); - - let mut index = repo_sync_index.write().await; - index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { - relays, - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - drop(index); - - tracing::debug!( - repo_id = %repo_id, - "Registered purgatory announcement in repo_sync_index as StateOnly" - ); - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index c958586..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use crate::sync::RepoSyncIndex; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,8 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Repo sync index for registering purgatory announcements (set after SyncManager creation) - pub repo_sync_index: Arc>>, } impl PolicyContext { @@ -54,7 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), } } @@ -72,19 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index after SyncManager has been created. - /// - /// This allows purgatory announcements submitted by users to be registered - /// in the sync index so state event sync starts promptly. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it has been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 3b5514b..1894738 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -680,6 +680,23 @@ impl Purgatory { self.announcement_purgatory.len() } + /// Collect (repo_id, relay_urls) for all announcements currently in purgatory. + /// + /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable + /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory + /// announcement sync timer to register StateOnly entries in `repo_sync_index`. + pub fn announcements_for_sync(&self) -> Vec<(String, HashSet)> { + self.announcement_purgatory + .iter() + .map(|entry| { + let (owner, identifier) = entry.key(); + let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); + let relays = entry.value().relays.clone(); + (repo_id, relays) + }) + .collect() + } + /// Get all event IDs currently stored in purgatory AND previously expired events. /// /// Returns a HashSet of all event IDs for: diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 916e2b0..ed5b6e7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -397,6 +397,37 @@ async fn run_daily_timer( } } +/// Background task that periodically syncs purgatory announcements into repo_sync_index. +/// +/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there +/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` +/// which connects to the relay URLs listed in the announcement and subscribes to state +/// events (kind 30618). +/// +/// This covers two cases: +/// - Sync-path announcements: registered inline during event processing, but this +/// provides a safety net in case the inline registration was missed. +/// - User-submitted purgatory announcements: the SelfSubscriber never sees them +/// (they're rejected from DB), so this timer is the primary registration path. +async fn run_purgatory_announcement_sync( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { + let interval = Duration::from_secs(5); + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + let mut manager = sync_manager.lock().await; + manager.sync_purgatory_announcements_to_index().await; + } + _ = shutdown_rx.recv() => { + tracing::debug!("Purgatory announcement sync timer received shutdown signal"); + break; + } + } + } +} + // Combined Health and Metrics Checker /// Background task for cleaning up expired entries from the rejected events index @@ -700,14 +731,6 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } - /// Get a clone of the repo sync index Arc. - /// - /// This allows the write policy to register user-submitted purgatory announcements - /// in the sync index so that state event sync starts promptly. - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -1560,7 +1583,17 @@ impl SyncManager { run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; }); - // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 11. Spawn purgatory announcement sync timer (every 5s) + // Ensures purgatory announcements (including user-submitted ones that never + // touch the DB) are registered in repo_sync_index as StateOnly so that + // state event subscriptions are established on their listed relay URLs. + let purgatory_sync_manager = Arc::clone(&sync_manager); + let purgatory_sync_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { + run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; + }); + + // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { // Wait for an event without holding the lock tokio::select! { @@ -2419,6 +2452,80 @@ impl SyncManager { } } + /// Sync purgatory announcements into repo_sync_index as StateOnly entries. + /// + /// Called periodically by the purgatory announcement sync timer (every 5s). + /// For each announcement currently in purgatory, ensures a `StateOnly` entry + /// exists in `repo_sync_index`. New entries are then picked up by + /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes + /// to state events for that repo. + /// + /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). + async fn sync_purgatory_announcements_to_index(&mut self) { + use crate::sync::algorithms::{compute_actions, derive_relay_targets}; + + // Collect all purgatory announcements (snapshot - no async holds) + let announcements = self.purgatory.announcements_for_sync(); + + if announcements.is_empty() { + return; + } + + // Register any new entries in repo_sync_index as StateOnly + let mut new_relay_urls: std::collections::HashSet = std::collections::HashSet::new(); + { + let mut index = self.repo_sync_index.write().await; + for (repo_id, relays) in &announcements { + let entry = index.entry(repo_id.clone()).or_insert_with(|| { + tracing::debug!( + repo_id = %repo_id, + "Registering purgatory announcement in repo_sync_index as StateOnly" + ); + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + // Don't downgrade an already-Full entry + // Add any new relay URLs + for relay in relays { + if entry.relays.insert(relay.clone()) { + new_relay_urls.insert(relay.clone()); + } + } + } + } + + if new_relay_urls.is_empty() { + return; + } + + // For any relay URLs that are new, compute and send AddFilters actions + let all_targets = { + let repo_index = self.repo_sync_index.read().await; + derive_relay_targets(&repo_index) + }; + + let actions = { + let pending_index = self.pending_sync_index.read().await; + let relay_index = self.relay_sync_index.read().await; + compute_actions(&all_targets, &pending_index, &relay_index) + }; + + for action in actions { + // Only act on relays that have new URLs (avoids redundant work) + if new_relay_urls.contains(&action.relay_url) { + tracing::info!( + relay = %action.relay_url, + repos = action.items.repos.len(), + "Purgatory sync timer: connecting to new relay from purgatory announcement" + ); + self.handle_new_sync_filters(action).await; + } + } + } + /// Handle a relay disconnection /// /// This method is called when the event loop terminates and sends a disconnect notification. -- cgit v1.2.3