From f62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:48:26 +0000 Subject: refactor: replace inline purgatory sync registration with timer-only approach Remove the redundant inline kind-30617 registration block from the sync event loop and the three is_generic/recompute_new_sync_filters_for_relay calls from confirm_batch error paths. The purgatory announcement sync timer (run_purgatory_announcement_sync) is now the sole registration path. Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing two ad-hoc env vars with one reusable test-mode flag. --- src/sync/mod.rs | 103 ++++++---------------------------- src/sync/self_subscriber.rs | 12 ++-- tests/common/relay.rs | 2 +- tests/sync/maintainer_reprocessing.rs | 2 +- 4 files changed, 26 insertions(+), 93 deletions(-) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -399,21 +399,24 @@ async fn run_daily_timer( /// Background task that periodically syncs purgatory announcements into repo_sync_index. /// -/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there -/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` -/// which connects to the relay URLs listed in the announcement and subscribes to state -/// events (kind 30618). +/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). +/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in +/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the +/// relay URLs listed in the announcement and subscribes to state events (kind 30618). /// -/// This covers two cases: -/// - Sync-path announcements: registered inline during event processing, but this -/// provides a safety net in case the inline registration was missed. +/// This is the sole registration path for purgatory announcements: +/// - Sync-path announcements: registered here within one interval of arriving. /// - User-submitted purgatory announcements: the SelfSubscriber never sees them -/// (they're rejected from DB), so this timer is the primary registration path. +/// (they're rejected from DB), so this timer is the only registration path. async fn run_purgatory_announcement_sync( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, ) { - let interval = Duration::from_secs(5); + let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_secs(5) + }; loop { tokio::select! { _ = tokio::time::sleep(interval) => { @@ -1084,24 +1087,12 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_fallback, - ) - .await; - } } } return; @@ -1195,24 +1186,12 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_retry, - ) - .await; - } } } return; @@ -1223,8 +1202,6 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); - let is_generic = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1236,12 +1213,6 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter (announcement) batches, recompute sync filters so any - // purgatory repos registered during this batch get state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1370,7 +1341,7 @@ impl SyncManager { /// to be batched and create Layer 2/3 filters before we mark sync complete. /// /// The 6-second delay is based on: - /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) + /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) /// - Buffer for processing: 1 second /// /// Called after each batch is confirmed to detect completion. @@ -1785,7 +1756,6 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); - let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1827,50 +1797,13 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) + // + // Note: announcement events (kind 30617) are registered in repo_sync_index + // by the purgatory announcement sync timer (run_purgatory_announcement_sync) + // rather than inline here. if result == ProcessResult::Purgatory { - // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly - // so that state events (kind 30618) are synced for this purgatory announcement - if event.kind == Kind::GitRepoAnnouncement { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = format!("30617:{}:{}", event.pubkey, identifier); - - // Extract relay URLs from the purgatory entry - let relays = write_policy - .purgatory() - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - tracing::info!( - event_id = %event.id, - repo_id = %repo_id, - relay_count = relays.len(), - "Registering purgatory announcement in RepoSyncIndex with StateOnly level" - ); - - // Register in RepoSyncIndex with StateOnly level - let mut index = repo_sync_index.write().await; - let entry = index - .entry(repo_id) - .or_insert_with(|| RepoSyncNeeds { - relays: HashSet::new(), - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - entry.relays.extend(relays); - // Don't upgrade sync_level if already Full - // (e.g., if announcement was promoted before this runs) - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -126,14 +126,14 @@ impl SelfSubscriber { /// Get batch window from environment or use default /// - /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. + /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. /// Default: 5000ms (5 seconds) fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse::().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_millis(5000)) + if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_millis(5000) + } } /// Process a relay pool notification diff --git a/tests/common/relay.rs b/tests/common/relay.rs index 0ec9a2e..b1e96cf 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs @@ -204,7 +204,7 @@ impl TestRelay { .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation .env("NGIT_OWNER_NPUB", &test_npub) - .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) + .env("NGIT_TEST", "1") // Enable test mode: fast timers (200ms batch window, 200ms purgatory sync) .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 61d8e14..ff1eb43 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -377,7 +377,7 @@ async fn test_multiple_maintainers_all_reprocessed() { println!("relay_b started at {}", relay_b.url()); // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. - // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we // allow extra time for slow CI environments. tokio::time::sleep(Duration::from_secs(3)).await; println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); -- cgit v1.2.3