diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/algorithms.rs | 58 | ||||
| -rw-r--r-- | src/sync/filters.rs | 31 | ||||
| -rw-r--r-- | src/sync/mod.rs | 167 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 34 |
4 files changed, 265 insertions, 25 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 39788bc..9899abc 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState}; | |||
| 25 | /// this repo need to sync from", it's "what repos does this relay need to sync". | 25 | /// this repo need to sync from", it's "what repos does this relay need to sync". |
| 26 | #[derive(Debug, Clone, Default)] | 26 | #[derive(Debug, Clone, Default)] |
| 27 | pub struct RelaySyncNeeds { | 27 | pub struct RelaySyncNeeds { |
| 28 | /// Repos that need to be synced from this relay | 28 | /// Repos that need full L2+L3 sync from this relay |
| 29 | pub repos: HashSet<String>, | 29 | pub repos: HashSet<String>, |
| 30 | /// Repos that only need state event sync (purgatory announcements) | ||
| 31 | pub state_only_repos: HashSet<String>, | ||
| 30 | /// Root events that need to be tracked from this relay | 32 | /// Root events that need to be tracked from this relay |
| 31 | pub root_events: HashSet<EventId>, | 33 | pub root_events: HashSet<EventId>, |
| 32 | } | 34 | } |
| @@ -67,8 +69,15 @@ pub fn derive_relay_targets( | |||
| 67 | for relay_url in &needs.relays { | 69 | for relay_url in &needs.relays { |
| 68 | let entry = relay_targets.entry(relay_url.clone()).or_default(); | 70 | let entry = relay_targets.entry(relay_url.clone()).or_default(); |
| 69 | 71 | ||
| 70 | entry.repos.insert(repo_id.clone()); | 72 | match needs.sync_level { |
| 71 | entry.root_events.extend(needs.root_events.iter().cloned()); | 73 | super::SyncLevel::Full => { |
| 74 | entry.repos.insert(repo_id.clone()); | ||
| 75 | entry.root_events.extend(needs.root_events.iter().cloned()); | ||
| 76 | } | ||
| 77 | super::SyncLevel::StateOnly => { | ||
| 78 | entry.state_only_repos.insert(repo_id.clone()); | ||
| 79 | } | ||
| 80 | } | ||
| 72 | } | 81 | } |
| 73 | } | 82 | } |
| 74 | 83 | ||
| @@ -96,7 +105,7 @@ pub fn compute_actions( | |||
| 96 | pending: &HashMap<String, Vec<PendingBatch>>, | 105 | pending: &HashMap<String, Vec<PendingBatch>>, |
| 97 | confirmed: &HashMap<String, RelayState>, | 106 | confirmed: &HashMap<String, RelayState>, |
| 98 | ) -> Vec<AddFilters> { | 107 | ) -> Vec<AddFilters> { |
| 99 | use crate::sync::filters::build_layer2_and_layer3_filters; | 108 | use crate::sync::filters::build_sync_level_aware_filters; |
| 100 | 109 | ||
| 101 | let mut actions = Vec::new(); | 110 | let mut actions = Vec::new(); |
| 102 | 111 | ||
| @@ -140,14 +149,22 @@ pub fn compute_actions( | |||
| 140 | .map(|state| state.root_events.clone()) | 149 | .map(|state| state.root_events.clone()) |
| 141 | .unwrap_or_default(); | 150 | .unwrap_or_default(); |
| 142 | 151 | ||
| 143 | // Calculate what's NEW (not in pending, not in confirmed) | 152 | // Calculate what's NEW for full repos (not in pending, not in confirmed) |
| 144 | let new_repos: HashSet<String> = target_needs | 153 | let new_full_repos: HashSet<String> = target_needs |
| 145 | .repos | 154 | .repos |
| 146 | .difference(&pending_repos) | 155 | .difference(&pending_repos) |
| 147 | .filter(|repo| !confirmed_repos.contains(*repo)) | 156 | .filter(|repo| !confirmed_repos.contains(*repo)) |
| 148 | .cloned() | 157 | .cloned() |
| 149 | .collect(); | 158 | .collect(); |
| 150 | 159 | ||
| 160 | // Calculate what's NEW for state-only repos | ||
| 161 | let new_state_only_repos: HashSet<String> = target_needs | ||
| 162 | .state_only_repos | ||
| 163 | .difference(&pending_repos) | ||
| 164 | .filter(|repo| !confirmed_repos.contains(*repo)) | ||
| 165 | .cloned() | ||
| 166 | .collect(); | ||
| 167 | |||
| 151 | let new_events: HashSet<EventId> = target_needs | 168 | let new_events: HashSet<EventId> = target_needs |
| 152 | .root_events | 169 | .root_events |
| 153 | .difference(&pending_events) | 170 | .difference(&pending_events) |
| @@ -156,13 +173,23 @@ pub fn compute_actions( | |||
| 156 | .collect(); | 173 | .collect(); |
| 157 | 174 | ||
| 158 | // If there's anything new, create an AddFilters action | 175 | // If there's anything new, create an AddFilters action |
| 159 | if !new_repos.is_empty() || !new_events.is_empty() { | 176 | if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty() |
| 160 | let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); | 177 | { |
| 178 | let filters = build_sync_level_aware_filters( | ||
| 179 | &new_full_repos, | ||
| 180 | &new_state_only_repos, | ||
| 181 | &new_events, | ||
| 182 | None, | ||
| 183 | ); | ||
| 184 | |||
| 185 | // Combine all repos into pending items (pending tracking doesn't need sync level) | ||
| 186 | let mut all_new_repos = new_full_repos; | ||
| 187 | all_new_repos.extend(new_state_only_repos); | ||
| 161 | 188 | ||
| 162 | actions.push(AddFilters { | 189 | actions.push(AddFilters { |
| 163 | relay_url: relay_url.clone(), | 190 | relay_url: relay_url.clone(), |
| 164 | items: PendingItems { | 191 | items: PendingItems { |
| 165 | repos: new_repos, | 192 | repos: all_new_repos, |
| 166 | root_events: new_events, | 193 | root_events: new_events, |
| 167 | }, | 194 | }, |
| 168 | filters, | 195 | filters, |
| @@ -204,6 +231,7 @@ mod tests { | |||
| 204 | ModRepoSyncNeeds { | 231 | ModRepoSyncNeeds { |
| 205 | relays, | 232 | relays, |
| 206 | root_events, | 233 | root_events, |
| 234 | sync_level: Default::default(), | ||
| 207 | }, | 235 | }, |
| 208 | ); | 236 | ); |
| 209 | 237 | ||
| @@ -229,6 +257,7 @@ mod tests { | |||
| 229 | ModRepoSyncNeeds { | 257 | ModRepoSyncNeeds { |
| 230 | relays, | 258 | relays, |
| 231 | root_events: HashSet::new(), | 259 | root_events: HashSet::new(), |
| 260 | sync_level: Default::default(), | ||
| 232 | }, | 261 | }, |
| 233 | ); | 262 | ); |
| 234 | } | 263 | } |
| @@ -252,6 +281,7 @@ mod tests { | |||
| 252 | ModRepoSyncNeeds { | 281 | ModRepoSyncNeeds { |
| 253 | relays, | 282 | relays, |
| 254 | root_events: HashSet::new(), | 283 | root_events: HashSet::new(), |
| 284 | sync_level: Default::default(), | ||
| 255 | }, | 285 | }, |
| 256 | ); | 286 | ); |
| 257 | 287 | ||
| @@ -285,6 +315,7 @@ mod tests { | |||
| 285 | ModRepoSyncNeeds { | 315 | ModRepoSyncNeeds { |
| 286 | relays: relays1, | 316 | relays: relays1, |
| 287 | root_events: root_events1, | 317 | root_events: root_events1, |
| 318 | sync_level: Default::default(), | ||
| 288 | }, | 319 | }, |
| 289 | ); | 320 | ); |
| 290 | 321 | ||
| @@ -299,6 +330,7 @@ mod tests { | |||
| 299 | ModRepoSyncNeeds { | 330 | ModRepoSyncNeeds { |
| 300 | relays: relays2, | 331 | relays: relays2, |
| 301 | root_events: root_events2, | 332 | root_events: root_events2, |
| 333 | sync_level: Default::default(), | ||
| 302 | }, | 334 | }, |
| 303 | ); | 335 | ); |
| 304 | 336 | ||
| @@ -332,6 +364,7 @@ mod tests { | |||
| 332 | "wss://relay1.com".to_string(), | 364 | "wss://relay1.com".to_string(), |
| 333 | RelaySyncNeeds { | 365 | RelaySyncNeeds { |
| 334 | repos: vec!["repo1".to_string()].into_iter().collect(), | 366 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 367 | state_only_repos: HashSet::new(), | ||
| 335 | root_events: HashSet::new(), | 368 | root_events: HashSet::new(), |
| 336 | }, | 369 | }, |
| 337 | ); | 370 | ); |
| @@ -366,6 +399,7 @@ mod tests { | |||
| 366 | "wss://relay1.com".to_string(), | 399 | "wss://relay1.com".to_string(), |
| 367 | RelaySyncNeeds { | 400 | RelaySyncNeeds { |
| 368 | repos: vec!["repo1".to_string()].into_iter().collect(), | 401 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 402 | state_only_repos: HashSet::new(), | ||
| 369 | root_events: HashSet::new(), | 403 | root_events: HashSet::new(), |
| 370 | }, | 404 | }, |
| 371 | ); | 405 | ); |
| @@ -389,6 +423,7 @@ mod tests { | |||
| 389 | "wss://relay1.com".to_string(), | 423 | "wss://relay1.com".to_string(), |
| 390 | RelaySyncNeeds { | 424 | RelaySyncNeeds { |
| 391 | repos: vec!["repo1".to_string()].into_iter().collect(), | 425 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 426 | state_only_repos: HashSet::new(), | ||
| 392 | root_events: HashSet::new(), | 427 | root_events: HashSet::new(), |
| 393 | }, | 428 | }, |
| 394 | ); | 429 | ); |
| @@ -428,6 +463,7 @@ mod tests { | |||
| 428 | "wss://relay1.com".to_string(), | 463 | "wss://relay1.com".to_string(), |
| 429 | RelaySyncNeeds { | 464 | RelaySyncNeeds { |
| 430 | repos: vec!["repo1".to_string()].into_iter().collect(), | 465 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 466 | state_only_repos: HashSet::new(), | ||
| 431 | root_events: HashSet::new(), | 467 | root_events: HashSet::new(), |
| 432 | }, | 468 | }, |
| 433 | ); | 469 | ); |
| @@ -465,6 +501,7 @@ mod tests { | |||
| 465 | "wss://relay1.com".to_string(), | 501 | "wss://relay1.com".to_string(), |
| 466 | RelaySyncNeeds { | 502 | RelaySyncNeeds { |
| 467 | repos: vec!["repo1".to_string()].into_iter().collect(), | 503 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 504 | state_only_repos: HashSet::new(), | ||
| 468 | root_events: HashSet::new(), | 505 | root_events: HashSet::new(), |
| 469 | }, | 506 | }, |
| 470 | ); | 507 | ); |
| @@ -510,6 +547,7 @@ mod tests { | |||
| 510 | ] | 547 | ] |
| 511 | .into_iter() | 548 | .into_iter() |
| 512 | .collect(), | 549 | .collect(), |
| 550 | state_only_repos: HashSet::new(), | ||
| 513 | root_events: HashSet::new(), | 551 | root_events: HashSet::new(), |
| 514 | }, | 552 | }, |
| 515 | ); | 553 | ); |
| @@ -572,6 +610,7 @@ mod tests { | |||
| 572 | "wss://relay1.com".to_string(), | 610 | "wss://relay1.com".to_string(), |
| 573 | RelaySyncNeeds { | 611 | RelaySyncNeeds { |
| 574 | repos: HashSet::new(), | 612 | repos: HashSet::new(), |
| 613 | state_only_repos: HashSet::new(), | ||
| 575 | root_events: vec![event_id].into_iter().collect(), | 614 | root_events: vec![event_id].into_iter().collect(), |
| 576 | }, | 615 | }, |
| 577 | ); | 616 | ); |
| @@ -599,6 +638,7 @@ mod tests { | |||
| 599 | "wss://new-relay.com".to_string(), | 638 | "wss://new-relay.com".to_string(), |
| 600 | RelaySyncNeeds { | 639 | RelaySyncNeeds { |
| 601 | repos: vec!["repo1".to_string()].into_iter().collect(), | 640 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 641 | state_only_repos: HashSet::new(), | ||
| 602 | root_events: HashSet::new(), | 642 | root_events: HashSet::new(), |
| 603 | }, | 643 | }, |
| 604 | ); | 644 | ); |
diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 3592489..1215e81 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs | |||
| @@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters( | |||
| 245 | filters | 245 | filters |
| 246 | } | 246 | } |
| 247 | 247 | ||
| 248 | /// Builds filters respecting SyncLevel for each repo | ||
| 249 | /// | ||
| 250 | /// StateOnly repos only get state event filters (kind 30618). | ||
| 251 | /// Full repos get all L2/L3 filters (state + repo-tagging + root event). | ||
| 252 | /// | ||
| 253 | /// # Arguments | ||
| 254 | /// * `full_repos` - Repos needing full L2+L3 sync | ||
| 255 | /// * `state_only_repos` - Repos needing only state event sync (purgatory) | ||
| 256 | /// * `root_events` - Root event IDs (only used for Full repos) | ||
| 257 | /// * `since` - Optional timestamp for incremental sync | ||
| 258 | pub fn build_sync_level_aware_filters( | ||
| 259 | full_repos: &HashSet<String>, | ||
| 260 | state_only_repos: &HashSet<String>, | ||
| 261 | root_events: &HashSet<EventId>, | ||
| 262 | since: Option<Timestamp>, | ||
| 263 | ) -> Vec<Filter> { | ||
| 264 | let mut filters = Vec::new(); | ||
| 265 | |||
| 266 | // All repos (both Full and StateOnly) need state event filters | ||
| 267 | let all_repos: HashSet<String> = full_repos.union(state_only_repos).cloned().collect(); | ||
| 268 | filters.extend(state_event_filters_for_our_repos(&all_repos, since)); | ||
| 269 | |||
| 270 | // Only Full repos get repo-tagging and root event filters | ||
| 271 | if !full_repos.is_empty() { | ||
| 272 | filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since)); | ||
| 273 | } | ||
| 274 | filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); | ||
| 275 | |||
| 276 | filters | ||
| 277 | } | ||
| 278 | |||
| 248 | #[cfg(test)] | 279 | #[cfg(test)] |
| 249 | mod tests { | 280 | mod tests { |
| 250 | use super::*; | 281 | use super::*; |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d6634ff..cd62380 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex; | |||
| 85 | // Supporting Data Structures | 85 | // Supporting Data Structures |
| 86 | // ============================================================================= | 86 | // ============================================================================= |
| 87 | 87 | ||
| 88 | /// Level of sync needed for a repository | ||
| 89 | /// | ||
| 90 | /// Purgatory announcements only need state events synced (to validate git data). | ||
| 91 | /// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.). | ||
| 92 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] | ||
| 93 | pub enum SyncLevel { | ||
| 94 | /// Full L2 + L3 sync (promoted repos with git data) | ||
| 95 | #[default] | ||
| 96 | Full, | ||
| 97 | /// Only state events (kind 30618) - for purgatory announcements | ||
| 98 | StateOnly, | ||
| 99 | } | ||
| 100 | |||
| 88 | /// What repos and root events need to be synced | 101 | /// What repos and root events need to be synced |
| 89 | #[derive(Debug, Clone, Default)] | 102 | #[derive(Debug, Clone, Default)] |
| 90 | pub struct RepoSyncNeeds { | 103 | pub struct RepoSyncNeeds { |
| @@ -92,6 +105,8 @@ pub struct RepoSyncNeeds { | |||
| 92 | pub relays: HashSet<String>, | 105 | pub relays: HashSet<String>, |
| 93 | /// Root event IDs - 1617/1618/1621 - that reference this repo | 106 | /// Root event IDs - 1617/1618/1621 - that reference this repo |
| 94 | pub root_events: HashSet<EventId>, | 107 | pub root_events: HashSet<EventId>, |
| 108 | /// Sync level - StateOnly for purgatory, Full for promoted repos | ||
| 109 | pub sync_level: SyncLevel, | ||
| 95 | } | 110 | } |
| 96 | 111 | ||
| 97 | /// Connection status for a relay | 112 | /// Connection status for a relay |
| @@ -382,6 +397,40 @@ async fn run_daily_timer( | |||
| 382 | } | 397 | } |
| 383 | } | 398 | } |
| 384 | 399 | ||
| 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. | ||
| 401 | /// | ||
| 402 | /// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). | ||
| 403 | /// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in | ||
| 404 | /// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the | ||
| 405 | /// relay URLs listed in the announcement and subscribes to state events (kind 30618). | ||
| 406 | /// | ||
| 407 | /// This is the sole registration path for purgatory announcements: | ||
| 408 | /// - Sync-path announcements: registered here within one interval of arriving. | ||
| 409 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them | ||
| 410 | /// (they're rejected from DB), so this timer is the only registration path. | ||
| 411 | async fn run_purgatory_announcement_sync( | ||
| 412 | sync_manager: Arc<Mutex<SyncManager>>, | ||
| 413 | mut shutdown_rx: broadcast::Receiver<()>, | ||
| 414 | ) { | ||
| 415 | let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { | ||
| 416 | Duration::from_millis(200) | ||
| 417 | } else { | ||
| 418 | Duration::from_secs(5) | ||
| 419 | }; | ||
| 420 | loop { | ||
| 421 | tokio::select! { | ||
| 422 | _ = tokio::time::sleep(interval) => { | ||
| 423 | let mut manager = sync_manager.lock().await; | ||
| 424 | manager.sync_purgatory_announcements_to_index().await; | ||
| 425 | } | ||
| 426 | _ = shutdown_rx.recv() => { | ||
| 427 | tracing::debug!("Purgatory announcement sync timer received shutdown signal"); | ||
| 428 | break; | ||
| 429 | } | ||
| 430 | } | ||
| 431 | } | ||
| 432 | } | ||
| 433 | |||
| 385 | // Combined Health and Metrics Checker | 434 | // Combined Health and Metrics Checker |
| 386 | 435 | ||
| 387 | /// Background task for cleaning up expired entries from the rejected events index | 436 | /// Background task for cleaning up expired entries from the rejected events index |
| @@ -936,9 +985,29 @@ impl SyncManager { | |||
| 936 | 985 | ||
| 937 | // Create REQ+EOSE subscriptions using original semantic filters | 986 | // Create REQ+EOSE subscriptions using original semantic filters |
| 938 | // This queries by kind/author/tags instead of by ID, which may | 987 | // This queries by kind/author/tags instead of by ID, which may |
| 939 | // succeed even when ID-based queries fail | 988 | // succeed even when ID-based queries fail. |
| 940 | let fallback_filters = filters::build_layer2_and_layer3_filters( | 989 | // Split batch_repos by SyncLevel to avoid sending Layer 2 filters |
| 941 | &batch_repos, | 990 | // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be |
| 991 | // rejected as orphan and then silently dropped by nostr-sdk deduplication. | ||
| 992 | let (full_repos, state_only_repos) = { | ||
| 993 | let repo_index = self.repo_sync_index.read().await; | ||
| 994 | let mut full = HashSet::new(); | ||
| 995 | let mut state_only = HashSet::new(); | ||
| 996 | for repo_ref in &batch_repos { | ||
| 997 | match repo_index.get(repo_ref).map(|n| n.sync_level) { | ||
| 998 | Some(SyncLevel::StateOnly) => { | ||
| 999 | state_only.insert(repo_ref.clone()); | ||
| 1000 | } | ||
| 1001 | _ => { | ||
| 1002 | full.insert(repo_ref.clone()); | ||
| 1003 | } | ||
| 1004 | } | ||
| 1005 | } | ||
| 1006 | (full, state_only) | ||
| 1007 | }; | ||
| 1008 | let fallback_filters = filters::build_sync_level_aware_filters( | ||
| 1009 | &full_repos, | ||
| 1010 | &state_only_repos, | ||
| 942 | &batch_root_events, | 1011 | &batch_root_events, |
| 943 | None, | 1012 | None, |
| 944 | ); | 1013 | ); |
| @@ -1272,7 +1341,7 @@ impl SyncManager { | |||
| 1272 | /// to be batched and create Layer 2/3 filters before we mark sync complete. | 1341 | /// to be batched and create Layer 2/3 filters before we mark sync complete. |
| 1273 | /// | 1342 | /// |
| 1274 | /// The 6-second delay is based on: | 1343 | /// The 6-second delay is based on: |
| 1275 | /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) | 1344 | /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) |
| 1276 | /// - Buffer for processing: 1 second | 1345 | /// - Buffer for processing: 1 second |
| 1277 | /// | 1346 | /// |
| 1278 | /// Called after each batch is confirmed to detect completion. | 1347 | /// Called after each batch is confirmed to detect completion. |
| @@ -1486,7 +1555,17 @@ impl SyncManager { | |||
| 1486 | run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; | 1555 | run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; |
| 1487 | }); | 1556 | }); |
| 1488 | 1557 | ||
| 1489 | // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 1558 | // 11. Spawn purgatory announcement sync timer (every 5s) |
| 1559 | // Ensures purgatory announcements (including user-submitted ones that never | ||
| 1560 | // touch the DB) are registered in repo_sync_index as StateOnly so that | ||
| 1561 | // state event subscriptions are established on their listed relay URLs. | ||
| 1562 | let purgatory_sync_manager = Arc::clone(&sync_manager); | ||
| 1563 | let purgatory_sync_shutdown = shutdown_tx.subscribe(); | ||
| 1564 | tokio::spawn(async move { | ||
| 1565 | run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; | ||
| 1566 | }); | ||
| 1567 | |||
| 1568 | // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | ||
| 1490 | loop { | 1569 | loop { |
| 1491 | // Wait for an event without holding the lock | 1570 | // Wait for an event without holding the lock |
| 1492 | tokio::select! { | 1571 | tokio::select! { |
| @@ -1719,6 +1798,10 @@ impl SyncManager { | |||
| 1719 | 1798 | ||
| 1720 | // For sync-triggered events that go to purgatory, trigger immediate sync | 1799 | // For sync-triggered events that go to purgatory, trigger immediate sync |
| 1721 | // (instead of the default 3-minute delay for user-submitted events) | 1800 | // (instead of the default 3-minute delay for user-submitted events) |
| 1801 | // | ||
| 1802 | // Note: announcement events (kind 30617) are registered in repo_sync_index | ||
| 1803 | // by the purgatory announcement sync timer (run_purgatory_announcement_sync) | ||
| 1804 | // rather than inline here. | ||
| 1722 | if result == ProcessResult::Purgatory { | 1805 | if result == ProcessResult::Purgatory { |
| 1723 | // State events (kind 30618) - extract identifier and trigger immediate sync | 1806 | // State events (kind 30618) - extract identifier and trigger immediate sync |
| 1724 | if event.kind.as_u16() == 30618 { | 1807 | if event.kind.as_u16() == 30618 { |
| @@ -2303,6 +2386,80 @@ impl SyncManager { | |||
| 2303 | } | 2386 | } |
| 2304 | } | 2387 | } |
| 2305 | 2388 | ||
| 2389 | /// Sync purgatory announcements into repo_sync_index as StateOnly entries. | ||
| 2390 | /// | ||
| 2391 | /// Called periodically by the purgatory announcement sync timer (every 5s). | ||
| 2392 | /// For each announcement currently in purgatory, ensures a `StateOnly` entry | ||
| 2393 | /// exists in `repo_sync_index`. New entries are then picked up by | ||
| 2394 | /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes | ||
| 2395 | /// to state events for that repo. | ||
| 2396 | /// | ||
| 2397 | /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). | ||
| 2398 | async fn sync_purgatory_announcements_to_index(&mut self) { | ||
| 2399 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | ||
| 2400 | |||
| 2401 | // Collect all purgatory announcements (snapshot - no async holds) | ||
| 2402 | let announcements = self.purgatory.announcements_for_sync(); | ||
| 2403 | |||
| 2404 | if announcements.is_empty() { | ||
| 2405 | return; | ||
| 2406 | } | ||
| 2407 | |||
| 2408 | // Register any new entries in repo_sync_index as StateOnly | ||
| 2409 | let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new(); | ||
| 2410 | { | ||
| 2411 | let mut index = self.repo_sync_index.write().await; | ||
| 2412 | for (repo_id, relays) in &announcements { | ||
| 2413 | let entry = index.entry(repo_id.clone()).or_insert_with(|| { | ||
| 2414 | tracing::debug!( | ||
| 2415 | repo_id = %repo_id, | ||
| 2416 | "Registering purgatory announcement in repo_sync_index as StateOnly" | ||
| 2417 | ); | ||
| 2418 | RepoSyncNeeds { | ||
| 2419 | relays: std::collections::HashSet::new(), | ||
| 2420 | root_events: std::collections::HashSet::new(), | ||
| 2421 | sync_level: SyncLevel::StateOnly, | ||
| 2422 | } | ||
| 2423 | }); | ||
| 2424 | // Don't downgrade an already-Full entry | ||
| 2425 | // Add any new relay URLs | ||
| 2426 | for relay in relays { | ||
| 2427 | if entry.relays.insert(relay.clone()) { | ||
| 2428 | new_relay_urls.insert(relay.clone()); | ||
| 2429 | } | ||
| 2430 | } | ||
| 2431 | } | ||
| 2432 | } | ||
| 2433 | |||
| 2434 | if new_relay_urls.is_empty() { | ||
| 2435 | return; | ||
| 2436 | } | ||
| 2437 | |||
| 2438 | // For any relay URLs that are new, compute and send AddFilters actions | ||
| 2439 | let all_targets = { | ||
| 2440 | let repo_index = self.repo_sync_index.read().await; | ||
| 2441 | derive_relay_targets(&repo_index) | ||
| 2442 | }; | ||
| 2443 | |||
| 2444 | let actions = { | ||
| 2445 | let pending_index = self.pending_sync_index.read().await; | ||
| 2446 | let relay_index = self.relay_sync_index.read().await; | ||
| 2447 | compute_actions(&all_targets, &pending_index, &relay_index) | ||
| 2448 | }; | ||
| 2449 | |||
| 2450 | for action in actions { | ||
| 2451 | // Only act on relays that have new URLs (avoids redundant work) | ||
| 2452 | if new_relay_urls.contains(&action.relay_url) { | ||
| 2453 | tracing::info!( | ||
| 2454 | relay = %action.relay_url, | ||
| 2455 | repos = action.items.repos.len(), | ||
| 2456 | "Purgatory sync timer: connecting to new relay from purgatory announcement" | ||
| 2457 | ); | ||
| 2458 | self.handle_new_sync_filters(action).await; | ||
| 2459 | } | ||
| 2460 | } | ||
| 2461 | } | ||
| 2462 | |||
| 2306 | /// Handle a relay disconnection | 2463 | /// Handle a relay disconnection |
| 2307 | /// | 2464 | /// |
| 2308 | /// This method is called when the event loop terminates and sends a disconnect notification. | 2465 | /// This method is called when the event loop terminates and sends a disconnect notification. |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 86e4583..4d69c9a 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -18,7 +18,7 @@ use tokio::sync::{broadcast, mpsc}; | |||
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | 19 | use crate::nostr::builder::SharedDatabase; |
| 20 | 20 | ||
| 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; |
| 22 | 22 | ||
| 23 | // ============================================================================= | 23 | // ============================================================================= |
| 24 | // LoopControl - Result of notification processing | 24 | // LoopControl - Result of notification processing |
| @@ -60,6 +60,7 @@ impl PendingUpdates { | |||
| 60 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | 60 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { |
| 61 | relays: HashSet::new(), | 61 | relays: HashSet::new(), |
| 62 | root_events: HashSet::new(), | 62 | root_events: HashSet::new(), |
| 63 | sync_level: SyncLevel::Full, | ||
| 63 | }); | 64 | }); |
| 64 | entry.relays.extend(relays); | 65 | entry.relays.extend(relays); |
| 65 | entry.root_events.extend(root_events); | 66 | entry.root_events.extend(root_events); |
| @@ -132,14 +133,14 @@ impl SelfSubscriber { | |||
| 132 | 133 | ||
| 133 | /// Get batch window from environment or use default | 134 | /// Get batch window from environment or use default |
| 134 | /// | 135 | /// |
| 135 | /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. | 136 | /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. |
| 136 | /// Default: 5000ms (5 seconds) | 137 | /// Default: 5000ms (5 seconds) |
| 137 | fn get_batch_window() -> Duration { | 138 | fn get_batch_window() -> Duration { |
| 138 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | 139 | if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 139 | .ok() | 140 | Duration::from_millis(200) |
| 140 | .and_then(|s| s.parse::<u64>().ok()) | 141 | } else { |
| 141 | .map(Duration::from_millis) | 142 | Duration::from_millis(5000) |
| 142 | .unwrap_or(Duration::from_millis(5000)) | 143 | } |
| 143 | } | 144 | } |
| 144 | 145 | ||
| 145 | /// Load existing events from database on startup | 146 | /// Load existing events from database on startup |
| @@ -197,6 +198,7 @@ impl SelfSubscriber { | |||
| 197 | .or_insert_with(|| RepoSyncNeeds { | 198 | .or_insert_with(|| RepoSyncNeeds { |
| 198 | relays: HashSet::new(), | 199 | relays: HashSet::new(), |
| 199 | root_events: HashSet::new(), | 200 | root_events: HashSet::new(), |
| 201 | sync_level: SyncLevel::StateOnly, | ||
| 200 | }); | 202 | }); |
| 201 | entry.relays.extend(needs.relays.clone()); | 203 | entry.relays.extend(needs.relays.clone()); |
| 202 | } | 204 | } |
| @@ -570,7 +572,12 @@ impl SelfSubscriber { | |||
| 570 | .or_insert_with(|| RepoSyncNeeds { | 572 | .or_insert_with(|| RepoSyncNeeds { |
| 571 | relays: HashSet::new(), | 573 | relays: HashSet::new(), |
| 572 | root_events: HashSet::new(), | 574 | root_events: HashSet::new(), |
| 575 | sync_level: SyncLevel::Full, | ||
| 573 | }); | 576 | }); |
| 577 | // Upgrade sync_level to Full - this handles the case where the entry | ||
| 578 | // already exists as StateOnly (purgatory announcement) and is now being | ||
| 579 | // promoted (git data arrived and the event was broadcast via notify_event). | ||
| 580 | entry.sync_level = SyncLevel::Full; | ||
| 574 | entry.relays.extend(needs.relays); | 581 | entry.relays.extend(needs.relays); |
| 575 | entry.root_events.extend(needs.root_events); | 582 | entry.root_events.extend(needs.root_events); |
| 576 | 583 | ||
| @@ -594,21 +601,26 @@ impl SelfSubscriber { | |||
| 594 | continue; | 601 | continue; |
| 595 | } | 602 | } |
| 596 | 603 | ||
| 597 | // Build filters for these repos | 604 | // Build filters for these repos (sync-level-aware) |
| 598 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | 605 | let filters = crate::sync::filters::build_sync_level_aware_filters( |
| 599 | &needs.repos, | 606 | &needs.repos, |
| 607 | &needs.state_only_repos, | ||
| 600 | &needs.root_events, | 608 | &needs.root_events, |
| 601 | None, | 609 | None, |
| 602 | ); | 610 | ); |
| 603 | 611 | ||
| 604 | // Log before moving values | 612 | // Log before moving values |
| 605 | let repo_count = needs.repos.len(); | 613 | let repo_count = needs.repos.len() + needs.state_only_repos.len(); |
| 606 | let event_count = needs.root_events.len(); | 614 | let event_count = needs.root_events.len(); |
| 607 | 615 | ||
| 616 | // Combine all repos into pending items | ||
| 617 | let mut all_repos = needs.repos; | ||
| 618 | all_repos.extend(needs.state_only_repos); | ||
| 619 | |||
| 608 | let action = AddFilters { | 620 | let action = AddFilters { |
| 609 | relay_url: relay_url.clone(), | 621 | relay_url: relay_url.clone(), |
| 610 | items: crate::sync::PendingItems { | 622 | items: crate::sync::PendingItems { |
| 611 | repos: needs.repos, | 623 | repos: all_repos, |
| 612 | root_events: needs.root_events, | 624 | root_events: needs.root_events, |
| 613 | }, | 625 | }, |
| 614 | filters, | 626 | filters, |