From e22021f0b248ebcf3bd09210d59b2cdb4701032f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:41:29 +0000 Subject: fix: simplify purgatory sync - fix SelfSubscriber sync_level upgrade and negentropy fallback Three targeted fixes for purgatory announcement sync: 1. SelfSubscriber sync_level upgrade: After or_insert_with in process_batch, always set entry.sync_level = SyncLevel::Full so that when a promoted announcement is broadcast via notify_event and SelfSubscriber receives it, an existing StateOnly entry gets upgraded to Full and PR event subscriptions are triggered immediately (not delayed up to 24h). 2. Negentropy fallback filter split: In handle_eose, when falling back from negentropy to REQ+EOSE, split batch_repos by SyncLevel and call build_sync_level_aware_filters instead of build_layer2_and_layer3_filters. Prevents StateOnly (purgatory) repos from getting Layer 2 #a/#A/#q filters prematurely, which caused nostr-sdk client deduplication to permanently drop PR events after orphan rejection. 3. Recompute sync filters after announcement batch EOSE: Add recompute_new_sync_filters_for_relay calls at all three batch-completion paths in handle_eose for generic filter (announcement) batches. This triggers state-only subscriptions for any purgatory repos registered during that batch, fixing the 24h delay before state event sync starts. 4. User-submitted purgatory announcements: Add repo_sync_index field to PolicyContext with setter/getter, wire in main.rs after SyncManager creation, and register in AcceptPurgatory handler so user-submitted announcements get StateOnly sync started immediately. 5. Update archive tests: test_archive_without_state_events_does_not_sync_git updated to reflect that StateOnly subscription now proactively fetches state events from source relays. test_archive_read_only_creates_bare_repo un-ignored as it now works end-to-end. --- src/sync/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++++++--- src/sync/self_subscriber.rs | 4 +++ 2 files changed, 67 insertions(+), 3 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..916e2b0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -700,6 +700,14 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } + /// Get a clone of the repo sync index Arc. + /// + /// This allows the write policy to register user-submitted purgatory announcements + /// in the sync index so that state event sync starts promptly. + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -951,9 +959,29 @@ impl SyncManager { // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. + // Split batch_repos by SyncLevel to avoid sending Layer 2 filters + // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be + // rejected as orphan and then silently dropped by nostr-sdk deduplication. + let (full_repos, state_only_repos) = { + let repo_index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_ref in &batch_repos { + match repo_index.get(repo_ref).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { + state_only.insert(repo_ref.clone()); + } + _ => { + full.insert(repo_ref.clone()); + } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1033,12 +1061,24 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_fallback, + ) + .await; + } } } return; @@ -1132,12 +1172,24 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_retry, + ) + .await; + } } } return; @@ -1148,6 +1200,8 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); + let is_generic = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1159,6 +1213,12 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter (announcement) batches, recompute sync filters so any + // purgatory repos registered during this batch get state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index db16c62..70c3dbf 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -478,6 +478,10 @@ impl SelfSubscriber { root_events: HashSet::new(), sync_level: SyncLevel::Full, }); + // Upgrade sync_level to Full - this handles the case where the entry + // already exists as StateOnly (purgatory announcement) and is now being + // promoted (git data arrived and the event was broadcast via notify_event). + entry.sync_level = SyncLevel::Full; entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); -- cgit v1.2.3