upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/sync/mod.rs115
1 files changed, 108 insertions, 7 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 519017b..6ab8d33 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -557,6 +557,13 @@ pub struct SyncManager {
557 /// Purgatory for read-only access to events awaiting git data 557 /// Purgatory for read-only access to events awaiting git data
558 purgatory: Arc<crate::purgatory::Purgatory>, 558 purgatory: Arc<crate::purgatory::Purgatory>,
559 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) 559 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers)
560 // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters
561 // actions when user-submitted purgatory announcements need to trigger relay discovery.
562 /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called)
563 #[allow(dead_code)]
564 action_tx: Option<tokio::sync::mpsc::Sender<AddFilters>>,
565 /// Receiver for AddFilters actions (taken by run() when the event loop starts)
566 action_rx: Option<tokio::sync::mpsc::Receiver<AddFilters>>,
560 local_relay: LocalRelay, 567 local_relay: LocalRelay,
561 /// Configuration reference for sync settings 568 /// Configuration reference for sync settings
562 config: Config, 569 config: Config,
@@ -643,6 +650,11 @@ impl SyncManager {
643 } 650 }
644 } 651 }
645 652
653 // Create action channel upfront so callers (e.g. write policy) can send AddFilters
654 // actions before run() is called (e.g. when user-submitted purgatory announcements
655 // need to trigger relay discovery).
656 let (action_tx, action_rx) = tokio::sync::mpsc::channel::<AddFilters>(100);
657
646 Self { 658 Self {
647 bootstrap_relay_url, 659 bootstrap_relay_url,
648 service_domain, 660 service_domain,
@@ -663,9 +675,22 @@ impl SyncManager {
663 connect_tx: None, 675 connect_tx: None,
664 shutdown_tx: None, 676 shutdown_tx: None,
665 metrics: sync_metrics, 677 metrics: sync_metrics,
678 action_tx: Some(action_tx),
679 action_rx: Some(action_rx),
666 } 680 }
667 } 681 }
668 682
683 /// Get a clone of the action sender for external use.
684 ///
685 /// This allows the write policy to send AddFilters actions to the SyncManager
686 /// when user-submitted purgatory announcements need to trigger relay discovery.
687 ///
688 /// # Returns
689 /// Clone of the action sender, or None if the channel was never created.
690 pub fn action_tx(&self) -> Option<tokio::sync::mpsc::Sender<AddFilters>> {
691 self.action_tx.clone()
692 }
693
669 /// Generate a unique batch ID 694 /// Generate a unique batch ID
670 /// 695 ///
671 /// Increments the internal counter and returns the new value. 696 /// Increments the internal counter and returns the new value.
@@ -686,6 +711,17 @@ impl SyncManager {
686 self.rejected_events_index.clone() 711 self.rejected_events_index.clone()
687 } 712 }
688 713
714 /// Get a clone of the repo sync index.
715 ///
716 /// This allows access to the repo sync index for upgrading sync levels
717 /// when announcements are promoted from purgatory.
718 ///
719 /// # Returns
720 /// Clone of the repo sync index (Arc<RwLock<HashMap>>)
721 pub fn repo_sync_index(&self) -> RepoSyncIndex {
722 self.repo_sync_index.clone()
723 }
724
689 /// Save rejected events index to disk. 725 /// Save rejected events index to disk.
690 /// 726 ///
691 /// This is called during shutdown to persist the rejected events cache, 727 /// This is called during shutdown to persist the rejected events cache,
@@ -949,11 +985,31 @@ impl SyncManager {
949 // Drop the lock before async operations 985 // Drop the lock before async operations
950 drop(pending); 986 drop(pending);
951 987
952 // Create REQ+EOSE subscriptions using original semantic filters 988 // Create REQ+EOSE subscriptions using sync-level-aware filters.
953 // This queries by kind/author/tags instead of by ID, which may 989 // This queries by kind/author/tags instead of by ID, which may
954 // succeed even when ID-based queries fail 990 // succeed even when ID-based queries fail.
955 let fallback_filters = filters::build_layer2_and_layer3_filters( 991 //
956 &batch_repos, 992 // CRITICAL: Use build_sync_level_aware_filters to avoid generating
993 // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements
994 // are still in purgatory. If we send Layer 2 filters too early, the
995 // remote relay may return PR events that our write policy rejects as
996 // "orphan" (no promoted repo). nostr-sdk deduplication then silently
997 // drops the event on retry, making it permanently unavailable.
998 let (full_repos, state_only_repos) = {
999 let index = self.repo_sync_index.read().await;
1000 let mut full = HashSet::new();
1001 let mut state_only = HashSet::new();
1002 for repo_id in &batch_repos {
1003 match index.get(repo_id).map(|n| n.sync_level) {
1004 Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); }
1005 _ => { full.insert(repo_id.clone()); }
1006 }
1007 }
1008 (full, state_only)
1009 };
1010 let fallback_filters = filters::build_sync_level_aware_filters(
1011 &full_repos,
1012 &state_only_repos,
957 &batch_root_events, 1013 &batch_root_events,
958 None, 1014 None,
959 ); 1015 );
@@ -1037,8 +1093,20 @@ impl SyncManager {
1037 pending.remove(&relay_url_for_fallback); 1093 pending.remove(&relay_url_for_fallback);
1038 } 1094 }
1039 drop(pending); 1095 drop(pending);
1096 let is_generic_filter = completed_batch.items.repos.is_empty()
1097 && completed_batch.items.root_events.is_empty();
1040 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1098 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1041 .await; 1099 .await;
1100
1101 // Trigger filter recomputation for generic filter batches
1102 if is_generic_filter {
1103 tracing::info!(
1104 relay = %relay_url_for_fallback,
1105 "Announcement batch complete (fallback path) - triggering filter recomputation"
1106 );
1107 self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback)
1108 .await;
1109 }
1042 } 1110 }
1043 } 1111 }
1044 return; 1112 return;
@@ -1136,8 +1204,20 @@ impl SyncManager {
1136 pending.remove(&relay_url_for_retry); 1204 pending.remove(&relay_url_for_retry);
1137 } 1205 }
1138 drop(pending); 1206 drop(pending);
1207 let is_generic_filter = completed_batch.items.repos.is_empty()
1208 && completed_batch.items.root_events.is_empty();
1139 self.confirm_batch(&relay_url_for_retry, completed_batch) 1209 self.confirm_batch(&relay_url_for_retry, completed_batch)
1140 .await; 1210 .await;
1211
1212 // Trigger filter recomputation for generic filter batches
1213 if is_generic_filter {
1214 tracing::info!(
1215 relay = %relay_url_for_retry,
1216 "Announcement batch complete (retry path) - triggering filter recomputation"
1217 );
1218 self.recompute_new_sync_filters_for_relay(&relay_url_for_retry)
1219 .await;
1220 }
1141 } 1221 }
1142 } 1222 }
1143 return; 1223 return;
@@ -1158,7 +1238,20 @@ impl SyncManager {
1158 drop(pending); 1238 drop(pending);
1159 1239
1160 // 4. Confirm the batch (moves items to RelayState) 1240 // 4. Confirm the batch (moves items to RelayState)
1241 let is_generic_filter =
1242 completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty();
1161 self.confirm_batch(relay_url, completed_batch).await; 1243 self.confirm_batch(relay_url, completed_batch).await;
1244
1245 // 5. For generic filter batches (announcements), trigger filter recomputation
1246 // to subscribe to state events for purgatory announcements that were registered
1247 // during event processing.
1248 if is_generic_filter {
1249 tracing::info!(
1250 relay = %relay_url,
1251 "Announcement batch complete - triggering filter recomputation for purgatory repos"
1252 );
1253 self.recompute_new_sync_filters_for_relay(relay_url).await;
1254 }
1162 } 1255 }
1163 1256
1164 /// Confirm a completed batch by moving items to RelayState 1257 /// Confirm a completed batch by moving items to RelayState
@@ -1437,8 +1530,16 @@ impl SyncManager {
1437 "SyncManager starting" 1530 "SyncManager starting"
1438 ); 1531 );
1439 1532
1440 // 1. Create action channel for self-subscriber -> manager communication 1533 // 1. Take action channel receiver (created in new()) - sender is shared with write policy
1441 let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); 1534 let mut action_rx = self
1535 .action_rx
1536 .take()
1537 .expect("action_rx should be set in new()");
1538 // Get a clone of action_tx for the self-subscriber
1539 let action_tx_for_subscriber = self
1540 .action_tx
1541 .clone()
1542 .expect("action_tx should be set in new()");
1442 1543
1443 // 2. Create disconnect channel for spawned tasks -> manager communication 1544 // 2. Create disconnect channel for spawned tasks -> manager communication
1444 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); 1545 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100);
@@ -1457,7 +1558,7 @@ impl SyncManager {
1457 format!("ws://{}", self.config.bind_address), 1558 format!("ws://{}", self.config.bind_address),
1458 self.service_domain.clone(), 1559 self.service_domain.clone(),
1459 Arc::clone(&self.repo_sync_index), 1560 Arc::clone(&self.repo_sync_index),
1460 action_tx, 1561 action_tx_for_subscriber,
1461 ); 1562 );
1462 let subscriber_shutdown = shutdown_tx.subscribe(); 1563 let subscriber_shutdown = shutdown_tx.subscribe();
1463 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); 1564 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });