diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 19:28:44 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 19:28:44 +0000 |
| commit | a804164468d3beafb243ece12555b4d1692a075d (patch) | |
| tree | e3efa03579672adc6ea5ca83c47f429f340ac5ef /src/sync/mod.rs | |
| parent | 3d9359d5ac0045fb93fd8732160e0de8413d6881 (diff) | |
Revert "fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery"
This reverts commit 806936e7d1aab5dfd0c2ad6b98a115122dc1785c.
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 115 |
1 files changed, 7 insertions, 108 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6ab8d33..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -557,13 +557,6 @@ pub struct SyncManager { | |||
| 557 | /// Purgatory for read-only access to events awaiting git data | 557 | /// Purgatory for read-only access to events awaiting git data |
| 558 | purgatory: Arc<crate::purgatory::Purgatory>, | 558 | purgatory: Arc<crate::purgatory::Purgatory>, |
| 559 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) | 559 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) |
| 560 | // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters | ||
| 561 | // actions when user-submitted purgatory announcements need to trigger relay discovery. | ||
| 562 | /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) | ||
| 563 | #[allow(dead_code)] | ||
| 564 | action_tx: Option<tokio::sync::mpsc::Sender<AddFilters>>, | ||
| 565 | /// Receiver for AddFilters actions (taken by run() when the event loop starts) | ||
| 566 | action_rx: Option<tokio::sync::mpsc::Receiver<AddFilters>>, | ||
| 567 | local_relay: LocalRelay, | 560 | local_relay: LocalRelay, |
| 568 | /// Configuration reference for sync settings | 561 | /// Configuration reference for sync settings |
| 569 | config: Config, | 562 | config: Config, |
| @@ -650,11 +643,6 @@ impl SyncManager { | |||
| 650 | } | 643 | } |
| 651 | } | 644 | } |
| 652 | 645 | ||
| 653 | // Create action channel upfront so callers (e.g. write policy) can send AddFilters | ||
| 654 | // actions before run() is called (e.g. when user-submitted purgatory announcements | ||
| 655 | // need to trigger relay discovery). | ||
| 656 | let (action_tx, action_rx) = tokio::sync::mpsc::channel::<AddFilters>(100); | ||
| 657 | |||
| 658 | Self { | 646 | Self { |
| 659 | bootstrap_relay_url, | 647 | bootstrap_relay_url, |
| 660 | service_domain, | 648 | service_domain, |
| @@ -675,22 +663,9 @@ impl SyncManager { | |||
| 675 | connect_tx: None, | 663 | connect_tx: None, |
| 676 | shutdown_tx: None, | 664 | shutdown_tx: None, |
| 677 | metrics: sync_metrics, | 665 | metrics: sync_metrics, |
| 678 | action_tx: Some(action_tx), | ||
| 679 | action_rx: Some(action_rx), | ||
| 680 | } | 666 | } |
| 681 | } | 667 | } |
| 682 | 668 | ||
| 683 | /// Get a clone of the action sender for external use. | ||
| 684 | /// | ||
| 685 | /// This allows the write policy to send AddFilters actions to the SyncManager | ||
| 686 | /// when user-submitted purgatory announcements need to trigger relay discovery. | ||
| 687 | /// | ||
| 688 | /// # Returns | ||
| 689 | /// Clone of the action sender, or None if the channel was never created. | ||
| 690 | pub fn action_tx(&self) -> Option<tokio::sync::mpsc::Sender<AddFilters>> { | ||
| 691 | self.action_tx.clone() | ||
| 692 | } | ||
| 693 | |||
| 694 | /// Generate a unique batch ID | 669 | /// Generate a unique batch ID |
| 695 | /// | 670 | /// |
| 696 | /// Increments the internal counter and returns the new value. | 671 | /// Increments the internal counter and returns the new value. |
| @@ -711,17 +686,6 @@ impl SyncManager { | |||
| 711 | self.rejected_events_index.clone() | 686 | self.rejected_events_index.clone() |
| 712 | } | 687 | } |
| 713 | 688 | ||
| 714 | /// Get a clone of the repo sync index. | ||
| 715 | /// | ||
| 716 | /// This allows access to the repo sync index for upgrading sync levels | ||
| 717 | /// when announcements are promoted from purgatory. | ||
| 718 | /// | ||
| 719 | /// # Returns | ||
| 720 | /// Clone of the repo sync index (Arc<RwLock<HashMap>>) | ||
| 721 | pub fn repo_sync_index(&self) -> RepoSyncIndex { | ||
| 722 | self.repo_sync_index.clone() | ||
| 723 | } | ||
| 724 | |||
| 725 | /// Save rejected events index to disk. | 689 | /// Save rejected events index to disk. |
| 726 | /// | 690 | /// |
| 727 | /// This is called during shutdown to persist the rejected events cache, | 691 | /// This is called during shutdown to persist the rejected events cache, |
| @@ -985,31 +949,11 @@ impl SyncManager { | |||
| 985 | // Drop the lock before async operations | 949 | // Drop the lock before async operations |
| 986 | drop(pending); | 950 | drop(pending); |
| 987 | 951 | ||
| 988 | // Create REQ+EOSE subscriptions using sync-level-aware filters. | 952 | // Create REQ+EOSE subscriptions using original semantic filters |
| 989 | // This queries by kind/author/tags instead of by ID, which may | 953 | // This queries by kind/author/tags instead of by ID, which may |
| 990 | // succeed even when ID-based queries fail. | 954 | // succeed even when ID-based queries fail |
| 991 | // | 955 | let fallback_filters = filters::build_layer2_and_layer3_filters( |
| 992 | // CRITICAL: Use build_sync_level_aware_filters to avoid generating | 956 | &batch_repos, |
| 993 | // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements | ||
| 994 | // are still in purgatory. If we send Layer 2 filters too early, the | ||
| 995 | // remote relay may return PR events that our write policy rejects as | ||
| 996 | // "orphan" (no promoted repo). nostr-sdk deduplication then silently | ||
| 997 | // drops the event on retry, making it permanently unavailable. | ||
| 998 | let (full_repos, state_only_repos) = { | ||
| 999 | let index = self.repo_sync_index.read().await; | ||
| 1000 | let mut full = HashSet::new(); | ||
| 1001 | let mut state_only = HashSet::new(); | ||
| 1002 | for repo_id in &batch_repos { | ||
| 1003 | match index.get(repo_id).map(|n| n.sync_level) { | ||
| 1004 | Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } | ||
| 1005 | _ => { full.insert(repo_id.clone()); } | ||
| 1006 | } | ||
| 1007 | } | ||
| 1008 | (full, state_only) | ||
| 1009 | }; | ||
| 1010 | let fallback_filters = filters::build_sync_level_aware_filters( | ||
| 1011 | &full_repos, | ||
| 1012 | &state_only_repos, | ||
| 1013 | &batch_root_events, | 957 | &batch_root_events, |
| 1014 | None, | 958 | None, |
| 1015 | ); | 959 | ); |
| @@ -1093,20 +1037,8 @@ impl SyncManager { | |||
| 1093 | pending.remove(&relay_url_for_fallback); | 1037 | pending.remove(&relay_url_for_fallback); |
| 1094 | } | 1038 | } |
| 1095 | drop(pending); | 1039 | drop(pending); |
| 1096 | let is_generic_filter = completed_batch.items.repos.is_empty() | ||
| 1097 | && completed_batch.items.root_events.is_empty(); | ||
| 1098 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | 1040 | self.confirm_batch(&relay_url_for_fallback, completed_batch) |
| 1099 | .await; | 1041 | .await; |
| 1100 | |||
| 1101 | // Trigger filter recomputation for generic filter batches | ||
| 1102 | if is_generic_filter { | ||
| 1103 | tracing::info!( | ||
| 1104 | relay = %relay_url_for_fallback, | ||
| 1105 | "Announcement batch complete (fallback path) - triggering filter recomputation" | ||
| 1106 | ); | ||
| 1107 | self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) | ||
| 1108 | .await; | ||
| 1109 | } | ||
| 1110 | } | 1042 | } |
| 1111 | } | 1043 | } |
| 1112 | return; | 1044 | return; |
| @@ -1204,20 +1136,8 @@ impl SyncManager { | |||
| 1204 | pending.remove(&relay_url_for_retry); | 1136 | pending.remove(&relay_url_for_retry); |
| 1205 | } | 1137 | } |
| 1206 | drop(pending); | 1138 | drop(pending); |
| 1207 | let is_generic_filter = completed_batch.items.repos.is_empty() | ||
| 1208 | && completed_batch.items.root_events.is_empty(); | ||
| 1209 | self.confirm_batch(&relay_url_for_retry, completed_batch) | 1139 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 1210 | .await; | 1140 | .await; |
| 1211 | |||
| 1212 | // Trigger filter recomputation for generic filter batches | ||
| 1213 | if is_generic_filter { | ||
| 1214 | tracing::info!( | ||
| 1215 | relay = %relay_url_for_retry, | ||
| 1216 | "Announcement batch complete (retry path) - triggering filter recomputation" | ||
| 1217 | ); | ||
| 1218 | self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) | ||
| 1219 | .await; | ||
| 1220 | } | ||
| 1221 | } | 1141 | } |
| 1222 | } | 1142 | } |
| 1223 | return; | 1143 | return; |
| @@ -1238,20 +1158,7 @@ impl SyncManager { | |||
| 1238 | drop(pending); | 1158 | drop(pending); |
| 1239 | 1159 | ||
| 1240 | // 4. Confirm the batch (moves items to RelayState) | 1160 | // 4. Confirm the batch (moves items to RelayState) |
| 1241 | let is_generic_filter = | ||
| 1242 | completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); | ||
| 1243 | self.confirm_batch(relay_url, completed_batch).await; | 1161 | self.confirm_batch(relay_url, completed_batch).await; |
| 1244 | |||
| 1245 | // 5. For generic filter batches (announcements), trigger filter recomputation | ||
| 1246 | // to subscribe to state events for purgatory announcements that were registered | ||
| 1247 | // during event processing. | ||
| 1248 | if is_generic_filter { | ||
| 1249 | tracing::info!( | ||
| 1250 | relay = %relay_url, | ||
| 1251 | "Announcement batch complete - triggering filter recomputation for purgatory repos" | ||
| 1252 | ); | ||
| 1253 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1254 | } | ||
| 1255 | } | 1162 | } |
| 1256 | 1163 | ||
| 1257 | /// Confirm a completed batch by moving items to RelayState | 1164 | /// Confirm a completed batch by moving items to RelayState |
| @@ -1530,16 +1437,8 @@ impl SyncManager { | |||
| 1530 | "SyncManager starting" | 1437 | "SyncManager starting" |
| 1531 | ); | 1438 | ); |
| 1532 | 1439 | ||
| 1533 | // 1. Take action channel receiver (created in new()) - sender is shared with write policy | 1440 | // 1. Create action channel for self-subscriber -> manager communication |
| 1534 | let mut action_rx = self | 1441 | let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); |
| 1535 | .action_rx | ||
| 1536 | .take() | ||
| 1537 | .expect("action_rx should be set in new()"); | ||
| 1538 | // Get a clone of action_tx for the self-subscriber | ||
| 1539 | let action_tx_for_subscriber = self | ||
| 1540 | .action_tx | ||
| 1541 | .clone() | ||
| 1542 | .expect("action_tx should be set in new()"); | ||
| 1543 | 1442 | ||
| 1544 | // 2. Create disconnect channel for spawned tasks -> manager communication | 1443 | // 2. Create disconnect channel for spawned tasks -> manager communication |
| 1545 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); | 1444 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); |
| @@ -1558,7 +1457,7 @@ impl SyncManager { | |||
| 1558 | format!("ws://{}", self.config.bind_address), | 1457 | format!("ws://{}", self.config.bind_address), |
| 1559 | self.service_domain.clone(), | 1458 | self.service_domain.clone(), |
| 1560 | Arc::clone(&self.repo_sync_index), | 1459 | Arc::clone(&self.repo_sync_index), |
| 1561 | action_tx_for_subscriber, | 1460 | action_tx, |
| 1562 | ); | 1461 | ); |
| 1563 | let subscriber_shutdown = shutdown_tx.subscribe(); | 1462 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 1564 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | 1463 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); |