upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-18 17:12:04 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-18 17:12:04 +0000
commit806936e7d1aab5dfd0c2ad6b98a115122dc1785c (patch)
tree38e9ebaacbf03d519c4713b1c961fa8708e2b8a6 /src/sync
parentc7a3eaf2898236b85790dd34213facbbdc9900d9 (diff)
fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery
StateOnly repos in a pending batch had their repo IDs included in the negentropy REQ+EOSE fallback, which called build_layer2_and_layer3_filters. This generated #a/#A/#q tag filters for repos whose announcements were still in purgatory (not yet promoted to the database). When the remote relay responded with PR events matching those filters, the write policy correctly rejected them as 'orphan' (no accepted repo in DB yet). However, nostr-sdk's client-level deduplication then silently dropped the same event on all subsequent deliveries, making it permanently unavailable even after the announcement was promoted. Fix: split batch_repos into full vs state-only by consulting repo_sync_index at fallback time, then call build_sync_level_aware_filters which only generates #a/#A/#q filters for Full repos. StateOnly repos only get the kind 30618 + #d filter they were originally subscribed with.
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs115
1 files changed, 108 insertions, 7 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 519017b..6ab8d33 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -557,6 +557,13 @@ pub struct SyncManager {
557 /// Purgatory for read-only access to events awaiting git data 557 /// Purgatory for read-only access to events awaiting git data
558 purgatory: Arc<crate::purgatory::Purgatory>, 558 purgatory: Arc<crate::purgatory::Purgatory>,
559 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) 559 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers)
560 // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters
561 // actions when user-submitted purgatory announcements need to trigger relay discovery.
562 /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called)
563 #[allow(dead_code)]
564 action_tx: Option<tokio::sync::mpsc::Sender<AddFilters>>,
565 /// Receiver for AddFilters actions (taken by run() when the event loop starts)
566 action_rx: Option<tokio::sync::mpsc::Receiver<AddFilters>>,
560 local_relay: LocalRelay, 567 local_relay: LocalRelay,
561 /// Configuration reference for sync settings 568 /// Configuration reference for sync settings
562 config: Config, 569 config: Config,
@@ -643,6 +650,11 @@ impl SyncManager {
643 } 650 }
644 } 651 }
645 652
653 // Create action channel upfront so callers (e.g. write policy) can send AddFilters
654 // actions before run() is called (e.g. when user-submitted purgatory announcements
655 // need to trigger relay discovery).
656 let (action_tx, action_rx) = tokio::sync::mpsc::channel::<AddFilters>(100);
657
646 Self { 658 Self {
647 bootstrap_relay_url, 659 bootstrap_relay_url,
648 service_domain, 660 service_domain,
@@ -663,9 +675,22 @@ impl SyncManager {
663 connect_tx: None, 675 connect_tx: None,
664 shutdown_tx: None, 676 shutdown_tx: None,
665 metrics: sync_metrics, 677 metrics: sync_metrics,
678 action_tx: Some(action_tx),
679 action_rx: Some(action_rx),
666 } 680 }
667 } 681 }
668 682
683 /// Get a clone of the action sender for external use.
684 ///
685 /// This allows the write policy to send AddFilters actions to the SyncManager
686 /// when user-submitted purgatory announcements need to trigger relay discovery.
687 ///
688 /// # Returns
689 /// Clone of the action sender, or None if the channel was never created.
690 pub fn action_tx(&self) -> Option<tokio::sync::mpsc::Sender<AddFilters>> {
691 self.action_tx.clone()
692 }
693
669 /// Generate a unique batch ID 694 /// Generate a unique batch ID
670 /// 695 ///
671 /// Increments the internal counter and returns the new value. 696 /// Increments the internal counter and returns the new value.
@@ -686,6 +711,17 @@ impl SyncManager {
686 self.rejected_events_index.clone() 711 self.rejected_events_index.clone()
687 } 712 }
688 713
714 /// Get a clone of the repo sync index.
715 ///
716 /// This allows access to the repo sync index for upgrading sync levels
717 /// when announcements are promoted from purgatory.
718 ///
719 /// # Returns
720 /// Clone of the repo sync index (Arc<RwLock<HashMap>>)
721 pub fn repo_sync_index(&self) -> RepoSyncIndex {
722 self.repo_sync_index.clone()
723 }
724
689 /// Save rejected events index to disk. 725 /// Save rejected events index to disk.
690 /// 726 ///
691 /// This is called during shutdown to persist the rejected events cache, 727 /// This is called during shutdown to persist the rejected events cache,
@@ -949,11 +985,31 @@ impl SyncManager {
949 // Drop the lock before async operations 985 // Drop the lock before async operations
950 drop(pending); 986 drop(pending);
951 987
952 // Create REQ+EOSE subscriptions using original semantic filters 988 // Create REQ+EOSE subscriptions using sync-level-aware filters.
953 // This queries by kind/author/tags instead of by ID, which may 989 // This queries by kind/author/tags instead of by ID, which may
954 // succeed even when ID-based queries fail 990 // succeed even when ID-based queries fail.
955 let fallback_filters = filters::build_layer2_and_layer3_filters( 991 //
956 &batch_repos, 992 // CRITICAL: Use build_sync_level_aware_filters to avoid generating
993 // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements
994 // are still in purgatory. If we send Layer 2 filters too early, the
995 // remote relay may return PR events that our write policy rejects as
996 // "orphan" (no promoted repo). nostr-sdk deduplication then silently
997 // drops the event on retry, making it permanently unavailable.
998 let (full_repos, state_only_repos) = {
999 let index = self.repo_sync_index.read().await;
1000 let mut full = HashSet::new();
1001 let mut state_only = HashSet::new();
1002 for repo_id in &batch_repos {
1003 match index.get(repo_id).map(|n| n.sync_level) {
1004 Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); }
1005 _ => { full.insert(repo_id.clone()); }
1006 }
1007 }
1008 (full, state_only)
1009 };
1010 let fallback_filters = filters::build_sync_level_aware_filters(
1011 &full_repos,
1012 &state_only_repos,
957 &batch_root_events, 1013 &batch_root_events,
958 None, 1014 None,
959 ); 1015 );
@@ -1037,8 +1093,20 @@ impl SyncManager {
1037 pending.remove(&relay_url_for_fallback); 1093 pending.remove(&relay_url_for_fallback);
1038 } 1094 }
1039 drop(pending); 1095 drop(pending);
1096 let is_generic_filter = completed_batch.items.repos.is_empty()
1097 && completed_batch.items.root_events.is_empty();
1040 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1098 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1041 .await; 1099 .await;
1100
1101 // Trigger filter recomputation for generic filter batches
1102 if is_generic_filter {
1103 tracing::info!(
1104 relay = %relay_url_for_fallback,
1105 "Announcement batch complete (fallback path) - triggering filter recomputation"
1106 );
1107 self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback)
1108 .await;
1109 }
1042 } 1110 }
1043 } 1111 }
1044 return; 1112 return;
@@ -1136,8 +1204,20 @@ impl SyncManager {
1136 pending.remove(&relay_url_for_retry); 1204 pending.remove(&relay_url_for_retry);
1137 } 1205 }
1138 drop(pending); 1206 drop(pending);
1207 let is_generic_filter = completed_batch.items.repos.is_empty()
1208 && completed_batch.items.root_events.is_empty();
1139 self.confirm_batch(&relay_url_for_retry, completed_batch) 1209 self.confirm_batch(&relay_url_for_retry, completed_batch)
1140 .await; 1210 .await;
1211
1212 // Trigger filter recomputation for generic filter batches
1213 if is_generic_filter {
1214 tracing::info!(
1215 relay = %relay_url_for_retry,
1216 "Announcement batch complete (retry path) - triggering filter recomputation"
1217 );
1218 self.recompute_new_sync_filters_for_relay(&relay_url_for_retry)
1219 .await;
1220 }
1141 } 1221 }
1142 } 1222 }
1143 return; 1223 return;
@@ -1158,7 +1238,20 @@ impl SyncManager {
1158 drop(pending); 1238 drop(pending);
1159 1239
1160 // 4. Confirm the batch (moves items to RelayState) 1240 // 4. Confirm the batch (moves items to RelayState)
1241 let is_generic_filter =
1242 completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty();
1161 self.confirm_batch(relay_url, completed_batch).await; 1243 self.confirm_batch(relay_url, completed_batch).await;
1244
1245 // 5. For generic filter batches (announcements), trigger filter recomputation
1246 // to subscribe to state events for purgatory announcements that were registered
1247 // during event processing.
1248 if is_generic_filter {
1249 tracing::info!(
1250 relay = %relay_url,
1251 "Announcement batch complete - triggering filter recomputation for purgatory repos"
1252 );
1253 self.recompute_new_sync_filters_for_relay(relay_url).await;
1254 }
1162 } 1255 }
1163 1256
1164 /// Confirm a completed batch by moving items to RelayState 1257 /// Confirm a completed batch by moving items to RelayState
@@ -1437,8 +1530,16 @@ impl SyncManager {
1437 "SyncManager starting" 1530 "SyncManager starting"
1438 ); 1531 );
1439 1532
1440 // 1. Create action channel for self-subscriber -> manager communication 1533 // 1. Take action channel receiver (created in new()) - sender is shared with write policy
1441 let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); 1534 let mut action_rx = self
1535 .action_rx
1536 .take()
1537 .expect("action_rx should be set in new()");
1538 // Get a clone of action_tx for the self-subscriber
1539 let action_tx_for_subscriber = self
1540 .action_tx
1541 .clone()
1542 .expect("action_tx should be set in new()");
1442 1543
1443 // 2. Create disconnect channel for spawned tasks -> manager communication 1544 // 2. Create disconnect channel for spawned tasks -> manager communication
1444 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); 1545 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100);
@@ -1457,7 +1558,7 @@ impl SyncManager {
1457 format!("ws://{}", self.config.bind_address), 1558 format!("ws://{}", self.config.bind_address),
1458 self.service_domain.clone(), 1559 self.service_domain.clone(),
1459 Arc::clone(&self.repo_sync_index), 1560 Arc::clone(&self.repo_sync_index),
1460 action_tx, 1561 action_tx_for_subscriber,
1461 ); 1562 );
1462 let subscriber_shutdown = shutdown_tx.subscribe(); 1563 let subscriber_shutdown = shutdown_tx.subscribe();
1463 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); 1564 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });