upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 12:48:26 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 12:48:26 +0000
commitf62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 (patch)
treeea457689e8507ae01cbf99fa4c210763f836c7ac
parent49401286ea7413f834197e6a5b221649e10e2ad8 (diff)
refactor: replace inline purgatory sync registration with timer-only approach
Remove the redundant inline kind-30617 registration block from the sync event loop and the three is_generic/recompute_new_sync_filters_for_relay calls from confirm_batch error paths. The purgatory announcement sync timer (run_purgatory_announcement_sync) is now the sole registration path. Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing two ad-hoc env vars with one reusable test-mode flag.
-rw-r--r--src/sync/mod.rs103
-rw-r--r--src/sync/self_subscriber.rs12
-rw-r--r--tests/common/relay.rs2
-rw-r--r--tests/sync/maintainer_reprocessing.rs2
4 files changed, 26 insertions, 93 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index ed5b6e7..44efbf0 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -399,21 +399,24 @@ async fn run_daily_timer(
399 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index. 400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401/// 401///
402/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there 402/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`).
403/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` 403/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in
404/// which connects to the relay URLs listed in the announcement and subscribes to state 404/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the
405/// events (kind 30618). 405/// relay URLs listed in the announcement and subscribes to state events (kind 30618).
406/// 406///
407/// This covers two cases: 407/// This is the sole registration path for purgatory announcements:
408/// - Sync-path announcements: registered inline during event processing, but this 408/// - Sync-path announcements: registered here within one interval of arriving.
409/// provides a safety net in case the inline registration was missed.
410/// - User-submitted purgatory announcements: the SelfSubscriber never sees them 409/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
411/// (they're rejected from DB), so this timer is the primary registration path. 410/// (they're rejected from DB), so this timer is the only registration path.
412async fn run_purgatory_announcement_sync( 411async fn run_purgatory_announcement_sync(
413 sync_manager: Arc<Mutex<SyncManager>>, 412 sync_manager: Arc<Mutex<SyncManager>>,
414 mut shutdown_rx: broadcast::Receiver<()>, 413 mut shutdown_rx: broadcast::Receiver<()>,
415) { 414) {
416 let interval = Duration::from_secs(5); 415 let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
416 Duration::from_millis(200)
417 } else {
418 Duration::from_secs(5)
419 };
417 loop { 420 loop {
418 tokio::select! { 421 tokio::select! {
419 _ = tokio::time::sleep(interval) => { 422 _ = tokio::time::sleep(interval) => {
@@ -1084,24 +1087,12 @@ impl SyncManager {
1084 { 1087 {
1085 let mut completed_batch = batches.remove(idx); 1088 let mut completed_batch = batches.remove(idx);
1086 completed_batch.failed = true; // Mark as failed 1089 completed_batch.failed = true; // Mark as failed
1087 let is_generic =
1088 completed_batch.items.repos.is_empty()
1089 && completed_batch.items.root_events.is_empty();
1090 if batches.is_empty() { 1090 if batches.is_empty() {
1091 pending.remove(&relay_url_for_fallback); 1091 pending.remove(&relay_url_for_fallback);
1092 } 1092 }
1093 drop(pending); 1093 drop(pending);
1094 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1094 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1095 .await; 1095 .await;
1096 // For generic filter (announcement) batches, recompute filters
1097 // so any purgatory repos registered during this batch get
1098 // state-only subscriptions triggered.
1099 if is_generic {
1100 self.recompute_new_sync_filters_for_relay(
1101 &relay_url_for_fallback,
1102 )
1103 .await;
1104 }
1105 } 1096 }
1106 } 1097 }
1107 return; 1098 return;
@@ -1195,24 +1186,12 @@ impl SyncManager {
1195 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 1186 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
1196 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { 1187 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
1197 let completed_batch = batches.remove(idx); 1188 let completed_batch = batches.remove(idx);
1198 let is_generic =
1199 completed_batch.items.repos.is_empty()
1200 && completed_batch.items.root_events.is_empty();
1201 if batches.is_empty() { 1189 if batches.is_empty() {
1202 pending.remove(&relay_url_for_retry); 1190 pending.remove(&relay_url_for_retry);
1203 } 1191 }
1204 drop(pending); 1192 drop(pending);
1205 self.confirm_batch(&relay_url_for_retry, completed_batch) 1193 self.confirm_batch(&relay_url_for_retry, completed_batch)
1206 .await; 1194 .await;
1207 // For generic filter (announcement) batches, recompute filters
1208 // so any purgatory repos registered during this batch get
1209 // state-only subscriptions triggered.
1210 if is_generic {
1211 self.recompute_new_sync_filters_for_relay(
1212 &relay_url_for_retry,
1213 )
1214 .await;
1215 }
1216 } 1195 }
1217 } 1196 }
1218 return; 1197 return;
@@ -1223,8 +1202,6 @@ impl SyncManager {
1223 1202
1224 // 3. Batch complete - extract and remove 1203 // 3. Batch complete - extract and remove
1225 let completed_batch = batches.remove(batch_idx); 1204 let completed_batch = batches.remove(batch_idx);
1226 let is_generic = completed_batch.items.repos.is_empty()
1227 && completed_batch.items.root_events.is_empty();
1228 1205
1229 // Clean up empty relay entry 1206 // Clean up empty relay entry
1230 if batches.is_empty() { 1207 if batches.is_empty() {
@@ -1236,12 +1213,6 @@ impl SyncManager {
1236 1213
1237 // 4. Confirm the batch (moves items to RelayState) 1214 // 4. Confirm the batch (moves items to RelayState)
1238 self.confirm_batch(relay_url, completed_batch).await; 1215 self.confirm_batch(relay_url, completed_batch).await;
1239
1240 // 5. For generic filter (announcement) batches, recompute sync filters so any
1241 // purgatory repos registered during this batch get state-only subscriptions triggered.
1242 if is_generic {
1243 self.recompute_new_sync_filters_for_relay(relay_url).await;
1244 }
1245 } 1216 }
1246 1217
1247 /// Confirm a completed batch by moving items to RelayState 1218 /// Confirm a completed batch by moving items to RelayState
@@ -1370,7 +1341,7 @@ impl SyncManager {
1370 /// to be batched and create Layer 2/3 filters before we mark sync complete. 1341 /// to be batched and create Layer 2/3 filters before we mark sync complete.
1371 /// 1342 ///
1372 /// The 6-second delay is based on: 1343 /// The 6-second delay is based on:
1373 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) 1344 /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`)
1374 /// - Buffer for processing: 1 second 1345 /// - Buffer for processing: 1 second
1375 /// 1346 ///
1376 /// Called after each batch is confirmed to detect completion. 1347 /// Called after each batch is confirmed to detect completion.
@@ -1785,7 +1756,6 @@ impl SyncManager {
1785 let eose_tx = self.eose_tx.as_ref().unwrap().clone(); 1756 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
1786 let metrics_clone = self.metrics.clone(); 1757 let metrics_clone = self.metrics.clone();
1787 let pending_sync_index = Arc::clone(&self.pending_sync_index); 1758 let pending_sync_index = Arc::clone(&self.pending_sync_index);
1788 let repo_sync_index = Arc::clone(&self.repo_sync_index);
1789 let health_tracker = Arc::clone(&self.health_tracker); 1759 let health_tracker = Arc::clone(&self.health_tracker);
1790 let rejected_events_index = Arc::clone(&self.rejected_events_index); 1760 let rejected_events_index = Arc::clone(&self.rejected_events_index);
1791 1761
@@ -1827,50 +1797,13 @@ impl SyncManager {
1827 1797
1828 // For sync-triggered events that go to purgatory, trigger immediate sync 1798 // For sync-triggered events that go to purgatory, trigger immediate sync
1829 // (instead of the default 3-minute delay for user-submitted events) 1799 // (instead of the default 3-minute delay for user-submitted events)
1800 //
1801 // Note: announcement events (kind 30617) are registered in repo_sync_index
1802 // by the purgatory announcement sync timer (run_purgatory_announcement_sync)
1803 // rather than inline here.
1830 if result == ProcessResult::Purgatory { 1804 if result == ProcessResult::Purgatory {
1831 // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly
1832 // so that state events (kind 30618) are synced for this purgatory announcement
1833 if event.kind == Kind::GitRepoAnnouncement {
1834 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1835 let tag_vec = tag.as_slice();
1836 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
1837 Some(tag_vec[1].to_string())
1838 } else {
1839 None
1840 }
1841 }) {
1842 let repo_id = format!("30617:{}:{}", event.pubkey, identifier);
1843
1844 // Extract relay URLs from the purgatory entry
1845 let relays = write_policy
1846 .purgatory()
1847 .find_announcement(&event.pubkey, &identifier)
1848 .map(|entry| entry.relays)
1849 .unwrap_or_default();
1850
1851 tracing::info!(
1852 event_id = %event.id,
1853 repo_id = %repo_id,
1854 relay_count = relays.len(),
1855 "Registering purgatory announcement in RepoSyncIndex with StateOnly level"
1856 );
1857
1858 // Register in RepoSyncIndex with StateOnly level
1859 let mut index = repo_sync_index.write().await;
1860 let entry = index
1861 .entry(repo_id)
1862 .or_insert_with(|| RepoSyncNeeds {
1863 relays: HashSet::new(),
1864 root_events: HashSet::new(),
1865 sync_level: SyncLevel::StateOnly,
1866 });
1867 entry.relays.extend(relays);
1868 // Don't upgrade sync_level if already Full
1869 // (e.g., if announcement was promoted before this runs)
1870 }
1871 }
1872 // State events (kind 30618) - extract identifier and trigger immediate sync 1805 // State events (kind 30618) - extract identifier and trigger immediate sync
1873 else if event.kind.as_u16() == 30618 { 1806 if event.kind.as_u16() == 30618 {
1874 if let Some(identifier) = event.tags.iter().find_map(|tag| { 1807 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1875 let tag_vec = tag.clone().to_vec(); 1808 let tag_vec = tag.clone().to_vec();
1876 if tag_vec.len() >= 2 && tag_vec[0] == "d" { 1809 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 70c3dbf..ab10c49 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -126,14 +126,14 @@ impl SelfSubscriber {
126 126
127 /// Get batch window from environment or use default 127 /// Get batch window from environment or use default
128 /// 128 ///
129 /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 129 /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution.
130 /// Default: 5000ms (5 seconds) 130 /// Default: 5000ms (5 seconds)
131 fn get_batch_window() -> Duration { 131 fn get_batch_window() -> Duration {
132 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") 132 if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
133 .ok() 133 Duration::from_millis(200)
134 .and_then(|s| s.parse::<u64>().ok()) 134 } else {
135 .map(Duration::from_millis) 135 Duration::from_millis(5000)
136 .unwrap_or(Duration::from_millis(5000)) 136 }
137 } 137 }
138 138
139 /// Process a relay pool notification 139 /// Process a relay pool notification
diff --git a/tests/common/relay.rs b/tests/common/relay.rs
index 0ec9a2e..b1e96cf 100644
--- a/tests/common/relay.rs
+++ b/tests/common/relay.rs
@@ -204,7 +204,7 @@ impl TestRelay {
204 .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) 204 .env("NGIT_GIT_DATA_PATH", git_data_dir.path())
205 .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation 205 .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation
206 .env("NGIT_OWNER_NPUB", &test_npub) 206 .env("NGIT_OWNER_NPUB", &test_npub)
207 .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) 207 .env("NGIT_TEST", "1") // Enable test mode: fast timers (200ms batch window, 200ms purgatory sync)
208 .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests 208 .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests
209 .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests 209 .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests
210 .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests 210 .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs
index 61d8e14..ff1eb43 100644
--- a/tests/sync/maintainer_reprocessing.rs
+++ b/tests/sync/maintainer_reprocessing.rs
@@ -377,7 +377,7 @@ async fn test_multiple_maintainers_all_reprocessed() {
377 println!("relay_b started at {}", relay_b.url()); 377 println!("relay_b started at {}", relay_b.url());
378 378
379 // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. 379 // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a.
380 // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we 380 // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we
381 // allow extra time for slow CI environments. 381 // allow extra time for slow CI environments.
382 tokio::time::sleep(Duration::from_secs(3)).await; 382 tokio::time::sleep(Duration::from_secs(3)).await;
383 println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); 383 println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)");