From e22021f0b248ebcf3bd09210d59b2cdb4701032f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:41:29 +0000 Subject: fix: simplify purgatory sync - fix SelfSubscriber sync_level upgrade and negentropy fallback Three targeted fixes for purgatory announcement sync: 1. SelfSubscriber sync_level upgrade: After or_insert_with in process_batch, always set entry.sync_level = SyncLevel::Full so that when a promoted announcement is broadcast via notify_event and SelfSubscriber receives it, an existing StateOnly entry gets upgraded to Full and PR event subscriptions are triggered immediately (not delayed up to 24h). 2. Negentropy fallback filter split: In handle_eose, when falling back from negentropy to REQ+EOSE, split batch_repos by SyncLevel and call build_sync_level_aware_filters instead of build_layer2_and_layer3_filters. Prevents StateOnly (purgatory) repos from getting Layer 2 #a/#A/#q filters prematurely, which caused nostr-sdk client deduplication to permanently drop PR events after orphan rejection. 3. Recompute sync filters after announcement batch EOSE: Add recompute_new_sync_filters_for_relay calls at all three batch-completion paths in handle_eose for generic filter (announcement) batches. This triggers state-only subscriptions for any purgatory repos registered during that batch, fixing the 24h delay before state event sync starts. 4. User-submitted purgatory announcements: Add repo_sync_index field to PolicyContext with setter/getter, wire in main.rs after SyncManager creation, and register in AcceptPurgatory handler so user-submitted announcements get StateOnly sync started immediately. 5. Update archive tests: test_archive_without_state_events_does_not_sync_git updated to reflect that StateOnly subscription now proactively fetches state events from source relays. test_archive_read_only_creates_bare_repo un-ignored as it now works end-to-end. --- src/main.rs | 7 +++++ src/nostr/builder.rs | 54 +++++++++++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 19 +++++++++++++ src/sync/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++++++--- src/sync/self_subscriber.rs | 4 +++ tests/archive_read_only.rs | 63 +++++++++++++++++++++++++++---------------- 6 files changed, 187 insertions(+), 26 deletions(-) diff --git a/src/main.rs b/src/main.rs index ab6ede7..ebe05a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,13 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); + // Wire repo_sync_index into write policy so user-submitted purgatory announcements + // get registered for state event sync immediately (Fix 3). + let repo_sync_index = sync_manager.repo_sync_index(); + relay_with_db + .write_policy + .set_repo_sync_index(repo_sync_index); + tokio::spawn(async move { sync_manager.run().await; }); diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..8d1e461 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -17,6 +17,7 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; +use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -98,6 +99,14 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index so that user-submitted purgatory announcements can + /// be registered for state event sync immediately. + /// + /// This must be called after SyncManager is created. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +155,51 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register repo in repo_sync_index with StateOnly level so that + // state event sync starts promptly via the next batch EOSE recompute. + // This handles user-submitted purgatory announcements - the SelfSubscriber + // only sees DB events, so it won't pick these up automatically. + if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Ok(announcement) = + RepositoryAnnouncement::from_event(event.clone()) + { + use std::collections::HashSet; + let repo_id = format!( + "30617:{}:{}", + event.pubkey, + announcement.identifier + ); + + // Extract relay URLs from the announcement event tags + let relays: HashSet = event + .tags + .iter() + .flat_map(|tag| { + let tag_vec = tag.as_slice(); + if !tag_vec.is_empty() && tag_vec[0] == "relays" { + tag_vec[1..].iter().map(|s| s.to_string()).collect::>() + } else { + vec![] + } + }) + .collect(); + + let mut index = repo_sync_index.write().await; + index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { + relays, + root_events: HashSet::new(), + sync_level: SyncLevel::StateOnly, + }); + drop(index); + + tracing::debug!( + repo_id = %repo_id, + "Registered purgatory announcement in repo_sync_index as StateOnly" + ); + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..c958586 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; +use crate::sync::RepoSyncIndex; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -34,6 +35,8 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, + /// Repo sync index for registering purgatory announcements (set after SyncManager creation) + pub repo_sync_index: Arc>>, } impl PolicyContext { @@ -51,6 +54,7 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, + repo_sync_index: Arc::new(std::sync::RwLock::new(None)), } } @@ -68,4 +72,19 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } + + /// Set the repo sync index after SyncManager has been created. + /// + /// This allows purgatory announcements submitted by users to be registered + /// in the sync index so state event sync starts promptly. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + let mut guard = self.repo_sync_index.write().unwrap(); + *guard = Some(index); + } + + /// Get a clone of the repo sync index if it has been set. + pub fn get_repo_sync_index(&self) -> Option { + let guard = self.repo_sync_index.read().unwrap(); + guard.clone() + } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..916e2b0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -700,6 +700,14 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } + /// Get a clone of the repo sync index Arc. + /// + /// This allows the write policy to register user-submitted purgatory announcements + /// in the sync index so that state event sync starts promptly. + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -951,9 +959,29 @@ impl SyncManager { // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. + // Split batch_repos by SyncLevel to avoid sending Layer 2 filters + // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be + // rejected as orphan and then silently dropped by nostr-sdk deduplication. + let (full_repos, state_only_repos) = { + let repo_index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_ref in &batch_repos { + match repo_index.get(repo_ref).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { + state_only.insert(repo_ref.clone()); + } + _ => { + full.insert(repo_ref.clone()); + } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1033,12 +1061,24 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_fallback, + ) + .await; + } } } return; @@ -1132,12 +1172,24 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_retry, + ) + .await; + } } } return; @@ -1148,6 +1200,8 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); + let is_generic = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1159,6 +1213,12 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter (announcement) batches, recompute sync filters so any + // purgatory repos registered during this batch get state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index db16c62..70c3dbf 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -478,6 +478,10 @@ impl SelfSubscriber { root_events: HashSet::new(), sync_level: SyncLevel::Full, }); + // Upgrade sync_level to Full - this handles the case where the entry + // already exists as StateOnly (purgatory announcement) and is now being + // promoted (git data arrived and the event was broadcast via notify_event). + entry.sync_level = SyncLevel::Full; entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index e388ae5..069b3b7 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -55,7 +55,6 @@ use std::time::Duration; /// 5. Verify bare repository is created and git data is synced /// 6. Verify git pushes are rejected (read-only mode) #[tokio::test] -#[ignore] // Requires SyncLevel implementation (Phase 3) - purgatory announcements don't trigger per-repo sync yet async fn test_archive_read_only_creates_bare_repo() { // 1. Start source relay let source_relay = TestRelay::start().await; @@ -264,24 +263,24 @@ async fn test_archive_read_only_creates_bare_repo() { source_relay.stop().await; } -/// Test that archive mode without state events does NOT sync git data. +/// Test that archive mode proactively syncs state events and git data +/// when the source relay has state events available. /// -/// This verifies the security model: archive mode only syncs git data -/// when there are state events to validate against. +/// With StateOnly sync now implemented, purgatory announcements subscribe +/// to state events from the relays listed in the announcement. This means +/// the archive relay will: +/// 1. Sync the announcement → purgatory → register as StateOnly in repo_sync_index +/// 2. Subscribe to state events (kind 30618) on source relay +/// 3. Receive the state event → purgatory sync triggered +/// 4. Fetch git data from source relay's clone URL /// -/// With announcement purgatory, the flow is: -/// 1. Send announcement to source relay (goes to purgatory) -/// 2. Send state event to source relay (goes to purgatory) -/// 3. Push git data to source relay (promotes announcement and state event) -/// 4. Start archive relay with sync from source -/// 5. Archive relay syncs the promoted announcement -/// 6. Verify git data is NOT synced (archive has no state event to authorize git fetch) +/// This test verifies the full sync chain works end-to-end for archive mode. #[tokio::test] -async fn test_archive_without_state_events_does_not_sync_git() { +async fn test_archive_syncs_state_events_and_git_data_via_state_only_subscription() { // 1. Start source relay let source_relay = TestRelay::start().await; let keys = Keys::generate(); - let identifier = "archive-no-state-repo"; + let identifier = "archive-state-only-sync-repo"; // Pre-allocate archive relay port let archive_port = TestRelay::find_free_port(); @@ -295,6 +294,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { let npub = keys.public_key().to_bech32().expect("Failed to get npub"); // 3. Create and send announcement listing BOTH relays + // The archive relay will subscribe to state events on BOTH listed relays let announcement = create_repo_announcement( &keys, &[&source_relay.domain(), &archive_domain], @@ -337,6 +337,8 @@ async fn test_archive_without_state_events_does_not_sync_git() { ) .expect("Failed to create state event"); + let state_event_id = state_event.id; + source_client .send_event(&state_event) .await @@ -348,9 +350,12 @@ async fn test_archive_without_state_events_does_not_sync_git() { push_to_relay(temp_dir.path(), &source_relay.domain(), &npub, identifier) .expect("Push to source should succeed"); - tokio::time::sleep(Duration::from_millis(500)).await; + // Wait for state event to be promoted on source relay + wait_for_event_served(source_relay.url(), &state_event_id, Duration::from_secs(5)) + .await + .expect("State event should be served on source relay after push"); - // 6. Start archive relay (without state event - we don't send state event to archive) + // 6. Start archive relay - StateOnly subscription will proactively fetch state events let archive_relay = TestRelay::start_with_archive_and_sync( archive_port, Some(source_relay.url().to_string()), @@ -360,15 +365,28 @@ async fn test_archive_without_state_events_does_not_sync_git() { ) .await; - // Wait for sync + // Wait for sync connection wait_for_sync_connection(archive_relay.url(), 1, Duration::from_secs(5)) .await .expect("Sync connection should establish"); - // Give time for sync to fetch announcement - tokio::time::sleep(Duration::from_secs(3)).await; + // 7. Wait for state event to be served on archive relay + // The StateOnly subscription fetches the state event from source relay, + // which then triggers purgatory sync and git data fetch. + let found = wait_for_event_served( + archive_relay.url(), + &state_event_id, + Duration::from_secs(30), // Allow time for sync + git fetch + ) + .await; + + assert!( + found.is_ok(), + "State event should be served on archive after StateOnly subscription fetches it: {:?}", + found.err() + ); - // 7. Verify bare repository was created (announcement was synced and accepted to purgatory) + // 8. Verify bare repository was created let repo_path = archive_relay .git_data_path() .join(format!("{}/{}.git", npub, identifier)); @@ -378,8 +396,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { "Bare repository should be created for archive announcement" ); - // 8. Verify git data was NOT synced (no state events on archive to trigger git fetch) - // Check that the commit does NOT exist in the archive relay's repo + // 9. Verify git data was synced via the state event chain let output = tokio::process::Command::new("git") .args(["cat-file", "-t", &commit_hash]) .current_dir(&repo_path) @@ -389,8 +406,8 @@ async fn test_archive_without_state_events_does_not_sync_git() { let commit_exists = output.map(|o| o.status.success()).unwrap_or(false); assert!( - !commit_exists, - "Git data should NOT be synced without state events (security: validates against Nostr state)" + commit_exists, + "Git data should be synced via StateOnly subscription → state event → git fetch chain" ); // Cleanup -- cgit v1.2.3