From f62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:48:26 +0000 Subject: refactor: replace inline purgatory sync registration with timer-only approach Remove the redundant inline kind-30617 registration block from the sync event loop and the three is_generic/recompute_new_sync_filters_for_relay calls from confirm_batch error paths. The purgatory announcement sync timer (run_purgatory_announcement_sync) is now the sole registration path. Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing two ad-hoc env vars with one reusable test-mode flag. --- src/sync/mod.rs | 103 ++++++++------------------------------------ src/sync/self_subscriber.rs | 12 +++--- 2 files changed, 24 insertions(+), 91 deletions(-) (limited to 'src') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -399,21 +399,24 @@ async fn run_daily_timer( /// Background task that periodically syncs purgatory announcements into repo_sync_index. /// -/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there -/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` -/// which connects to the relay URLs listed in the announcement and subscribes to state -/// events (kind 30618). +/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). +/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in +/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the +/// relay URLs listed in the announcement and subscribes to state events (kind 30618). /// -/// This covers two cases: -/// - Sync-path announcements: registered inline during event processing, but this -/// provides a safety net in case the inline registration was missed. +/// This is the sole registration path for purgatory announcements: +/// - Sync-path announcements: registered here within one interval of arriving. /// - User-submitted purgatory announcements: the SelfSubscriber never sees them -/// (they're rejected from DB), so this timer is the primary registration path. +/// (they're rejected from DB), so this timer is the only registration path. async fn run_purgatory_announcement_sync( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, ) { - let interval = Duration::from_secs(5); + let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_secs(5) + }; loop { tokio::select! { _ = tokio::time::sleep(interval) => { @@ -1084,24 +1087,12 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_fallback, - ) - .await; - } } } return; @@ -1195,24 +1186,12 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_retry, - ) - .await; - } } } return; @@ -1223,8 +1202,6 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); - let is_generic = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1236,12 +1213,6 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter (announcement) batches, recompute sync filters so any - // purgatory repos registered during this batch get state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1370,7 +1341,7 @@ impl SyncManager { /// to be batched and create Layer 2/3 filters before we mark sync complete. /// /// The 6-second delay is based on: - /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) + /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) /// - Buffer for processing: 1 second /// /// Called after each batch is confirmed to detect completion. @@ -1785,7 +1756,6 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); - let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1827,50 +1797,13 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) + // + // Note: announcement events (kind 30617) are registered in repo_sync_index + // by the purgatory announcement sync timer (run_purgatory_announcement_sync) + // rather than inline here. if result == ProcessResult::Purgatory { - // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly - // so that state events (kind 30618) are synced for this purgatory announcement - if event.kind == Kind::GitRepoAnnouncement { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = format!("30617:{}:{}", event.pubkey, identifier); - - // Extract relay URLs from the purgatory entry - let relays = write_policy - .purgatory() - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - tracing::info!( - event_id = %event.id, - repo_id = %repo_id, - relay_count = relays.len(), - "Registering purgatory announcement in RepoSyncIndex with StateOnly level" - ); - - // Register in RepoSyncIndex with StateOnly level - let mut index = repo_sync_index.write().await; - let entry = index - .entry(repo_id) - .or_insert_with(|| RepoSyncNeeds { - relays: HashSet::new(), - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - entry.relays.extend(relays); - // Don't upgrade sync_level if already Full - // (e.g., if announcement was promoted before this runs) - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -126,14 +126,14 @@ impl SelfSubscriber { /// Get batch window from environment or use default /// - /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. + /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. /// Default: 5000ms (5 seconds) fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse::().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_millis(5000)) + if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_millis(5000) + } } /// Process a relay pool notification -- cgit v1.2.3