From ee113a654e2971a6ebdb07398cc5638dbe59b48c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 20:32:13 +0000 Subject: fix: replace repo_sync_index wiring with purgatory announcement sync timer Instead of threading repo_sync_index through PolicyContext/builder.rs/main.rs to handle user-submitted purgatory announcements, add a simple background timer (run_purgatory_announcement_sync, every 5s) that scans the purgatory for announcement entries and registers them in repo_sync_index as StateOnly. This is simpler and covers both flows: - Sync-path announcements: inline registration still happens during event processing (sync/mod.rs:1839+), timer provides a safety net - User-submitted announcements: SelfSubscriber never sees them (rejected from DB), timer is the primary registration path The timer calls sync_purgatory_announcements_to_index() which: 1. Snapshots purgatory via new announcements_for_sync() public method 2. Or_inserts StateOnly entries (never downgrades Full entries) 3. Detects newly added relay URLs and calls handle_new_sync_filters to connect and subscribe - fixing the failing test that expected relay discovery from a user-submitted purgatory announcement Removes: repo_sync_index field from PolicyContext, set/get_repo_sync_index methods, set_repo_sync_index on Nip34WritePolicy, wiring in main.rs, and the inline AcceptPurgatory registration block in builder.rs. --- src/sync/mod.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 116 insertions(+), 9 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 916e2b0..ed5b6e7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -397,6 +397,37 @@ async fn run_daily_timer( } } +/// Background task that periodically syncs purgatory announcements into repo_sync_index. +/// +/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there +/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` +/// which connects to the relay URLs listed in the announcement and subscribes to state +/// events (kind 30618). +/// +/// This covers two cases: +/// - Sync-path announcements: registered inline during event processing, but this +/// provides a safety net in case the inline registration was missed. +/// - User-submitted purgatory announcements: the SelfSubscriber never sees them +/// (they're rejected from DB), so this timer is the primary registration path. +async fn run_purgatory_announcement_sync( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { + let interval = Duration::from_secs(5); + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + let mut manager = sync_manager.lock().await; + manager.sync_purgatory_announcements_to_index().await; + } + _ = shutdown_rx.recv() => { + tracing::debug!("Purgatory announcement sync timer received shutdown signal"); + break; + } + } + } +} + // Combined Health and Metrics Checker /// Background task for cleaning up expired entries from the rejected events index @@ -700,14 +731,6 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } - /// Get a clone of the repo sync index Arc. - /// - /// This allows the write policy to register user-submitted purgatory announcements - /// in the sync index so that state event sync starts promptly. - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -1560,7 +1583,17 @@ impl SyncManager { run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; }); - // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 11. Spawn purgatory announcement sync timer (every 5s) + // Ensures purgatory announcements (including user-submitted ones that never + // touch the DB) are registered in repo_sync_index as StateOnly so that + // state event subscriptions are established on their listed relay URLs. + let purgatory_sync_manager = Arc::clone(&sync_manager); + let purgatory_sync_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { + run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; + }); + + // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { // Wait for an event without holding the lock tokio::select! { @@ -2419,6 +2452,80 @@ impl SyncManager { } } + /// Sync purgatory announcements into repo_sync_index as StateOnly entries. + /// + /// Called periodically by the purgatory announcement sync timer (every 5s). + /// For each announcement currently in purgatory, ensures a `StateOnly` entry + /// exists in `repo_sync_index`. New entries are then picked up by + /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes + /// to state events for that repo. + /// + /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). + async fn sync_purgatory_announcements_to_index(&mut self) { + use crate::sync::algorithms::{compute_actions, derive_relay_targets}; + + // Collect all purgatory announcements (snapshot - no async holds) + let announcements = self.purgatory.announcements_for_sync(); + + if announcements.is_empty() { + return; + } + + // Register any new entries in repo_sync_index as StateOnly + let mut new_relay_urls: std::collections::HashSet = std::collections::HashSet::new(); + { + let mut index = self.repo_sync_index.write().await; + for (repo_id, relays) in &announcements { + let entry = index.entry(repo_id.clone()).or_insert_with(|| { + tracing::debug!( + repo_id = %repo_id, + "Registering purgatory announcement in repo_sync_index as StateOnly" + ); + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + // Don't downgrade an already-Full entry + // Add any new relay URLs + for relay in relays { + if entry.relays.insert(relay.clone()) { + new_relay_urls.insert(relay.clone()); + } + } + } + } + + if new_relay_urls.is_empty() { + return; + } + + // For any relay URLs that are new, compute and send AddFilters actions + let all_targets = { + let repo_index = self.repo_sync_index.read().await; + derive_relay_targets(&repo_index) + }; + + let actions = { + let pending_index = self.pending_sync_index.read().await; + let relay_index = self.relay_sync_index.read().await; + compute_actions(&all_targets, &pending_index, &relay_index) + }; + + for action in actions { + // Only act on relays that have new URLs (avoids redundant work) + if new_relay_urls.contains(&action.relay_url) { + tracing::info!( + relay = %action.relay_url, + repos = action.items.repos.len(), + "Purgatory sync timer: connecting to new relay from purgatory announcement" + ); + self.handle_new_sync_filters(action).await; + } + } + } + /// Handle a relay disconnection /// /// This method is called when the event loop terminates and sends a disconnect notification. -- cgit v1.2.3