upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs103
-rw-r--r--src/sync/self_subscriber.rs12
2 files changed, 24 insertions, 91 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index ed5b6e7..44efbf0 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -399,21 +399,24 @@ async fn run_daily_timer(
399 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index. 400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401/// 401///
402/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there 402/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`).
403/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` 403/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in
404/// which connects to the relay URLs listed in the announcement and subscribes to state 404/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the
405/// events (kind 30618). 405/// relay URLs listed in the announcement and subscribes to state events (kind 30618).
406/// 406///
407/// This covers two cases: 407/// This is the sole registration path for purgatory announcements:
408/// - Sync-path announcements: registered inline during event processing, but this 408/// - Sync-path announcements: registered here within one interval of arriving.
409/// provides a safety net in case the inline registration was missed.
410/// - User-submitted purgatory announcements: the SelfSubscriber never sees them 409/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
411/// (they're rejected from DB), so this timer is the primary registration path. 410/// (they're rejected from DB), so this timer is the only registration path.
412async fn run_purgatory_announcement_sync( 411async fn run_purgatory_announcement_sync(
413 sync_manager: Arc<Mutex<SyncManager>>, 412 sync_manager: Arc<Mutex<SyncManager>>,
414 mut shutdown_rx: broadcast::Receiver<()>, 413 mut shutdown_rx: broadcast::Receiver<()>,
415) { 414) {
416 let interval = Duration::from_secs(5); 415 let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
416 Duration::from_millis(200)
417 } else {
418 Duration::from_secs(5)
419 };
417 loop { 420 loop {
418 tokio::select! { 421 tokio::select! {
419 _ = tokio::time::sleep(interval) => { 422 _ = tokio::time::sleep(interval) => {
@@ -1084,24 +1087,12 @@ impl SyncManager {
1084 { 1087 {
1085 let mut completed_batch = batches.remove(idx); 1088 let mut completed_batch = batches.remove(idx);
1086 completed_batch.failed = true; // Mark as failed 1089 completed_batch.failed = true; // Mark as failed
1087 let is_generic =
1088 completed_batch.items.repos.is_empty()
1089 && completed_batch.items.root_events.is_empty();
1090 if batches.is_empty() { 1090 if batches.is_empty() {
1091 pending.remove(&relay_url_for_fallback); 1091 pending.remove(&relay_url_for_fallback);
1092 } 1092 }
1093 drop(pending); 1093 drop(pending);
1094 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1094 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1095 .await; 1095 .await;
1096 // For generic filter (announcement) batches, recompute filters
1097 // so any purgatory repos registered during this batch get
1098 // state-only subscriptions triggered.
1099 if is_generic {
1100 self.recompute_new_sync_filters_for_relay(
1101 &relay_url_for_fallback,
1102 )
1103 .await;
1104 }
1105 } 1096 }
1106 } 1097 }
1107 return; 1098 return;
@@ -1195,24 +1186,12 @@ impl SyncManager {
1195 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 1186 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
1196 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { 1187 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
1197 let completed_batch = batches.remove(idx); 1188 let completed_batch = batches.remove(idx);
1198 let is_generic =
1199 completed_batch.items.repos.is_empty()
1200 && completed_batch.items.root_events.is_empty();
1201 if batches.is_empty() { 1189 if batches.is_empty() {
1202 pending.remove(&relay_url_for_retry); 1190 pending.remove(&relay_url_for_retry);
1203 } 1191 }
1204 drop(pending); 1192 drop(pending);
1205 self.confirm_batch(&relay_url_for_retry, completed_batch) 1193 self.confirm_batch(&relay_url_for_retry, completed_batch)
1206 .await; 1194 .await;
1207 // For generic filter (announcement) batches, recompute filters
1208 // so any purgatory repos registered during this batch get
1209 // state-only subscriptions triggered.
1210 if is_generic {
1211 self.recompute_new_sync_filters_for_relay(
1212 &relay_url_for_retry,
1213 )
1214 .await;
1215 }
1216 } 1195 }
1217 } 1196 }
1218 return; 1197 return;
@@ -1223,8 +1202,6 @@ impl SyncManager {
1223 1202
1224 // 3. Batch complete - extract and remove 1203 // 3. Batch complete - extract and remove
1225 let completed_batch = batches.remove(batch_idx); 1204 let completed_batch = batches.remove(batch_idx);
1226 let is_generic = completed_batch.items.repos.is_empty()
1227 && completed_batch.items.root_events.is_empty();
1228 1205
1229 // Clean up empty relay entry 1206 // Clean up empty relay entry
1230 if batches.is_empty() { 1207 if batches.is_empty() {
@@ -1236,12 +1213,6 @@ impl SyncManager {
1236 1213
1237 // 4. Confirm the batch (moves items to RelayState) 1214 // 4. Confirm the batch (moves items to RelayState)
1238 self.confirm_batch(relay_url, completed_batch).await; 1215 self.confirm_batch(relay_url, completed_batch).await;
1239
1240 // 5. For generic filter (announcement) batches, recompute sync filters so any
1241 // purgatory repos registered during this batch get state-only subscriptions triggered.
1242 if is_generic {
1243 self.recompute_new_sync_filters_for_relay(relay_url).await;
1244 }
1245 } 1216 }
1246 1217
1247 /// Confirm a completed batch by moving items to RelayState 1218 /// Confirm a completed batch by moving items to RelayState
@@ -1370,7 +1341,7 @@ impl SyncManager {
1370 /// to be batched and create Layer 2/3 filters before we mark sync complete. 1341 /// to be batched and create Layer 2/3 filters before we mark sync complete.
1371 /// 1342 ///
1372 /// The 6-second delay is based on: 1343 /// The 6-second delay is based on:
1373 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) 1344 /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`)
1374 /// - Buffer for processing: 1 second 1345 /// - Buffer for processing: 1 second
1375 /// 1346 ///
1376 /// Called after each batch is confirmed to detect completion. 1347 /// Called after each batch is confirmed to detect completion.
@@ -1785,7 +1756,6 @@ impl SyncManager {
1785 let eose_tx = self.eose_tx.as_ref().unwrap().clone(); 1756 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
1786 let metrics_clone = self.metrics.clone(); 1757 let metrics_clone = self.metrics.clone();
1787 let pending_sync_index = Arc::clone(&self.pending_sync_index); 1758 let pending_sync_index = Arc::clone(&self.pending_sync_index);
1788 let repo_sync_index = Arc::clone(&self.repo_sync_index);
1789 let health_tracker = Arc::clone(&self.health_tracker); 1759 let health_tracker = Arc::clone(&self.health_tracker);
1790 let rejected_events_index = Arc::clone(&self.rejected_events_index); 1760 let rejected_events_index = Arc::clone(&self.rejected_events_index);
1791 1761
@@ -1827,50 +1797,13 @@ impl SyncManager {
1827 1797
1828 // For sync-triggered events that go to purgatory, trigger immediate sync 1798 // For sync-triggered events that go to purgatory, trigger immediate sync
1829 // (instead of the default 3-minute delay for user-submitted events) 1799 // (instead of the default 3-minute delay for user-submitted events)
1800 //
1801 // Note: announcement events (kind 30617) are registered in repo_sync_index
1802 // by the purgatory announcement sync timer (run_purgatory_announcement_sync)
1803 // rather than inline here.
1830 if result == ProcessResult::Purgatory { 1804 if result == ProcessResult::Purgatory {
1831 // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly
1832 // so that state events (kind 30618) are synced for this purgatory announcement
1833 if event.kind == Kind::GitRepoAnnouncement {
1834 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1835 let tag_vec = tag.as_slice();
1836 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
1837 Some(tag_vec[1].to_string())
1838 } else {
1839 None
1840 }
1841 }) {
1842 let repo_id = format!("30617:{}:{}", event.pubkey, identifier);
1843
1844 // Extract relay URLs from the purgatory entry
1845 let relays = write_policy
1846 .purgatory()
1847 .find_announcement(&event.pubkey, &identifier)
1848 .map(|entry| entry.relays)
1849 .unwrap_or_default();
1850
1851 tracing::info!(
1852 event_id = %event.id,
1853 repo_id = %repo_id,
1854 relay_count = relays.len(),
1855 "Registering purgatory announcement in RepoSyncIndex with StateOnly level"
1856 );
1857
1858 // Register in RepoSyncIndex with StateOnly level
1859 let mut index = repo_sync_index.write().await;
1860 let entry = index
1861 .entry(repo_id)
1862 .or_insert_with(|| RepoSyncNeeds {
1863 relays: HashSet::new(),
1864 root_events: HashSet::new(),
1865 sync_level: SyncLevel::StateOnly,
1866 });
1867 entry.relays.extend(relays);
1868 // Don't upgrade sync_level if already Full
1869 // (e.g., if announcement was promoted before this runs)
1870 }
1871 }
1872 // State events (kind 30618) - extract identifier and trigger immediate sync 1805 // State events (kind 30618) - extract identifier and trigger immediate sync
1873 else if event.kind.as_u16() == 30618 { 1806 if event.kind.as_u16() == 30618 {
1874 if let Some(identifier) = event.tags.iter().find_map(|tag| { 1807 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1875 let tag_vec = tag.clone().to_vec(); 1808 let tag_vec = tag.clone().to_vec();
1876 if tag_vec.len() >= 2 && tag_vec[0] == "d" { 1809 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 70c3dbf..ab10c49 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -126,14 +126,14 @@ impl SelfSubscriber {
126 126
127 /// Get batch window from environment or use default 127 /// Get batch window from environment or use default
128 /// 128 ///
129 /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 129 /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution.
130 /// Default: 5000ms (5 seconds) 130 /// Default: 5000ms (5 seconds)
131 fn get_batch_window() -> Duration { 131 fn get_batch_window() -> Duration {
132 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") 132 if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
133 .ok() 133 Duration::from_millis(200)
134 .and_then(|s| s.parse::<u64>().ok()) 134 } else {
135 .map(Duration::from_millis) 135 Duration::from_millis(5000)
136 .unwrap_or(Duration::from_millis(5000)) 136 }
137 } 137 }
138 138
139 /// Process a relay pool notification 139 /// Process a relay pool notification