diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-23 12:48:26 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-23 12:48:26 +0000 |
| commit | f62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 (patch) | |
| tree | ea457689e8507ae01cbf99fa4c210763f836c7ac | |
| parent | 49401286ea7413f834197e6a5b221649e10e2ad8 (diff) | |
refactor: replace inline purgatory sync registration with timer-only approach
Remove the redundant inline kind-30617 registration block from the sync
event loop and the three is_generic/recompute_new_sync_filters_for_relay
calls from confirm_batch error paths. The purgatory announcement sync
timer (run_purgatory_announcement_sync) is now the sole registration path.
Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS
into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing
two ad-hoc env vars with one reusable test-mode flag.
| -rw-r--r-- | src/sync/mod.rs | 103 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 12 | ||||
| -rw-r--r-- | tests/common/relay.rs | 2 | ||||
| -rw-r--r-- | tests/sync/maintainer_reprocessing.rs | 2 |
4 files changed, 26 insertions, 93 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -399,21 +399,24 @@ async fn run_daily_timer( | |||
| 399 | 399 | ||
| 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. | 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. |
| 401 | /// | 401 | /// |
| 402 | /// Runs every 5 seconds. For each announcement currently in purgatory, ensures there | 402 | /// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). |
| 403 | /// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` | 403 | /// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in |
| 404 | /// which connects to the relay URLs listed in the announcement and subscribes to state | 404 | /// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the |
| 405 | /// events (kind 30618). | 405 | /// relay URLs listed in the announcement and subscribes to state events (kind 30618). |
| 406 | /// | 406 | /// |
| 407 | /// This covers two cases: | 407 | /// This is the sole registration path for purgatory announcements: |
| 408 | /// - Sync-path announcements: registered inline during event processing, but this | 408 | /// - Sync-path announcements: registered here within one interval of arriving. |
| 409 | /// provides a safety net in case the inline registration was missed. | ||
| 410 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them | 409 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them |
| 411 | /// (they're rejected from DB), so this timer is the primary registration path. | 410 | /// (they're rejected from DB), so this timer is the only registration path. |
| 412 | async fn run_purgatory_announcement_sync( | 411 | async fn run_purgatory_announcement_sync( |
| 413 | sync_manager: Arc<Mutex<SyncManager>>, | 412 | sync_manager: Arc<Mutex<SyncManager>>, |
| 414 | mut shutdown_rx: broadcast::Receiver<()>, | 413 | mut shutdown_rx: broadcast::Receiver<()>, |
| 415 | ) { | 414 | ) { |
| 416 | let interval = Duration::from_secs(5); | 415 | let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 416 | Duration::from_millis(200) | ||
| 417 | } else { | ||
| 418 | Duration::from_secs(5) | ||
| 419 | }; | ||
| 417 | loop { | 420 | loop { |
| 418 | tokio::select! { | 421 | tokio::select! { |
| 419 | _ = tokio::time::sleep(interval) => { | 422 | _ = tokio::time::sleep(interval) => { |
| @@ -1084,24 +1087,12 @@ impl SyncManager { | |||
| 1084 | { | 1087 | { |
| 1085 | let mut completed_batch = batches.remove(idx); | 1088 | let mut completed_batch = batches.remove(idx); |
| 1086 | completed_batch.failed = true; // Mark as failed | 1089 | completed_batch.failed = true; // Mark as failed |
| 1087 | let is_generic = | ||
| 1088 | completed_batch.items.repos.is_empty() | ||
| 1089 | && completed_batch.items.root_events.is_empty(); | ||
| 1090 | if batches.is_empty() { | 1090 | if batches.is_empty() { |
| 1091 | pending.remove(&relay_url_for_fallback); | 1091 | pending.remove(&relay_url_for_fallback); |
| 1092 | } | 1092 | } |
| 1093 | drop(pending); | 1093 | drop(pending); |
| 1094 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | 1094 | self.confirm_batch(&relay_url_for_fallback, completed_batch) |
| 1095 | .await; | 1095 | .await; |
| 1096 | // For generic filter (announcement) batches, recompute filters | ||
| 1097 | // so any purgatory repos registered during this batch get | ||
| 1098 | // state-only subscriptions triggered. | ||
| 1099 | if is_generic { | ||
| 1100 | self.recompute_new_sync_filters_for_relay( | ||
| 1101 | &relay_url_for_fallback, | ||
| 1102 | ) | ||
| 1103 | .await; | ||
| 1104 | } | ||
| 1105 | } | 1096 | } |
| 1106 | } | 1097 | } |
| 1107 | return; | 1098 | return; |
| @@ -1195,24 +1186,12 @@ impl SyncManager { | |||
| 1195 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 1186 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 1196 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { | 1187 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { |
| 1197 | let completed_batch = batches.remove(idx); | 1188 | let completed_batch = batches.remove(idx); |
| 1198 | let is_generic = | ||
| 1199 | completed_batch.items.repos.is_empty() | ||
| 1200 | && completed_batch.items.root_events.is_empty(); | ||
| 1201 | if batches.is_empty() { | 1189 | if batches.is_empty() { |
| 1202 | pending.remove(&relay_url_for_retry); | 1190 | pending.remove(&relay_url_for_retry); |
| 1203 | } | 1191 | } |
| 1204 | drop(pending); | 1192 | drop(pending); |
| 1205 | self.confirm_batch(&relay_url_for_retry, completed_batch) | 1193 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 1206 | .await; | 1194 | .await; |
| 1207 | // For generic filter (announcement) batches, recompute filters | ||
| 1208 | // so any purgatory repos registered during this batch get | ||
| 1209 | // state-only subscriptions triggered. | ||
| 1210 | if is_generic { | ||
| 1211 | self.recompute_new_sync_filters_for_relay( | ||
| 1212 | &relay_url_for_retry, | ||
| 1213 | ) | ||
| 1214 | .await; | ||
| 1215 | } | ||
| 1216 | } | 1195 | } |
| 1217 | } | 1196 | } |
| 1218 | return; | 1197 | return; |
| @@ -1223,8 +1202,6 @@ impl SyncManager { | |||
| 1223 | 1202 | ||
| 1224 | // 3. Batch complete - extract and remove | 1203 | // 3. Batch complete - extract and remove |
| 1225 | let completed_batch = batches.remove(batch_idx); | 1204 | let completed_batch = batches.remove(batch_idx); |
| 1226 | let is_generic = completed_batch.items.repos.is_empty() | ||
| 1227 | && completed_batch.items.root_events.is_empty(); | ||
| 1228 | 1205 | ||
| 1229 | // Clean up empty relay entry | 1206 | // Clean up empty relay entry |
| 1230 | if batches.is_empty() { | 1207 | if batches.is_empty() { |
| @@ -1236,12 +1213,6 @@ impl SyncManager { | |||
| 1236 | 1213 | ||
| 1237 | // 4. Confirm the batch (moves items to RelayState) | 1214 | // 4. Confirm the batch (moves items to RelayState) |
| 1238 | self.confirm_batch(relay_url, completed_batch).await; | 1215 | self.confirm_batch(relay_url, completed_batch).await; |
| 1239 | |||
| 1240 | // 5. For generic filter (announcement) batches, recompute sync filters so any | ||
| 1241 | // purgatory repos registered during this batch get state-only subscriptions triggered. | ||
| 1242 | if is_generic { | ||
| 1243 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1244 | } | ||
| 1245 | } | 1216 | } |
| 1246 | 1217 | ||
| 1247 | /// Confirm a completed batch by moving items to RelayState | 1218 | /// Confirm a completed batch by moving items to RelayState |
| @@ -1370,7 +1341,7 @@ impl SyncManager { | |||
| 1370 | /// to be batched and create Layer 2/3 filters before we mark sync complete. | 1341 | /// to be batched and create Layer 2/3 filters before we mark sync complete. |
| 1371 | /// | 1342 | /// |
| 1372 | /// The 6-second delay is based on: | 1343 | /// The 6-second delay is based on: |
| 1373 | /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) | 1344 | /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) |
| 1374 | /// - Buffer for processing: 1 second | 1345 | /// - Buffer for processing: 1 second |
| 1375 | /// | 1346 | /// |
| 1376 | /// Called after each batch is confirmed to detect completion. | 1347 | /// Called after each batch is confirmed to detect completion. |
| @@ -1785,7 +1756,6 @@ impl SyncManager { | |||
| 1785 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | 1756 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); |
| 1786 | let metrics_clone = self.metrics.clone(); | 1757 | let metrics_clone = self.metrics.clone(); |
| 1787 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | 1758 | let pending_sync_index = Arc::clone(&self.pending_sync_index); |
| 1788 | let repo_sync_index = Arc::clone(&self.repo_sync_index); | ||
| 1789 | let health_tracker = Arc::clone(&self.health_tracker); | 1759 | let health_tracker = Arc::clone(&self.health_tracker); |
| 1790 | let rejected_events_index = Arc::clone(&self.rejected_events_index); | 1760 | let rejected_events_index = Arc::clone(&self.rejected_events_index); |
| 1791 | 1761 | ||
| @@ -1827,50 +1797,13 @@ impl SyncManager { | |||
| 1827 | 1797 | ||
| 1828 | // For sync-triggered events that go to purgatory, trigger immediate sync | 1798 | // For sync-triggered events that go to purgatory, trigger immediate sync |
| 1829 | // (instead of the default 3-minute delay for user-submitted events) | 1799 | // (instead of the default 3-minute delay for user-submitted events) |
| 1800 | // | ||
| 1801 | // Note: announcement events (kind 30617) are registered in repo_sync_index | ||
| 1802 | // by the purgatory announcement sync timer (run_purgatory_announcement_sync) | ||
| 1803 | // rather than inline here. | ||
| 1830 | if result == ProcessResult::Purgatory { | 1804 | if result == ProcessResult::Purgatory { |
| 1831 | // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly | ||
| 1832 | // so that state events (kind 30618) are synced for this purgatory announcement | ||
| 1833 | if event.kind == Kind::GitRepoAnnouncement { | ||
| 1834 | if let Some(identifier) = event.tags.iter().find_map(|tag| { | ||
| 1835 | let tag_vec = tag.as_slice(); | ||
| 1836 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { | ||
| 1837 | Some(tag_vec[1].to_string()) | ||
| 1838 | } else { | ||
| 1839 | None | ||
| 1840 | } | ||
| 1841 | }) { | ||
| 1842 | let repo_id = format!("30617:{}:{}", event.pubkey, identifier); | ||
| 1843 | |||
| 1844 | // Extract relay URLs from the purgatory entry | ||
| 1845 | let relays = write_policy | ||
| 1846 | .purgatory() | ||
| 1847 | .find_announcement(&event.pubkey, &identifier) | ||
| 1848 | .map(|entry| entry.relays) | ||
| 1849 | .unwrap_or_default(); | ||
| 1850 | |||
| 1851 | tracing::info!( | ||
| 1852 | event_id = %event.id, | ||
| 1853 | repo_id = %repo_id, | ||
| 1854 | relay_count = relays.len(), | ||
| 1855 | "Registering purgatory announcement in RepoSyncIndex with StateOnly level" | ||
| 1856 | ); | ||
| 1857 | |||
| 1858 | // Register in RepoSyncIndex with StateOnly level | ||
| 1859 | let mut index = repo_sync_index.write().await; | ||
| 1860 | let entry = index | ||
| 1861 | .entry(repo_id) | ||
| 1862 | .or_insert_with(|| RepoSyncNeeds { | ||
| 1863 | relays: HashSet::new(), | ||
| 1864 | root_events: HashSet::new(), | ||
| 1865 | sync_level: SyncLevel::StateOnly, | ||
| 1866 | }); | ||
| 1867 | entry.relays.extend(relays); | ||
| 1868 | // Don't upgrade sync_level if already Full | ||
| 1869 | // (e.g., if announcement was promoted before this runs) | ||
| 1870 | } | ||
| 1871 | } | ||
| 1872 | // State events (kind 30618) - extract identifier and trigger immediate sync | 1805 | // State events (kind 30618) - extract identifier and trigger immediate sync |
| 1873 | else if event.kind.as_u16() == 30618 { | 1806 | if event.kind.as_u16() == 30618 { |
| 1874 | if let Some(identifier) = event.tags.iter().find_map(|tag| { | 1807 | if let Some(identifier) = event.tags.iter().find_map(|tag| { |
| 1875 | let tag_vec = tag.clone().to_vec(); | 1808 | let tag_vec = tag.clone().to_vec(); |
| 1876 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { | 1809 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -126,14 +126,14 @@ impl SelfSubscriber { | |||
| 126 | 126 | ||
| 127 | /// Get batch window from environment or use default | 127 | /// Get batch window from environment or use default |
| 128 | /// | 128 | /// |
| 129 | /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. | 129 | /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. |
| 130 | /// Default: 5000ms (5 seconds) | 130 | /// Default: 5000ms (5 seconds) |
| 131 | fn get_batch_window() -> Duration { | 131 | fn get_batch_window() -> Duration { |
| 132 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | 132 | if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 133 | .ok() | 133 | Duration::from_millis(200) |
| 134 | .and_then(|s| s.parse::<u64>().ok()) | 134 | } else { |
| 135 | .map(Duration::from_millis) | 135 | Duration::from_millis(5000) |
| 136 | .unwrap_or(Duration::from_millis(5000)) | 136 | } |
| 137 | } | 137 | } |
| 138 | 138 | ||
| 139 | /// Process a relay pool notification | 139 | /// Process a relay pool notification |
diff --git a/tests/common/relay.rs b/tests/common/relay.rs index 0ec9a2e..b1e96cf 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs | |||
| @@ -204,7 +204,7 @@ impl TestRelay { | |||
| 204 | .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) | 204 | .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) |
| 205 | .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation | 205 | .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation |
| 206 | .env("NGIT_OWNER_NPUB", &test_npub) | 206 | .env("NGIT_OWNER_NPUB", &test_npub) |
| 207 | .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) | 207 | .env("NGIT_TEST", "1") // Enable test mode: fast timers (200ms batch window, 200ms purgatory sync) |
| 208 | .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests | 208 | .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests |
| 209 | .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests | 209 | .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests |
| 210 | .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests | 210 | .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests |
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 61d8e14..ff1eb43 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs | |||
| @@ -377,7 +377,7 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 377 | println!("relay_b started at {}", relay_b.url()); | 377 | println!("relay_b started at {}", relay_b.url()); |
| 378 | 378 | ||
| 379 | // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. | 379 | // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. |
| 380 | // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we | 380 | // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we |
| 381 | // allow extra time for slow CI environments. | 381 | // allow extra time for slow CI environments. |
| 382 | tokio::time::sleep(Duration::from_secs(3)).await; | 382 | tokio::time::sleep(Duration::from_secs(3)).await; |
| 383 | println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); | 383 | println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); |