diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 103 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 12 |
2 files changed, 24 insertions, 91 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -399,21 +399,24 @@ async fn run_daily_timer( | |||
| 399 | 399 | ||
| 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. | 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. |
| 401 | /// | 401 | /// |
| 402 | /// Runs every 5 seconds. For each announcement currently in purgatory, ensures there | 402 | /// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). |
| 403 | /// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` | 403 | /// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in |
| 404 | /// which connects to the relay URLs listed in the announcement and subscribes to state | 404 | /// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the |
| 405 | /// events (kind 30618). | 405 | /// relay URLs listed in the announcement and subscribes to state events (kind 30618). |
| 406 | /// | 406 | /// |
| 407 | /// This covers two cases: | 407 | /// This is the sole registration path for purgatory announcements: |
| 408 | /// - Sync-path announcements: registered inline during event processing, but this | 408 | /// - Sync-path announcements: registered here within one interval of arriving. |
| 409 | /// provides a safety net in case the inline registration was missed. | ||
| 410 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them | 409 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them |
| 411 | /// (they're rejected from DB), so this timer is the primary registration path. | 410 | /// (they're rejected from DB), so this timer is the only registration path. |
| 412 | async fn run_purgatory_announcement_sync( | 411 | async fn run_purgatory_announcement_sync( |
| 413 | sync_manager: Arc<Mutex<SyncManager>>, | 412 | sync_manager: Arc<Mutex<SyncManager>>, |
| 414 | mut shutdown_rx: broadcast::Receiver<()>, | 413 | mut shutdown_rx: broadcast::Receiver<()>, |
| 415 | ) { | 414 | ) { |
| 416 | let interval = Duration::from_secs(5); | 415 | let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 416 | Duration::from_millis(200) | ||
| 417 | } else { | ||
| 418 | Duration::from_secs(5) | ||
| 419 | }; | ||
| 417 | loop { | 420 | loop { |
| 418 | tokio::select! { | 421 | tokio::select! { |
| 419 | _ = tokio::time::sleep(interval) => { | 422 | _ = tokio::time::sleep(interval) => { |
| @@ -1084,24 +1087,12 @@ impl SyncManager { | |||
| 1084 | { | 1087 | { |
| 1085 | let mut completed_batch = batches.remove(idx); | 1088 | let mut completed_batch = batches.remove(idx); |
| 1086 | completed_batch.failed = true; // Mark as failed | 1089 | completed_batch.failed = true; // Mark as failed |
| 1087 | let is_generic = | ||
| 1088 | completed_batch.items.repos.is_empty() | ||
| 1089 | && completed_batch.items.root_events.is_empty(); | ||
| 1090 | if batches.is_empty() { | 1090 | if batches.is_empty() { |
| 1091 | pending.remove(&relay_url_for_fallback); | 1091 | pending.remove(&relay_url_for_fallback); |
| 1092 | } | 1092 | } |
| 1093 | drop(pending); | 1093 | drop(pending); |
| 1094 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | 1094 | self.confirm_batch(&relay_url_for_fallback, completed_batch) |
| 1095 | .await; | 1095 | .await; |
| 1096 | // For generic filter (announcement) batches, recompute filters | ||
| 1097 | // so any purgatory repos registered during this batch get | ||
| 1098 | // state-only subscriptions triggered. | ||
| 1099 | if is_generic { | ||
| 1100 | self.recompute_new_sync_filters_for_relay( | ||
| 1101 | &relay_url_for_fallback, | ||
| 1102 | ) | ||
| 1103 | .await; | ||
| 1104 | } | ||
| 1105 | } | 1096 | } |
| 1106 | } | 1097 | } |
| 1107 | return; | 1098 | return; |
| @@ -1195,24 +1186,12 @@ impl SyncManager { | |||
| 1195 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 1186 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 1196 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { | 1187 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { |
| 1197 | let completed_batch = batches.remove(idx); | 1188 | let completed_batch = batches.remove(idx); |
| 1198 | let is_generic = | ||
| 1199 | completed_batch.items.repos.is_empty() | ||
| 1200 | && completed_batch.items.root_events.is_empty(); | ||
| 1201 | if batches.is_empty() { | 1189 | if batches.is_empty() { |
| 1202 | pending.remove(&relay_url_for_retry); | 1190 | pending.remove(&relay_url_for_retry); |
| 1203 | } | 1191 | } |
| 1204 | drop(pending); | 1192 | drop(pending); |
| 1205 | self.confirm_batch(&relay_url_for_retry, completed_batch) | 1193 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 1206 | .await; | 1194 | .await; |
| 1207 | // For generic filter (announcement) batches, recompute filters | ||
| 1208 | // so any purgatory repos registered during this batch get | ||
| 1209 | // state-only subscriptions triggered. | ||
| 1210 | if is_generic { | ||
| 1211 | self.recompute_new_sync_filters_for_relay( | ||
| 1212 | &relay_url_for_retry, | ||
| 1213 | ) | ||
| 1214 | .await; | ||
| 1215 | } | ||
| 1216 | } | 1195 | } |
| 1217 | } | 1196 | } |
| 1218 | return; | 1197 | return; |
| @@ -1223,8 +1202,6 @@ impl SyncManager { | |||
| 1223 | 1202 | ||
| 1224 | // 3. Batch complete - extract and remove | 1203 | // 3. Batch complete - extract and remove |
| 1225 | let completed_batch = batches.remove(batch_idx); | 1204 | let completed_batch = batches.remove(batch_idx); |
| 1226 | let is_generic = completed_batch.items.repos.is_empty() | ||
| 1227 | && completed_batch.items.root_events.is_empty(); | ||
| 1228 | 1205 | ||
| 1229 | // Clean up empty relay entry | 1206 | // Clean up empty relay entry |
| 1230 | if batches.is_empty() { | 1207 | if batches.is_empty() { |
| @@ -1236,12 +1213,6 @@ impl SyncManager { | |||
| 1236 | 1213 | ||
| 1237 | // 4. Confirm the batch (moves items to RelayState) | 1214 | // 4. Confirm the batch (moves items to RelayState) |
| 1238 | self.confirm_batch(relay_url, completed_batch).await; | 1215 | self.confirm_batch(relay_url, completed_batch).await; |
| 1239 | |||
| 1240 | // 5. For generic filter (announcement) batches, recompute sync filters so any | ||
| 1241 | // purgatory repos registered during this batch get state-only subscriptions triggered. | ||
| 1242 | if is_generic { | ||
| 1243 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1244 | } | ||
| 1245 | } | 1216 | } |
| 1246 | 1217 | ||
| 1247 | /// Confirm a completed batch by moving items to RelayState | 1218 | /// Confirm a completed batch by moving items to RelayState |
| @@ -1370,7 +1341,7 @@ impl SyncManager { | |||
| 1370 | /// to be batched and create Layer 2/3 filters before we mark sync complete. | 1341 | /// to be batched and create Layer 2/3 filters before we mark sync complete. |
| 1371 | /// | 1342 | /// |
| 1372 | /// The 6-second delay is based on: | 1343 | /// The 6-second delay is based on: |
| 1373 | /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) | 1344 | /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) |
| 1374 | /// - Buffer for processing: 1 second | 1345 | /// - Buffer for processing: 1 second |
| 1375 | /// | 1346 | /// |
| 1376 | /// Called after each batch is confirmed to detect completion. | 1347 | /// Called after each batch is confirmed to detect completion. |
| @@ -1785,7 +1756,6 @@ impl SyncManager { | |||
| 1785 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | 1756 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); |
| 1786 | let metrics_clone = self.metrics.clone(); | 1757 | let metrics_clone = self.metrics.clone(); |
| 1787 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | 1758 | let pending_sync_index = Arc::clone(&self.pending_sync_index); |
| 1788 | let repo_sync_index = Arc::clone(&self.repo_sync_index); | ||
| 1789 | let health_tracker = Arc::clone(&self.health_tracker); | 1759 | let health_tracker = Arc::clone(&self.health_tracker); |
| 1790 | let rejected_events_index = Arc::clone(&self.rejected_events_index); | 1760 | let rejected_events_index = Arc::clone(&self.rejected_events_index); |
| 1791 | 1761 | ||
| @@ -1827,50 +1797,13 @@ impl SyncManager { | |||
| 1827 | 1797 | ||
| 1828 | // For sync-triggered events that go to purgatory, trigger immediate sync | 1798 | // For sync-triggered events that go to purgatory, trigger immediate sync |
| 1829 | // (instead of the default 3-minute delay for user-submitted events) | 1799 | // (instead of the default 3-minute delay for user-submitted events) |
| 1800 | // | ||
| 1801 | // Note: announcement events (kind 30617) are registered in repo_sync_index | ||
| 1802 | // by the purgatory announcement sync timer (run_purgatory_announcement_sync) | ||
| 1803 | // rather than inline here. | ||
| 1830 | if result == ProcessResult::Purgatory { | 1804 | if result == ProcessResult::Purgatory { |
| 1831 | // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly | ||
| 1832 | // so that state events (kind 30618) are synced for this purgatory announcement | ||
| 1833 | if event.kind == Kind::GitRepoAnnouncement { | ||
| 1834 | if let Some(identifier) = event.tags.iter().find_map(|tag| { | ||
| 1835 | let tag_vec = tag.as_slice(); | ||
| 1836 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { | ||
| 1837 | Some(tag_vec[1].to_string()) | ||
| 1838 | } else { | ||
| 1839 | None | ||
| 1840 | } | ||
| 1841 | }) { | ||
| 1842 | let repo_id = format!("30617:{}:{}", event.pubkey, identifier); | ||
| 1843 | |||
| 1844 | // Extract relay URLs from the purgatory entry | ||
| 1845 | let relays = write_policy | ||
| 1846 | .purgatory() | ||
| 1847 | .find_announcement(&event.pubkey, &identifier) | ||
| 1848 | .map(|entry| entry.relays) | ||
| 1849 | .unwrap_or_default(); | ||
| 1850 | |||
| 1851 | tracing::info!( | ||
| 1852 | event_id = %event.id, | ||
| 1853 | repo_id = %repo_id, | ||
| 1854 | relay_count = relays.len(), | ||
| 1855 | "Registering purgatory announcement in RepoSyncIndex with StateOnly level" | ||
| 1856 | ); | ||
| 1857 | |||
| 1858 | // Register in RepoSyncIndex with StateOnly level | ||
| 1859 | let mut index = repo_sync_index.write().await; | ||
| 1860 | let entry = index | ||
| 1861 | .entry(repo_id) | ||
| 1862 | .or_insert_with(|| RepoSyncNeeds { | ||
| 1863 | relays: HashSet::new(), | ||
| 1864 | root_events: HashSet::new(), | ||
| 1865 | sync_level: SyncLevel::StateOnly, | ||
| 1866 | }); | ||
| 1867 | entry.relays.extend(relays); | ||
| 1868 | // Don't upgrade sync_level if already Full | ||
| 1869 | // (e.g., if announcement was promoted before this runs) | ||
| 1870 | } | ||
| 1871 | } | ||
| 1872 | // State events (kind 30618) - extract identifier and trigger immediate sync | 1805 | // State events (kind 30618) - extract identifier and trigger immediate sync |
| 1873 | else if event.kind.as_u16() == 30618 { | 1806 | if event.kind.as_u16() == 30618 { |
| 1874 | if let Some(identifier) = event.tags.iter().find_map(|tag| { | 1807 | if let Some(identifier) = event.tags.iter().find_map(|tag| { |
| 1875 | let tag_vec = tag.clone().to_vec(); | 1808 | let tag_vec = tag.clone().to_vec(); |
| 1876 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { | 1809 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -126,14 +126,14 @@ impl SelfSubscriber { | |||
| 126 | 126 | ||
| 127 | /// Get batch window from environment or use default | 127 | /// Get batch window from environment or use default |
| 128 | /// | 128 | /// |
| 129 | /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. | 129 | /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. |
| 130 | /// Default: 5000ms (5 seconds) | 130 | /// Default: 5000ms (5 seconds) |
| 131 | fn get_batch_window() -> Duration { | 131 | fn get_batch_window() -> Duration { |
| 132 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | 132 | if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 133 | .ok() | 133 | Duration::from_millis(200) |
| 134 | .and_then(|s| s.parse::<u64>().ok()) | 134 | } else { |
| 135 | .map(Duration::from_millis) | 135 | Duration::from_millis(5000) |
| 136 | .unwrap_or(Duration::from_millis(5000)) | 136 | } |
| 137 | } | 137 | } |
| 138 | 138 | ||
| 139 | /// Process a relay pool notification | 139 | /// Process a relay pool notification |