upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs115
1 files changed, 7 insertions, 108 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6ab8d33..519017b 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -557,13 +557,6 @@ pub struct SyncManager {
557 /// Purgatory for read-only access to events awaiting git data 557 /// Purgatory for read-only access to events awaiting git data
558 purgatory: Arc<crate::purgatory::Purgatory>, 558 purgatory: Arc<crate::purgatory::Purgatory>,
559 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) 559 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers)
560 // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters
561 // actions when user-submitted purgatory announcements need to trigger relay discovery.
562 /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called)
563 #[allow(dead_code)]
564 action_tx: Option<tokio::sync::mpsc::Sender<AddFilters>>,
565 /// Receiver for AddFilters actions (taken by run() when the event loop starts)
566 action_rx: Option<tokio::sync::mpsc::Receiver<AddFilters>>,
567 local_relay: LocalRelay, 560 local_relay: LocalRelay,
568 /// Configuration reference for sync settings 561 /// Configuration reference for sync settings
569 config: Config, 562 config: Config,
@@ -650,11 +643,6 @@ impl SyncManager {
650 } 643 }
651 } 644 }
652 645
653 // Create action channel upfront so callers (e.g. write policy) can send AddFilters
654 // actions before run() is called (e.g. when user-submitted purgatory announcements
655 // need to trigger relay discovery).
656 let (action_tx, action_rx) = tokio::sync::mpsc::channel::<AddFilters>(100);
657
658 Self { 646 Self {
659 bootstrap_relay_url, 647 bootstrap_relay_url,
660 service_domain, 648 service_domain,
@@ -675,22 +663,9 @@ impl SyncManager {
675 connect_tx: None, 663 connect_tx: None,
676 shutdown_tx: None, 664 shutdown_tx: None,
677 metrics: sync_metrics, 665 metrics: sync_metrics,
678 action_tx: Some(action_tx),
679 action_rx: Some(action_rx),
680 } 666 }
681 } 667 }
682 668
683 /// Get a clone of the action sender for external use.
684 ///
685 /// This allows the write policy to send AddFilters actions to the SyncManager
686 /// when user-submitted purgatory announcements need to trigger relay discovery.
687 ///
688 /// # Returns
689 /// Clone of the action sender, or None if the channel was never created.
690 pub fn action_tx(&self) -> Option<tokio::sync::mpsc::Sender<AddFilters>> {
691 self.action_tx.clone()
692 }
693
694 /// Generate a unique batch ID 669 /// Generate a unique batch ID
695 /// 670 ///
696 /// Increments the internal counter and returns the new value. 671 /// Increments the internal counter and returns the new value.
@@ -711,17 +686,6 @@ impl SyncManager {
711 self.rejected_events_index.clone() 686 self.rejected_events_index.clone()
712 } 687 }
713 688
714 /// Get a clone of the repo sync index.
715 ///
716 /// This allows access to the repo sync index for upgrading sync levels
717 /// when announcements are promoted from purgatory.
718 ///
719 /// # Returns
720 /// Clone of the repo sync index (Arc<RwLock<HashMap>>)
721 pub fn repo_sync_index(&self) -> RepoSyncIndex {
722 self.repo_sync_index.clone()
723 }
724
725 /// Save rejected events index to disk. 689 /// Save rejected events index to disk.
726 /// 690 ///
727 /// This is called during shutdown to persist the rejected events cache, 691 /// This is called during shutdown to persist the rejected events cache,
@@ -985,31 +949,11 @@ impl SyncManager {
985 // Drop the lock before async operations 949 // Drop the lock before async operations
986 drop(pending); 950 drop(pending);
987 951
988 // Create REQ+EOSE subscriptions using sync-level-aware filters. 952 // Create REQ+EOSE subscriptions using original semantic filters
989 // This queries by kind/author/tags instead of by ID, which may 953 // This queries by kind/author/tags instead of by ID, which may
990 // succeed even when ID-based queries fail. 954 // succeed even when ID-based queries fail
991 // 955 let fallback_filters = filters::build_layer2_and_layer3_filters(
992 // CRITICAL: Use build_sync_level_aware_filters to avoid generating 956 &batch_repos,
993 // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements
994 // are still in purgatory. If we send Layer 2 filters too early, the
995 // remote relay may return PR events that our write policy rejects as
996 // "orphan" (no promoted repo). nostr-sdk deduplication then silently
997 // drops the event on retry, making it permanently unavailable.
998 let (full_repos, state_only_repos) = {
999 let index = self.repo_sync_index.read().await;
1000 let mut full = HashSet::new();
1001 let mut state_only = HashSet::new();
1002 for repo_id in &batch_repos {
1003 match index.get(repo_id).map(|n| n.sync_level) {
1004 Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); }
1005 _ => { full.insert(repo_id.clone()); }
1006 }
1007 }
1008 (full, state_only)
1009 };
1010 let fallback_filters = filters::build_sync_level_aware_filters(
1011 &full_repos,
1012 &state_only_repos,
1013 &batch_root_events, 957 &batch_root_events,
1014 None, 958 None,
1015 ); 959 );
@@ -1093,20 +1037,8 @@ impl SyncManager {
1093 pending.remove(&relay_url_for_fallback); 1037 pending.remove(&relay_url_for_fallback);
1094 } 1038 }
1095 drop(pending); 1039 drop(pending);
1096 let is_generic_filter = completed_batch.items.repos.is_empty()
1097 && completed_batch.items.root_events.is_empty();
1098 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1040 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1099 .await; 1041 .await;
1100
1101 // Trigger filter recomputation for generic filter batches
1102 if is_generic_filter {
1103 tracing::info!(
1104 relay = %relay_url_for_fallback,
1105 "Announcement batch complete (fallback path) - triggering filter recomputation"
1106 );
1107 self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback)
1108 .await;
1109 }
1110 } 1042 }
1111 } 1043 }
1112 return; 1044 return;
@@ -1204,20 +1136,8 @@ impl SyncManager {
1204 pending.remove(&relay_url_for_retry); 1136 pending.remove(&relay_url_for_retry);
1205 } 1137 }
1206 drop(pending); 1138 drop(pending);
1207 let is_generic_filter = completed_batch.items.repos.is_empty()
1208 && completed_batch.items.root_events.is_empty();
1209 self.confirm_batch(&relay_url_for_retry, completed_batch) 1139 self.confirm_batch(&relay_url_for_retry, completed_batch)
1210 .await; 1140 .await;
1211
1212 // Trigger filter recomputation for generic filter batches
1213 if is_generic_filter {
1214 tracing::info!(
1215 relay = %relay_url_for_retry,
1216 "Announcement batch complete (retry path) - triggering filter recomputation"
1217 );
1218 self.recompute_new_sync_filters_for_relay(&relay_url_for_retry)
1219 .await;
1220 }
1221 } 1141 }
1222 } 1142 }
1223 return; 1143 return;
@@ -1238,20 +1158,7 @@ impl SyncManager {
1238 drop(pending); 1158 drop(pending);
1239 1159
1240 // 4. Confirm the batch (moves items to RelayState) 1160 // 4. Confirm the batch (moves items to RelayState)
1241 let is_generic_filter =
1242 completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty();
1243 self.confirm_batch(relay_url, completed_batch).await; 1161 self.confirm_batch(relay_url, completed_batch).await;
1244
1245 // 5. For generic filter batches (announcements), trigger filter recomputation
1246 // to subscribe to state events for purgatory announcements that were registered
1247 // during event processing.
1248 if is_generic_filter {
1249 tracing::info!(
1250 relay = %relay_url,
1251 "Announcement batch complete - triggering filter recomputation for purgatory repos"
1252 );
1253 self.recompute_new_sync_filters_for_relay(relay_url).await;
1254 }
1255 } 1162 }
1256 1163
1257 /// Confirm a completed batch by moving items to RelayState 1164 /// Confirm a completed batch by moving items to RelayState
@@ -1530,16 +1437,8 @@ impl SyncManager {
1530 "SyncManager starting" 1437 "SyncManager starting"
1531 ); 1438 );
1532 1439
1533 // 1. Take action channel receiver (created in new()) - sender is shared with write policy 1440 // 1. Create action channel for self-subscriber -> manager communication
1534 let mut action_rx = self 1441 let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100);
1535 .action_rx
1536 .take()
1537 .expect("action_rx should be set in new()");
1538 // Get a clone of action_tx for the self-subscriber
1539 let action_tx_for_subscriber = self
1540 .action_tx
1541 .clone()
1542 .expect("action_tx should be set in new()");
1543 1442
1544 // 2. Create disconnect channel for spawned tasks -> manager communication 1443 // 2. Create disconnect channel for spawned tasks -> manager communication
1545 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); 1444 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100);
@@ -1558,7 +1457,7 @@ impl SyncManager {
1558 format!("ws://{}", self.config.bind_address), 1457 format!("ws://{}", self.config.bind_address),
1559 self.service_domain.clone(), 1458 self.service_domain.clone(),
1560 Arc::clone(&self.repo_sync_index), 1459 Arc::clone(&self.repo_sync_index),
1561 action_tx_for_subscriber, 1460 action_tx,
1562 ); 1461 );
1563 let subscriber_shutdown = shutdown_tx.subscribe(); 1462 let subscriber_shutdown = shutdown_tx.subscribe();
1564 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); 1463 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });