upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs66
-rw-r--r--src/sync/self_subscriber.rs4
2 files changed, 67 insertions, 3 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 519017b..916e2b0 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -700,6 +700,14 @@ impl SyncManager {
700 self.rejected_events_index.save_to_disk(path) 700 self.rejected_events_index.save_to_disk(path)
701 } 701 }
702 702
703 /// Get a clone of the repo sync index Arc.
704 ///
705 /// This allows the write policy to register user-submitted purgatory announcements
706 /// in the sync index so that state event sync starts promptly.
707 pub fn repo_sync_index(&self) -> RepoSyncIndex {
708 self.repo_sync_index.clone()
709 }
710
703 /// Handle EOSE (End Of Stored Events) for a subscription 711 /// Handle EOSE (End Of Stored Events) for a subscription
704 /// 712 ///
705 /// This method: 713 /// This method:
@@ -951,9 +959,29 @@ impl SyncManager {
951 959
952 // Create REQ+EOSE subscriptions using original semantic filters 960 // Create REQ+EOSE subscriptions using original semantic filters
953 // This queries by kind/author/tags instead of by ID, which may 961 // This queries by kind/author/tags instead of by ID, which may
954 // succeed even when ID-based queries fail 962 // succeed even when ID-based queries fail.
955 let fallback_filters = filters::build_layer2_and_layer3_filters( 963 // Split batch_repos by SyncLevel to avoid sending Layer 2 filters
956 &batch_repos, 964 // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be
965 // rejected as orphan and then silently dropped by nostr-sdk deduplication.
966 let (full_repos, state_only_repos) = {
967 let repo_index = self.repo_sync_index.read().await;
968 let mut full = HashSet::new();
969 let mut state_only = HashSet::new();
970 for repo_ref in &batch_repos {
971 match repo_index.get(repo_ref).map(|n| n.sync_level) {
972 Some(SyncLevel::StateOnly) => {
973 state_only.insert(repo_ref.clone());
974 }
975 _ => {
976 full.insert(repo_ref.clone());
977 }
978 }
979 }
980 (full, state_only)
981 };
982 let fallback_filters = filters::build_sync_level_aware_filters(
983 &full_repos,
984 &state_only_repos,
957 &batch_root_events, 985 &batch_root_events,
958 None, 986 None,
959 ); 987 );
@@ -1033,12 +1061,24 @@ impl SyncManager {
1033 { 1061 {
1034 let mut completed_batch = batches.remove(idx); 1062 let mut completed_batch = batches.remove(idx);
1035 completed_batch.failed = true; // Mark as failed 1063 completed_batch.failed = true; // Mark as failed
1064 let is_generic =
1065 completed_batch.items.repos.is_empty()
1066 && completed_batch.items.root_events.is_empty();
1036 if batches.is_empty() { 1067 if batches.is_empty() {
1037 pending.remove(&relay_url_for_fallback); 1068 pending.remove(&relay_url_for_fallback);
1038 } 1069 }
1039 drop(pending); 1070 drop(pending);
1040 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1071 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1041 .await; 1072 .await;
1073 // For generic filter (announcement) batches, recompute filters
1074 // so any purgatory repos registered during this batch get
1075 // state-only subscriptions triggered.
1076 if is_generic {
1077 self.recompute_new_sync_filters_for_relay(
1078 &relay_url_for_fallback,
1079 )
1080 .await;
1081 }
1042 } 1082 }
1043 } 1083 }
1044 return; 1084 return;
@@ -1132,12 +1172,24 @@ impl SyncManager {
1132 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 1172 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
1133 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { 1173 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
1134 let completed_batch = batches.remove(idx); 1174 let completed_batch = batches.remove(idx);
1175 let is_generic =
1176 completed_batch.items.repos.is_empty()
1177 && completed_batch.items.root_events.is_empty();
1135 if batches.is_empty() { 1178 if batches.is_empty() {
1136 pending.remove(&relay_url_for_retry); 1179 pending.remove(&relay_url_for_retry);
1137 } 1180 }
1138 drop(pending); 1181 drop(pending);
1139 self.confirm_batch(&relay_url_for_retry, completed_batch) 1182 self.confirm_batch(&relay_url_for_retry, completed_batch)
1140 .await; 1183 .await;
1184 // For generic filter (announcement) batches, recompute filters
1185 // so any purgatory repos registered during this batch get
1186 // state-only subscriptions triggered.
1187 if is_generic {
1188 self.recompute_new_sync_filters_for_relay(
1189 &relay_url_for_retry,
1190 )
1191 .await;
1192 }
1141 } 1193 }
1142 } 1194 }
1143 return; 1195 return;
@@ -1148,6 +1200,8 @@ impl SyncManager {
1148 1200
1149 // 3. Batch complete - extract and remove 1201 // 3. Batch complete - extract and remove
1150 let completed_batch = batches.remove(batch_idx); 1202 let completed_batch = batches.remove(batch_idx);
1203 let is_generic = completed_batch.items.repos.is_empty()
1204 && completed_batch.items.root_events.is_empty();
1151 1205
1152 // Clean up empty relay entry 1206 // Clean up empty relay entry
1153 if batches.is_empty() { 1207 if batches.is_empty() {
@@ -1159,6 +1213,12 @@ impl SyncManager {
1159 1213
1160 // 4. Confirm the batch (moves items to RelayState) 1214 // 4. Confirm the batch (moves items to RelayState)
1161 self.confirm_batch(relay_url, completed_batch).await; 1215 self.confirm_batch(relay_url, completed_batch).await;
1216
1217 // 5. For generic filter (announcement) batches, recompute sync filters so any
1218 // purgatory repos registered during this batch get state-only subscriptions triggered.
1219 if is_generic {
1220 self.recompute_new_sync_filters_for_relay(relay_url).await;
1221 }
1162 } 1222 }
1163 1223
1164 /// Confirm a completed batch by moving items to RelayState 1224 /// Confirm a completed batch by moving items to RelayState
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index db16c62..70c3dbf 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -478,6 +478,10 @@ impl SelfSubscriber {
478 root_events: HashSet::new(), 478 root_events: HashSet::new(),
479 sync_level: SyncLevel::Full, 479 sync_level: SyncLevel::Full,
480 }); 480 });
481 // Upgrade sync_level to Full - this handles the case where the entry
482 // already exists as StateOnly (purgatory announcement) and is now being
483 // promoted (git data arrived and the event was broadcast via notify_event).
484 entry.sync_level = SyncLevel::Full;
481 entry.relays.extend(needs.relays); 485 entry.relays.extend(needs.relays);
482 entry.root_events.extend(needs.root_events); 486 entry.root_events.extend(needs.root_events);
483 487