diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 17:12:04 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 17:12:04 +0000 |
| commit | 806936e7d1aab5dfd0c2ad6b98a115122dc1785c (patch) | |
| tree | 38e9ebaacbf03d519c4713b1c961fa8708e2b8a6 /src/sync | |
| parent | c7a3eaf2898236b85790dd34213facbbdc9900d9 (diff) | |
fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery
StateOnly repos in a pending batch had their repo IDs included in the
negentropy REQ+EOSE fallback, which called build_layer2_and_layer3_filters.
This generated #a/#A/#q tag filters for repos whose announcements were
still in purgatory (not yet promoted to the database).
When the remote relay responded with PR events matching those filters,
the write policy correctly rejected them as 'orphan' (no accepted repo
in DB yet). However, nostr-sdk's client-level deduplication then silently
dropped the same event on all subsequent deliveries, making it permanently
unavailable even after the announcement was promoted.
Fix: split batch_repos into full vs state-only by consulting repo_sync_index
at fallback time, then call build_sync_level_aware_filters which only
generates #a/#A/#q filters for Full repos. StateOnly repos only get
the kind 30618 + #d filter they were originally subscribed with.
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 115 |
1 files changed, 108 insertions, 7 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..6ab8d33 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -557,6 +557,13 @@ pub struct SyncManager { | |||
| 557 | /// Purgatory for read-only access to events awaiting git data | 557 | /// Purgatory for read-only access to events awaiting git data |
| 558 | purgatory: Arc<crate::purgatory::Purgatory>, | 558 | purgatory: Arc<crate::purgatory::Purgatory>, |
| 559 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) | 559 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) |
| 560 | // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters | ||
| 561 | // actions when user-submitted purgatory announcements need to trigger relay discovery. | ||
| 562 | /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) | ||
| 563 | #[allow(dead_code)] | ||
| 564 | action_tx: Option<tokio::sync::mpsc::Sender<AddFilters>>, | ||
| 565 | /// Receiver for AddFilters actions (taken by run() when the event loop starts) | ||
| 566 | action_rx: Option<tokio::sync::mpsc::Receiver<AddFilters>>, | ||
| 560 | local_relay: LocalRelay, | 567 | local_relay: LocalRelay, |
| 561 | /// Configuration reference for sync settings | 568 | /// Configuration reference for sync settings |
| 562 | config: Config, | 569 | config: Config, |
| @@ -643,6 +650,11 @@ impl SyncManager { | |||
| 643 | } | 650 | } |
| 644 | } | 651 | } |
| 645 | 652 | ||
| 653 | // Create action channel upfront so callers (e.g. write policy) can send AddFilters | ||
| 654 | // actions before run() is called (e.g. when user-submitted purgatory announcements | ||
| 655 | // need to trigger relay discovery). | ||
| 656 | let (action_tx, action_rx) = tokio::sync::mpsc::channel::<AddFilters>(100); | ||
| 657 | |||
| 646 | Self { | 658 | Self { |
| 647 | bootstrap_relay_url, | 659 | bootstrap_relay_url, |
| 648 | service_domain, | 660 | service_domain, |
| @@ -663,9 +675,22 @@ impl SyncManager { | |||
| 663 | connect_tx: None, | 675 | connect_tx: None, |
| 664 | shutdown_tx: None, | 676 | shutdown_tx: None, |
| 665 | metrics: sync_metrics, | 677 | metrics: sync_metrics, |
| 678 | action_tx: Some(action_tx), | ||
| 679 | action_rx: Some(action_rx), | ||
| 666 | } | 680 | } |
| 667 | } | 681 | } |
| 668 | 682 | ||
| 683 | /// Get a clone of the action sender for external use. | ||
| 684 | /// | ||
| 685 | /// This allows the write policy to send AddFilters actions to the SyncManager | ||
| 686 | /// when user-submitted purgatory announcements need to trigger relay discovery. | ||
| 687 | /// | ||
| 688 | /// # Returns | ||
| 689 | /// Clone of the action sender, or None if the channel was never created. | ||
| 690 | pub fn action_tx(&self) -> Option<tokio::sync::mpsc::Sender<AddFilters>> { | ||
| 691 | self.action_tx.clone() | ||
| 692 | } | ||
| 693 | |||
| 669 | /// Generate a unique batch ID | 694 | /// Generate a unique batch ID |
| 670 | /// | 695 | /// |
| 671 | /// Increments the internal counter and returns the new value. | 696 | /// Increments the internal counter and returns the new value. |
| @@ -686,6 +711,17 @@ impl SyncManager { | |||
| 686 | self.rejected_events_index.clone() | 711 | self.rejected_events_index.clone() |
| 687 | } | 712 | } |
| 688 | 713 | ||
| 714 | /// Get a clone of the repo sync index. | ||
| 715 | /// | ||
| 716 | /// This allows access to the repo sync index for upgrading sync levels | ||
| 717 | /// when announcements are promoted from purgatory. | ||
| 718 | /// | ||
| 719 | /// # Returns | ||
| 720 | /// Clone of the repo sync index (Arc<RwLock<HashMap>>) | ||
| 721 | pub fn repo_sync_index(&self) -> RepoSyncIndex { | ||
| 722 | self.repo_sync_index.clone() | ||
| 723 | } | ||
| 724 | |||
| 689 | /// Save rejected events index to disk. | 725 | /// Save rejected events index to disk. |
| 690 | /// | 726 | /// |
| 691 | /// This is called during shutdown to persist the rejected events cache, | 727 | /// This is called during shutdown to persist the rejected events cache, |
| @@ -949,11 +985,31 @@ impl SyncManager { | |||
| 949 | // Drop the lock before async operations | 985 | // Drop the lock before async operations |
| 950 | drop(pending); | 986 | drop(pending); |
| 951 | 987 | ||
| 952 | // Create REQ+EOSE subscriptions using original semantic filters | 988 | // Create REQ+EOSE subscriptions using sync-level-aware filters. |
| 953 | // This queries by kind/author/tags instead of by ID, which may | 989 | // This queries by kind/author/tags instead of by ID, which may |
| 954 | // succeed even when ID-based queries fail | 990 | // succeed even when ID-based queries fail. |
| 955 | let fallback_filters = filters::build_layer2_and_layer3_filters( | 991 | // |
| 956 | &batch_repos, | 992 | // CRITICAL: Use build_sync_level_aware_filters to avoid generating |
| 993 | // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements | ||
| 994 | // are still in purgatory. If we send Layer 2 filters too early, the | ||
| 995 | // remote relay may return PR events that our write policy rejects as | ||
| 996 | // "orphan" (no promoted repo). nostr-sdk deduplication then silently | ||
| 997 | // drops the event on retry, making it permanently unavailable. | ||
| 998 | let (full_repos, state_only_repos) = { | ||
| 999 | let index = self.repo_sync_index.read().await; | ||
| 1000 | let mut full = HashSet::new(); | ||
| 1001 | let mut state_only = HashSet::new(); | ||
| 1002 | for repo_id in &batch_repos { | ||
| 1003 | match index.get(repo_id).map(|n| n.sync_level) { | ||
| 1004 | Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } | ||
| 1005 | _ => { full.insert(repo_id.clone()); } | ||
| 1006 | } | ||
| 1007 | } | ||
| 1008 | (full, state_only) | ||
| 1009 | }; | ||
| 1010 | let fallback_filters = filters::build_sync_level_aware_filters( | ||
| 1011 | &full_repos, | ||
| 1012 | &state_only_repos, | ||
| 957 | &batch_root_events, | 1013 | &batch_root_events, |
| 958 | None, | 1014 | None, |
| 959 | ); | 1015 | ); |
| @@ -1037,8 +1093,20 @@ impl SyncManager { | |||
| 1037 | pending.remove(&relay_url_for_fallback); | 1093 | pending.remove(&relay_url_for_fallback); |
| 1038 | } | 1094 | } |
| 1039 | drop(pending); | 1095 | drop(pending); |
| 1096 | let is_generic_filter = completed_batch.items.repos.is_empty() | ||
| 1097 | && completed_batch.items.root_events.is_empty(); | ||
| 1040 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | 1098 | self.confirm_batch(&relay_url_for_fallback, completed_batch) |
| 1041 | .await; | 1099 | .await; |
| 1100 | |||
| 1101 | // Trigger filter recomputation for generic filter batches | ||
| 1102 | if is_generic_filter { | ||
| 1103 | tracing::info!( | ||
| 1104 | relay = %relay_url_for_fallback, | ||
| 1105 | "Announcement batch complete (fallback path) - triggering filter recomputation" | ||
| 1106 | ); | ||
| 1107 | self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) | ||
| 1108 | .await; | ||
| 1109 | } | ||
| 1042 | } | 1110 | } |
| 1043 | } | 1111 | } |
| 1044 | return; | 1112 | return; |
| @@ -1136,8 +1204,20 @@ impl SyncManager { | |||
| 1136 | pending.remove(&relay_url_for_retry); | 1204 | pending.remove(&relay_url_for_retry); |
| 1137 | } | 1205 | } |
| 1138 | drop(pending); | 1206 | drop(pending); |
| 1207 | let is_generic_filter = completed_batch.items.repos.is_empty() | ||
| 1208 | && completed_batch.items.root_events.is_empty(); | ||
| 1139 | self.confirm_batch(&relay_url_for_retry, completed_batch) | 1209 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 1140 | .await; | 1210 | .await; |
| 1211 | |||
| 1212 | // Trigger filter recomputation for generic filter batches | ||
| 1213 | if is_generic_filter { | ||
| 1214 | tracing::info!( | ||
| 1215 | relay = %relay_url_for_retry, | ||
| 1216 | "Announcement batch complete (retry path) - triggering filter recomputation" | ||
| 1217 | ); | ||
| 1218 | self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) | ||
| 1219 | .await; | ||
| 1220 | } | ||
| 1141 | } | 1221 | } |
| 1142 | } | 1222 | } |
| 1143 | return; | 1223 | return; |
| @@ -1158,7 +1238,20 @@ impl SyncManager { | |||
| 1158 | drop(pending); | 1238 | drop(pending); |
| 1159 | 1239 | ||
| 1160 | // 4. Confirm the batch (moves items to RelayState) | 1240 | // 4. Confirm the batch (moves items to RelayState) |
| 1241 | let is_generic_filter = | ||
| 1242 | completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); | ||
| 1161 | self.confirm_batch(relay_url, completed_batch).await; | 1243 | self.confirm_batch(relay_url, completed_batch).await; |
| 1244 | |||
| 1245 | // 5. For generic filter batches (announcements), trigger filter recomputation | ||
| 1246 | // to subscribe to state events for purgatory announcements that were registered | ||
| 1247 | // during event processing. | ||
| 1248 | if is_generic_filter { | ||
| 1249 | tracing::info!( | ||
| 1250 | relay = %relay_url, | ||
| 1251 | "Announcement batch complete - triggering filter recomputation for purgatory repos" | ||
| 1252 | ); | ||
| 1253 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1254 | } | ||
| 1162 | } | 1255 | } |
| 1163 | 1256 | ||
| 1164 | /// Confirm a completed batch by moving items to RelayState | 1257 | /// Confirm a completed batch by moving items to RelayState |
| @@ -1437,8 +1530,16 @@ impl SyncManager { | |||
| 1437 | "SyncManager starting" | 1530 | "SyncManager starting" |
| 1438 | ); | 1531 | ); |
| 1439 | 1532 | ||
| 1440 | // 1. Create action channel for self-subscriber -> manager communication | 1533 | // 1. Take action channel receiver (created in new()) - sender is shared with write policy |
| 1441 | let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); | 1534 | let mut action_rx = self |
| 1535 | .action_rx | ||
| 1536 | .take() | ||
| 1537 | .expect("action_rx should be set in new()"); | ||
| 1538 | // Get a clone of action_tx for the self-subscriber | ||
| 1539 | let action_tx_for_subscriber = self | ||
| 1540 | .action_tx | ||
| 1541 | .clone() | ||
| 1542 | .expect("action_tx should be set in new()"); | ||
| 1442 | 1543 | ||
| 1443 | // 2. Create disconnect channel for spawned tasks -> manager communication | 1544 | // 2. Create disconnect channel for spawned tasks -> manager communication |
| 1444 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); | 1545 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); |
| @@ -1457,7 +1558,7 @@ impl SyncManager { | |||
| 1457 | format!("ws://{}", self.config.bind_address), | 1558 | format!("ws://{}", self.config.bind_address), |
| 1458 | self.service_domain.clone(), | 1559 | self.service_domain.clone(), |
| 1459 | Arc::clone(&self.repo_sync_index), | 1560 | Arc::clone(&self.repo_sync_index), |
| 1460 | action_tx, | 1561 | action_tx_for_subscriber, |
| 1461 | ); | 1562 | ); |
| 1462 | let subscriber_shutdown = shutdown_tx.subscribe(); | 1563 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 1463 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | 1564 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); |