From a804164468d3beafb243ece12555b4d1692a075d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:44 +0000 Subject: Revert "fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery" This reverts commit 806936e7d1aab5dfd0c2ad6b98a115122dc1785c. --- src/sync/mod.rs | 115 ++++---------------------------------------------------- 1 file changed, 7 insertions(+), 108 deletions(-) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6ab8d33..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -557,13 +557,6 @@ pub struct SyncManager { /// Purgatory for read-only access to events awaiting git data purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) - // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters - // actions when user-submitted purgatory announcements need to trigger relay discovery. - /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) - #[allow(dead_code)] - action_tx: Option>, - /// Receiver for AddFilters actions (taken by run() when the event loop starts) - action_rx: Option>, local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, @@ -650,11 +643,6 @@ impl SyncManager { } } - // Create action channel upfront so callers (e.g. write policy) can send AddFilters - // actions before run() is called (e.g. when user-submitted purgatory announcements - // need to trigger relay discovery). - let (action_tx, action_rx) = tokio::sync::mpsc::channel::(100); - Self { bootstrap_relay_url, service_domain, @@ -675,22 +663,9 @@ impl SyncManager { connect_tx: None, shutdown_tx: None, metrics: sync_metrics, - action_tx: Some(action_tx), - action_rx: Some(action_rx), } } - /// Get a clone of the action sender for external use. - /// - /// This allows the write policy to send AddFilters actions to the SyncManager - /// when user-submitted purgatory announcements need to trigger relay discovery. - /// - /// # Returns - /// Clone of the action sender, or None if the channel was never created. - pub fn action_tx(&self) -> Option> { - self.action_tx.clone() - } - /// Generate a unique batch ID /// /// Increments the internal counter and returns the new value. @@ -711,17 +686,6 @@ impl SyncManager { self.rejected_events_index.clone() } - /// Get a clone of the repo sync index. - /// - /// This allows access to the repo sync index for upgrading sync levels - /// when announcements are promoted from purgatory. - /// - /// # Returns - /// Clone of the repo sync index (Arc>) - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Save rejected events index to disk. /// /// This is called during shutdown to persist the rejected events cache, @@ -985,31 +949,11 @@ impl SyncManager { // Drop the lock before async operations drop(pending); - // Create REQ+EOSE subscriptions using sync-level-aware filters. + // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail. - // - // CRITICAL: Use build_sync_level_aware_filters to avoid generating - // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements - // are still in purgatory. If we send Layer 2 filters too early, the - // remote relay may return PR events that our write policy rejects as - // "orphan" (no promoted repo). nostr-sdk deduplication then silently - // drops the event on retry, making it permanently unavailable. - let (full_repos, state_only_repos) = { - let index = self.repo_sync_index.read().await; - let mut full = HashSet::new(); - let mut state_only = HashSet::new(); - for repo_id in &batch_repos { - match index.get(repo_id).map(|n| n.sync_level) { - Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } - _ => { full.insert(repo_id.clone()); } - } - } - (full, state_only) - }; - let fallback_filters = filters::build_sync_level_aware_filters( - &full_repos, - &state_only_repos, + // succeed even when ID-based queries fail + let fallback_filters = filters::build_layer2_and_layer3_filters( + &batch_repos, &batch_root_events, None, ); @@ -1093,20 +1037,8 @@ impl SyncManager { pending.remove(&relay_url_for_fallback); } drop(pending); - let is_generic_filter = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - - // Trigger filter recomputation for generic filter batches - if is_generic_filter { - tracing::info!( - relay = %relay_url_for_fallback, - "Announcement batch complete (fallback path) - triggering filter recomputation" - ); - self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) - .await; - } } } return; @@ -1204,20 +1136,8 @@ impl SyncManager { pending.remove(&relay_url_for_retry); } drop(pending); - let is_generic_filter = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - - // Trigger filter recomputation for generic filter batches - if is_generic_filter { - tracing::info!( - relay = %relay_url_for_retry, - "Announcement batch complete (retry path) - triggering filter recomputation" - ); - self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) - .await; - } } } return; @@ -1238,20 +1158,7 @@ impl SyncManager { drop(pending); // 4. Confirm the batch (moves items to RelayState) - let is_generic_filter = - completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter batches (announcements), trigger filter recomputation - // to subscribe to state events for purgatory announcements that were registered - // during event processing. - if is_generic_filter { - tracing::info!( - relay = %relay_url, - "Announcement batch complete - triggering filter recomputation for purgatory repos" - ); - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1530,16 +1437,8 @@ impl SyncManager { "SyncManager starting" ); - // 1. Take action channel receiver (created in new()) - sender is shared with write policy - let mut action_rx = self - .action_rx - .take() - .expect("action_rx should be set in new()"); - // Get a clone of action_tx for the self-subscriber - let action_tx_for_subscriber = self - .action_tx - .clone() - .expect("action_tx should be set in new()"); + // 1. Create action channel for self-subscriber -> manager communication + let (action_tx, mut action_rx) = mpsc::channel::(100); // 2. Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -1558,7 +1457,7 @@ impl SyncManager { format!("ws://{}", self.config.bind_address), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), - action_tx_for_subscriber, + action_tx, ); let subscriber_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); -- cgit v1.2.3