upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-18 19:41:29 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-18 19:41:29 +0000
commite22021f0b248ebcf3bd09210d59b2cdb4701032f (patch)
tree3dd1a3a75a8b7424749c0b0505a3d1ab61ac7895 /src/sync
parenta804164468d3beafb243ece12555b4d1692a075d (diff)
fix: simplify purgatory sync - fix SelfSubscriber sync_level upgrade and negentropy fallback
Three targeted fixes for purgatory announcement sync: 1. SelfSubscriber sync_level upgrade: After or_insert_with in process_batch, always set entry.sync_level = SyncLevel::Full so that when a promoted announcement is broadcast via notify_event and SelfSubscriber receives it, an existing StateOnly entry gets upgraded to Full and PR event subscriptions are triggered immediately (not delayed up to 24h). 2. Negentropy fallback filter split: In handle_eose, when falling back from negentropy to REQ+EOSE, split batch_repos by SyncLevel and call build_sync_level_aware_filters instead of build_layer2_and_layer3_filters. Prevents StateOnly (purgatory) repos from getting Layer 2 #a/#A/#q filters prematurely, which caused nostr-sdk client deduplication to permanently drop PR events after orphan rejection. 3. Recompute sync filters after announcement batch EOSE: Add recompute_new_sync_filters_for_relay calls at all three batch-completion paths in handle_eose for generic filter (announcement) batches. This triggers state-only subscriptions for any purgatory repos registered during that batch, fixing the 24h delay before state event sync starts. 4. User-submitted purgatory announcements: Add repo_sync_index field to PolicyContext with setter/getter, wire in main.rs after SyncManager creation, and register in AcceptPurgatory handler so user-submitted announcements get StateOnly sync started immediately. 5. Update archive tests: test_archive_without_state_events_does_not_sync_git updated to reflect that StateOnly subscription now proactively fetches state events from source relays. test_archive_read_only_creates_bare_repo un-ignored as it now works end-to-end.
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs66
-rw-r--r--src/sync/self_subscriber.rs4
2 files changed, 67 insertions, 3 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 519017b..916e2b0 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -700,6 +700,14 @@ impl SyncManager {
700 self.rejected_events_index.save_to_disk(path) 700 self.rejected_events_index.save_to_disk(path)
701 } 701 }
702 702
703 /// Get a clone of the repo sync index Arc.
704 ///
705 /// This allows the write policy to register user-submitted purgatory announcements
706 /// in the sync index so that state event sync starts promptly.
707 pub fn repo_sync_index(&self) -> RepoSyncIndex {
708 self.repo_sync_index.clone()
709 }
710
703 /// Handle EOSE (End Of Stored Events) for a subscription 711 /// Handle EOSE (End Of Stored Events) for a subscription
704 /// 712 ///
705 /// This method: 713 /// This method:
@@ -951,9 +959,29 @@ impl SyncManager {
951 959
952 // Create REQ+EOSE subscriptions using original semantic filters 960 // Create REQ+EOSE subscriptions using original semantic filters
953 // This queries by kind/author/tags instead of by ID, which may 961 // This queries by kind/author/tags instead of by ID, which may
954 // succeed even when ID-based queries fail 962 // succeed even when ID-based queries fail.
955 let fallback_filters = filters::build_layer2_and_layer3_filters( 963 // Split batch_repos by SyncLevel to avoid sending Layer 2 filters
956 &batch_repos, 964 // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be
965 // rejected as orphan and then silently dropped by nostr-sdk deduplication.
966 let (full_repos, state_only_repos) = {
967 let repo_index = self.repo_sync_index.read().await;
968 let mut full = HashSet::new();
969 let mut state_only = HashSet::new();
970 for repo_ref in &batch_repos {
971 match repo_index.get(repo_ref).map(|n| n.sync_level) {
972 Some(SyncLevel::StateOnly) => {
973 state_only.insert(repo_ref.clone());
974 }
975 _ => {
976 full.insert(repo_ref.clone());
977 }
978 }
979 }
980 (full, state_only)
981 };
982 let fallback_filters = filters::build_sync_level_aware_filters(
983 &full_repos,
984 &state_only_repos,
957 &batch_root_events, 985 &batch_root_events,
958 None, 986 None,
959 ); 987 );
@@ -1033,12 +1061,24 @@ impl SyncManager {
1033 { 1061 {
1034 let mut completed_batch = batches.remove(idx); 1062 let mut completed_batch = batches.remove(idx);
1035 completed_batch.failed = true; // Mark as failed 1063 completed_batch.failed = true; // Mark as failed
1064 let is_generic =
1065 completed_batch.items.repos.is_empty()
1066 && completed_batch.items.root_events.is_empty();
1036 if batches.is_empty() { 1067 if batches.is_empty() {
1037 pending.remove(&relay_url_for_fallback); 1068 pending.remove(&relay_url_for_fallback);
1038 } 1069 }
1039 drop(pending); 1070 drop(pending);
1040 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1071 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1041 .await; 1072 .await;
1073 // For generic filter (announcement) batches, recompute filters
1074 // so any purgatory repos registered during this batch get
1075 // state-only subscriptions triggered.
1076 if is_generic {
1077 self.recompute_new_sync_filters_for_relay(
1078 &relay_url_for_fallback,
1079 )
1080 .await;
1081 }
1042 } 1082 }
1043 } 1083 }
1044 return; 1084 return;
@@ -1132,12 +1172,24 @@ impl SyncManager {
1132 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 1172 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
1133 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { 1173 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
1134 let completed_batch = batches.remove(idx); 1174 let completed_batch = batches.remove(idx);
1175 let is_generic =
1176 completed_batch.items.repos.is_empty()
1177 && completed_batch.items.root_events.is_empty();
1135 if batches.is_empty() { 1178 if batches.is_empty() {
1136 pending.remove(&relay_url_for_retry); 1179 pending.remove(&relay_url_for_retry);
1137 } 1180 }
1138 drop(pending); 1181 drop(pending);
1139 self.confirm_batch(&relay_url_for_retry, completed_batch) 1182 self.confirm_batch(&relay_url_for_retry, completed_batch)
1140 .await; 1183 .await;
1184 // For generic filter (announcement) batches, recompute filters
1185 // so any purgatory repos registered during this batch get
1186 // state-only subscriptions triggered.
1187 if is_generic {
1188 self.recompute_new_sync_filters_for_relay(
1189 &relay_url_for_retry,
1190 )
1191 .await;
1192 }
1141 } 1193 }
1142 } 1194 }
1143 return; 1195 return;
@@ -1148,6 +1200,8 @@ impl SyncManager {
1148 1200
1149 // 3. Batch complete - extract and remove 1201 // 3. Batch complete - extract and remove
1150 let completed_batch = batches.remove(batch_idx); 1202 let completed_batch = batches.remove(batch_idx);
1203 let is_generic = completed_batch.items.repos.is_empty()
1204 && completed_batch.items.root_events.is_empty();
1151 1205
1152 // Clean up empty relay entry 1206 // Clean up empty relay entry
1153 if batches.is_empty() { 1207 if batches.is_empty() {
@@ -1159,6 +1213,12 @@ impl SyncManager {
1159 1213
1160 // 4. Confirm the batch (moves items to RelayState) 1214 // 4. Confirm the batch (moves items to RelayState)
1161 self.confirm_batch(relay_url, completed_batch).await; 1215 self.confirm_batch(relay_url, completed_batch).await;
1216
1217 // 5. For generic filter (announcement) batches, recompute sync filters so any
1218 // purgatory repos registered during this batch get state-only subscriptions triggered.
1219 if is_generic {
1220 self.recompute_new_sync_filters_for_relay(relay_url).await;
1221 }
1162 } 1222 }
1163 1223
1164 /// Confirm a completed batch by moving items to RelayState 1224 /// Confirm a completed batch by moving items to RelayState
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index db16c62..70c3dbf 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -478,6 +478,10 @@ impl SelfSubscriber {
478 root_events: HashSet::new(), 478 root_events: HashSet::new(),
479 sync_level: SyncLevel::Full, 479 sync_level: SyncLevel::Full,
480 }); 480 });
481 // Upgrade sync_level to Full - this handles the case where the entry
482 // already exists as StateOnly (purgatory announcement) and is now being
483 // promoted (git data arrived and the event was broadcast via notify_event).
484 entry.sync_level = SyncLevel::Full;
481 entry.relays.extend(needs.relays); 485 entry.relays.extend(needs.relays);
482 entry.root_events.extend(needs.root_events); 486 entry.root_events.extend(needs.root_events);
483 487