From 806936e7d1aab5dfd0c2ad6b98a115122dc1785c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:04 +0000 Subject: fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery StateOnly repos in a pending batch had their repo IDs included in the negentropy REQ+EOSE fallback, which called build_layer2_and_layer3_filters. This generated #a/#A/#q tag filters for repos whose announcements were still in purgatory (not yet promoted to the database). When the remote relay responded with PR events matching those filters, the write policy correctly rejected them as 'orphan' (no accepted repo in DB yet). However, nostr-sdk's client-level deduplication then silently dropped the same event on all subsequent deliveries, making it permanently unavailable even after the announcement was promoted. Fix: split batch_repos into full vs state-only by consulting repo_sync_index at fallback time, then call build_sync_level_aware_filters which only generates #a/#A/#q filters for Full repos. StateOnly repos only get the kind 30618 + #d filter they were originally subscribed with. --- src/sync/mod.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 108 insertions(+), 7 deletions(-) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..6ab8d33 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -557,6 +557,13 @@ pub struct SyncManager { /// Purgatory for read-only access to events awaiting git data purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) + // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters + // actions when user-submitted purgatory announcements need to trigger relay discovery. + /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) + #[allow(dead_code)] + action_tx: Option>, + /// Receiver for AddFilters actions (taken by run() when the event loop starts) + action_rx: Option>, local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, @@ -643,6 +650,11 @@ impl SyncManager { } } + // Create action channel upfront so callers (e.g. write policy) can send AddFilters + // actions before run() is called (e.g. when user-submitted purgatory announcements + // need to trigger relay discovery). + let (action_tx, action_rx) = tokio::sync::mpsc::channel::(100); + Self { bootstrap_relay_url, service_domain, @@ -663,9 +675,22 @@ impl SyncManager { connect_tx: None, shutdown_tx: None, metrics: sync_metrics, + action_tx: Some(action_tx), + action_rx: Some(action_rx), } } + /// Get a clone of the action sender for external use. + /// + /// This allows the write policy to send AddFilters actions to the SyncManager + /// when user-submitted purgatory announcements need to trigger relay discovery. + /// + /// # Returns + /// Clone of the action sender, or None if the channel was never created. + pub fn action_tx(&self) -> Option> { + self.action_tx.clone() + } + /// Generate a unique batch ID /// /// Increments the internal counter and returns the new value. @@ -686,6 +711,17 @@ impl SyncManager { self.rejected_events_index.clone() } + /// Get a clone of the repo sync index. + /// + /// This allows access to the repo sync index for upgrading sync levels + /// when announcements are promoted from purgatory. + /// + /// # Returns + /// Clone of the repo sync index (Arc>) + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Save rejected events index to disk. /// /// This is called during shutdown to persist the rejected events cache, @@ -949,11 +985,31 @@ impl SyncManager { // Drop the lock before async operations drop(pending); - // Create REQ+EOSE subscriptions using original semantic filters + // Create REQ+EOSE subscriptions using sync-level-aware filters. // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. + // + // CRITICAL: Use build_sync_level_aware_filters to avoid generating + // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements + // are still in purgatory. If we send Layer 2 filters too early, the + // remote relay may return PR events that our write policy rejects as + // "orphan" (no promoted repo). nostr-sdk deduplication then silently + // drops the event on retry, making it permanently unavailable. + let (full_repos, state_only_repos) = { + let index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_id in &batch_repos { + match index.get(repo_id).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } + _ => { full.insert(repo_id.clone()); } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1037,8 +1093,20 @@ impl SyncManager { pending.remove(&relay_url_for_fallback); } drop(pending); + let is_generic_filter = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + + // Trigger filter recomputation for generic filter batches + if is_generic_filter { + tracing::info!( + relay = %relay_url_for_fallback, + "Announcement batch complete (fallback path) - triggering filter recomputation" + ); + self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) + .await; + } } } return; @@ -1136,8 +1204,20 @@ impl SyncManager { pending.remove(&relay_url_for_retry); } drop(pending); + let is_generic_filter = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + + // Trigger filter recomputation for generic filter batches + if is_generic_filter { + tracing::info!( + relay = %relay_url_for_retry, + "Announcement batch complete (retry path) - triggering filter recomputation" + ); + self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) + .await; + } } } return; @@ -1158,7 +1238,20 @@ impl SyncManager { drop(pending); // 4. Confirm the batch (moves items to RelayState) + let is_generic_filter = + completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter batches (announcements), trigger filter recomputation + // to subscribe to state events for purgatory announcements that were registered + // during event processing. + if is_generic_filter { + tracing::info!( + relay = %relay_url, + "Announcement batch complete - triggering filter recomputation for purgatory repos" + ); + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState @@ -1437,8 +1530,16 @@ impl SyncManager { "SyncManager starting" ); - // 1. Create action channel for self-subscriber -> manager communication - let (action_tx, mut action_rx) = mpsc::channel::(100); + // 1. Take action channel receiver (created in new()) - sender is shared with write policy + let mut action_rx = self + .action_rx + .take() + .expect("action_rx should be set in new()"); + // Get a clone of action_tx for the self-subscriber + let action_tx_for_subscriber = self + .action_tx + .clone() + .expect("action_tx should be set in new()"); // 2. Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -1457,7 +1558,7 @@ impl SyncManager { format!("ws://{}", self.config.bind_address), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), - action_tx, + action_tx_for_subscriber, ); let subscriber_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); -- cgit v1.2.3