upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/algorithms.rs58
-rw-r--r--src/sync/filters.rs31
-rw-r--r--src/sync/mod.rs167
-rw-r--r--src/sync/self_subscriber.rs34
4 files changed, 265 insertions, 25 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 39788bc..9899abc 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState};
25/// this repo need to sync from", it's "what repos does this relay need to sync". 25/// this repo need to sync from", it's "what repos does this relay need to sync".
26#[derive(Debug, Clone, Default)] 26#[derive(Debug, Clone, Default)]
27pub struct RelaySyncNeeds { 27pub struct RelaySyncNeeds {
28 /// Repos that need to be synced from this relay 28 /// Repos that need full L2+L3 sync from this relay
29 pub repos: HashSet<String>, 29 pub repos: HashSet<String>,
30 /// Repos that only need state event sync (purgatory announcements)
31 pub state_only_repos: HashSet<String>,
30 /// Root events that need to be tracked from this relay 32 /// Root events that need to be tracked from this relay
31 pub root_events: HashSet<EventId>, 33 pub root_events: HashSet<EventId>,
32} 34}
@@ -67,8 +69,15 @@ pub fn derive_relay_targets(
67 for relay_url in &needs.relays { 69 for relay_url in &needs.relays {
68 let entry = relay_targets.entry(relay_url.clone()).or_default(); 70 let entry = relay_targets.entry(relay_url.clone()).or_default();
69 71
70 entry.repos.insert(repo_id.clone()); 72 match needs.sync_level {
71 entry.root_events.extend(needs.root_events.iter().cloned()); 73 super::SyncLevel::Full => {
74 entry.repos.insert(repo_id.clone());
75 entry.root_events.extend(needs.root_events.iter().cloned());
76 }
77 super::SyncLevel::StateOnly => {
78 entry.state_only_repos.insert(repo_id.clone());
79 }
80 }
72 } 81 }
73 } 82 }
74 83
@@ -96,7 +105,7 @@ pub fn compute_actions(
96 pending: &HashMap<String, Vec<PendingBatch>>, 105 pending: &HashMap<String, Vec<PendingBatch>>,
97 confirmed: &HashMap<String, RelayState>, 106 confirmed: &HashMap<String, RelayState>,
98) -> Vec<AddFilters> { 107) -> Vec<AddFilters> {
99 use crate::sync::filters::build_layer2_and_layer3_filters; 108 use crate::sync::filters::build_sync_level_aware_filters;
100 109
101 let mut actions = Vec::new(); 110 let mut actions = Vec::new();
102 111
@@ -140,14 +149,22 @@ pub fn compute_actions(
140 .map(|state| state.root_events.clone()) 149 .map(|state| state.root_events.clone())
141 .unwrap_or_default(); 150 .unwrap_or_default();
142 151
143 // Calculate what's NEW (not in pending, not in confirmed) 152 // Calculate what's NEW for full repos (not in pending, not in confirmed)
144 let new_repos: HashSet<String> = target_needs 153 let new_full_repos: HashSet<String> = target_needs
145 .repos 154 .repos
146 .difference(&pending_repos) 155 .difference(&pending_repos)
147 .filter(|repo| !confirmed_repos.contains(*repo)) 156 .filter(|repo| !confirmed_repos.contains(*repo))
148 .cloned() 157 .cloned()
149 .collect(); 158 .collect();
150 159
160 // Calculate what's NEW for state-only repos
161 let new_state_only_repos: HashSet<String> = target_needs
162 .state_only_repos
163 .difference(&pending_repos)
164 .filter(|repo| !confirmed_repos.contains(*repo))
165 .cloned()
166 .collect();
167
151 let new_events: HashSet<EventId> = target_needs 168 let new_events: HashSet<EventId> = target_needs
152 .root_events 169 .root_events
153 .difference(&pending_events) 170 .difference(&pending_events)
@@ -156,13 +173,23 @@ pub fn compute_actions(
156 .collect(); 173 .collect();
157 174
158 // If there's anything new, create an AddFilters action 175 // If there's anything new, create an AddFilters action
159 if !new_repos.is_empty() || !new_events.is_empty() { 176 if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty()
160 let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); 177 {
178 let filters = build_sync_level_aware_filters(
179 &new_full_repos,
180 &new_state_only_repos,
181 &new_events,
182 None,
183 );
184
185 // Combine all repos into pending items (pending tracking doesn't need sync level)
186 let mut all_new_repos = new_full_repos;
187 all_new_repos.extend(new_state_only_repos);
161 188
162 actions.push(AddFilters { 189 actions.push(AddFilters {
163 relay_url: relay_url.clone(), 190 relay_url: relay_url.clone(),
164 items: PendingItems { 191 items: PendingItems {
165 repos: new_repos, 192 repos: all_new_repos,
166 root_events: new_events, 193 root_events: new_events,
167 }, 194 },
168 filters, 195 filters,
@@ -204,6 +231,7 @@ mod tests {
204 ModRepoSyncNeeds { 231 ModRepoSyncNeeds {
205 relays, 232 relays,
206 root_events, 233 root_events,
234 sync_level: Default::default(),
207 }, 235 },
208 ); 236 );
209 237
@@ -229,6 +257,7 @@ mod tests {
229 ModRepoSyncNeeds { 257 ModRepoSyncNeeds {
230 relays, 258 relays,
231 root_events: HashSet::new(), 259 root_events: HashSet::new(),
260 sync_level: Default::default(),
232 }, 261 },
233 ); 262 );
234 } 263 }
@@ -252,6 +281,7 @@ mod tests {
252 ModRepoSyncNeeds { 281 ModRepoSyncNeeds {
253 relays, 282 relays,
254 root_events: HashSet::new(), 283 root_events: HashSet::new(),
284 sync_level: Default::default(),
255 }, 285 },
256 ); 286 );
257 287
@@ -285,6 +315,7 @@ mod tests {
285 ModRepoSyncNeeds { 315 ModRepoSyncNeeds {
286 relays: relays1, 316 relays: relays1,
287 root_events: root_events1, 317 root_events: root_events1,
318 sync_level: Default::default(),
288 }, 319 },
289 ); 320 );
290 321
@@ -299,6 +330,7 @@ mod tests {
299 ModRepoSyncNeeds { 330 ModRepoSyncNeeds {
300 relays: relays2, 331 relays: relays2,
301 root_events: root_events2, 332 root_events: root_events2,
333 sync_level: Default::default(),
302 }, 334 },
303 ); 335 );
304 336
@@ -332,6 +364,7 @@ mod tests {
332 "wss://relay1.com".to_string(), 364 "wss://relay1.com".to_string(),
333 RelaySyncNeeds { 365 RelaySyncNeeds {
334 repos: vec!["repo1".to_string()].into_iter().collect(), 366 repos: vec!["repo1".to_string()].into_iter().collect(),
367 state_only_repos: HashSet::new(),
335 root_events: HashSet::new(), 368 root_events: HashSet::new(),
336 }, 369 },
337 ); 370 );
@@ -366,6 +399,7 @@ mod tests {
366 "wss://relay1.com".to_string(), 399 "wss://relay1.com".to_string(),
367 RelaySyncNeeds { 400 RelaySyncNeeds {
368 repos: vec!["repo1".to_string()].into_iter().collect(), 401 repos: vec!["repo1".to_string()].into_iter().collect(),
402 state_only_repos: HashSet::new(),
369 root_events: HashSet::new(), 403 root_events: HashSet::new(),
370 }, 404 },
371 ); 405 );
@@ -389,6 +423,7 @@ mod tests {
389 "wss://relay1.com".to_string(), 423 "wss://relay1.com".to_string(),
390 RelaySyncNeeds { 424 RelaySyncNeeds {
391 repos: vec!["repo1".to_string()].into_iter().collect(), 425 repos: vec!["repo1".to_string()].into_iter().collect(),
426 state_only_repos: HashSet::new(),
392 root_events: HashSet::new(), 427 root_events: HashSet::new(),
393 }, 428 },
394 ); 429 );
@@ -428,6 +463,7 @@ mod tests {
428 "wss://relay1.com".to_string(), 463 "wss://relay1.com".to_string(),
429 RelaySyncNeeds { 464 RelaySyncNeeds {
430 repos: vec!["repo1".to_string()].into_iter().collect(), 465 repos: vec!["repo1".to_string()].into_iter().collect(),
466 state_only_repos: HashSet::new(),
431 root_events: HashSet::new(), 467 root_events: HashSet::new(),
432 }, 468 },
433 ); 469 );
@@ -465,6 +501,7 @@ mod tests {
465 "wss://relay1.com".to_string(), 501 "wss://relay1.com".to_string(),
466 RelaySyncNeeds { 502 RelaySyncNeeds {
467 repos: vec!["repo1".to_string()].into_iter().collect(), 503 repos: vec!["repo1".to_string()].into_iter().collect(),
504 state_only_repos: HashSet::new(),
468 root_events: HashSet::new(), 505 root_events: HashSet::new(),
469 }, 506 },
470 ); 507 );
@@ -510,6 +547,7 @@ mod tests {
510 ] 547 ]
511 .into_iter() 548 .into_iter()
512 .collect(), 549 .collect(),
550 state_only_repos: HashSet::new(),
513 root_events: HashSet::new(), 551 root_events: HashSet::new(),
514 }, 552 },
515 ); 553 );
@@ -572,6 +610,7 @@ mod tests {
572 "wss://relay1.com".to_string(), 610 "wss://relay1.com".to_string(),
573 RelaySyncNeeds { 611 RelaySyncNeeds {
574 repos: HashSet::new(), 612 repos: HashSet::new(),
613 state_only_repos: HashSet::new(),
575 root_events: vec![event_id].into_iter().collect(), 614 root_events: vec![event_id].into_iter().collect(),
576 }, 615 },
577 ); 616 );
@@ -599,6 +638,7 @@ mod tests {
599 "wss://new-relay.com".to_string(), 638 "wss://new-relay.com".to_string(),
600 RelaySyncNeeds { 639 RelaySyncNeeds {
601 repos: vec!["repo1".to_string()].into_iter().collect(), 640 repos: vec!["repo1".to_string()].into_iter().collect(),
641 state_only_repos: HashSet::new(),
602 root_events: HashSet::new(), 642 root_events: HashSet::new(),
603 }, 643 },
604 ); 644 );
diff --git a/src/sync/filters.rs b/src/sync/filters.rs
index 3592489..1215e81 100644
--- a/src/sync/filters.rs
+++ b/src/sync/filters.rs
@@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters(
245 filters 245 filters
246} 246}
247 247
248/// Builds filters respecting SyncLevel for each repo
249///
250/// StateOnly repos only get state event filters (kind 30618).
251/// Full repos get all L2/L3 filters (state + repo-tagging + root event).
252///
253/// # Arguments
254/// * `full_repos` - Repos needing full L2+L3 sync
255/// * `state_only_repos` - Repos needing only state event sync (purgatory)
256/// * `root_events` - Root event IDs (only used for Full repos)
257/// * `since` - Optional timestamp for incremental sync
258pub fn build_sync_level_aware_filters(
259 full_repos: &HashSet<String>,
260 state_only_repos: &HashSet<String>,
261 root_events: &HashSet<EventId>,
262 since: Option<Timestamp>,
263) -> Vec<Filter> {
264 let mut filters = Vec::new();
265
266 // All repos (both Full and StateOnly) need state event filters
267 let all_repos: HashSet<String> = full_repos.union(state_only_repos).cloned().collect();
268 filters.extend(state_event_filters_for_our_repos(&all_repos, since));
269
270 // Only Full repos get repo-tagging and root event filters
271 if !full_repos.is_empty() {
272 filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since));
273 }
274 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
275
276 filters
277}
278
248#[cfg(test)] 279#[cfg(test)]
249mod tests { 280mod tests {
250 use super::*; 281 use super::*;
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index d6634ff..cd62380 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex;
85// Supporting Data Structures 85// Supporting Data Structures
86// ============================================================================= 86// =============================================================================
87 87
88/// Level of sync needed for a repository
89///
90/// Purgatory announcements only need state events synced (to validate git data).
91/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.).
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
93pub enum SyncLevel {
94 /// Full L2 + L3 sync (promoted repos with git data)
95 #[default]
96 Full,
97 /// Only state events (kind 30618) - for purgatory announcements
98 StateOnly,
99}
100
88/// What repos and root events need to be synced 101/// What repos and root events need to be synced
89#[derive(Debug, Clone, Default)] 102#[derive(Debug, Clone, Default)]
90pub struct RepoSyncNeeds { 103pub struct RepoSyncNeeds {
@@ -92,6 +105,8 @@ pub struct RepoSyncNeeds {
92 pub relays: HashSet<String>, 105 pub relays: HashSet<String>,
93 /// Root event IDs - 1617/1618/1621 - that reference this repo 106 /// Root event IDs - 1617/1618/1621 - that reference this repo
94 pub root_events: HashSet<EventId>, 107 pub root_events: HashSet<EventId>,
108 /// Sync level - StateOnly for purgatory, Full for promoted repos
109 pub sync_level: SyncLevel,
95} 110}
96 111
97/// Connection status for a relay 112/// Connection status for a relay
@@ -382,6 +397,40 @@ async fn run_daily_timer(
382 } 397 }
383} 398}
384 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401///
402/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`).
403/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in
404/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the
405/// relay URLs listed in the announcement and subscribes to state events (kind 30618).
406///
407/// This is the sole registration path for purgatory announcements:
408/// - Sync-path announcements: registered here within one interval of arriving.
409/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
410/// (they're rejected from DB), so this timer is the only registration path.
411async fn run_purgatory_announcement_sync(
412 sync_manager: Arc<Mutex<SyncManager>>,
413 mut shutdown_rx: broadcast::Receiver<()>,
414) {
415 let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
416 Duration::from_millis(200)
417 } else {
418 Duration::from_secs(5)
419 };
420 loop {
421 tokio::select! {
422 _ = tokio::time::sleep(interval) => {
423 let mut manager = sync_manager.lock().await;
424 manager.sync_purgatory_announcements_to_index().await;
425 }
426 _ = shutdown_rx.recv() => {
427 tracing::debug!("Purgatory announcement sync timer received shutdown signal");
428 break;
429 }
430 }
431 }
432}
433
385// Combined Health and Metrics Checker 434// Combined Health and Metrics Checker
386 435
387/// Background task for cleaning up expired entries from the rejected events index 436/// Background task for cleaning up expired entries from the rejected events index
@@ -936,9 +985,29 @@ impl SyncManager {
936 985
937 // Create REQ+EOSE subscriptions using original semantic filters 986 // Create REQ+EOSE subscriptions using original semantic filters
938 // This queries by kind/author/tags instead of by ID, which may 987 // This queries by kind/author/tags instead of by ID, which may
939 // succeed even when ID-based queries fail 988 // succeed even when ID-based queries fail.
940 let fallback_filters = filters::build_layer2_and_layer3_filters( 989 // Split batch_repos by SyncLevel to avoid sending Layer 2 filters
941 &batch_repos, 990 // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be
991 // rejected as orphan and then silently dropped by nostr-sdk deduplication.
992 let (full_repos, state_only_repos) = {
993 let repo_index = self.repo_sync_index.read().await;
994 let mut full = HashSet::new();
995 let mut state_only = HashSet::new();
996 for repo_ref in &batch_repos {
997 match repo_index.get(repo_ref).map(|n| n.sync_level) {
998 Some(SyncLevel::StateOnly) => {
999 state_only.insert(repo_ref.clone());
1000 }
1001 _ => {
1002 full.insert(repo_ref.clone());
1003 }
1004 }
1005 }
1006 (full, state_only)
1007 };
1008 let fallback_filters = filters::build_sync_level_aware_filters(
1009 &full_repos,
1010 &state_only_repos,
942 &batch_root_events, 1011 &batch_root_events,
943 None, 1012 None,
944 ); 1013 );
@@ -1272,7 +1341,7 @@ impl SyncManager {
1272 /// to be batched and create Layer 2/3 filters before we mark sync complete. 1341 /// to be batched and create Layer 2/3 filters before we mark sync complete.
1273 /// 1342 ///
1274 /// The 6-second delay is based on: 1343 /// The 6-second delay is based on:
1275 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) 1344 /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`)
1276 /// - Buffer for processing: 1 second 1345 /// - Buffer for processing: 1 second
1277 /// 1346 ///
1278 /// Called after each batch is confirmed to detect completion. 1347 /// Called after each batch is confirmed to detect completion.
@@ -1486,7 +1555,17 @@ impl SyncManager {
1486 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; 1555 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1487 }); 1556 });
1488 1557
1489 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1558 // 11. Spawn purgatory announcement sync timer (every 5s)
1559 // Ensures purgatory announcements (including user-submitted ones that never
1560 // touch the DB) are registered in repo_sync_index as StateOnly so that
1561 // state event subscriptions are established on their listed relay URLs.
1562 let purgatory_sync_manager = Arc::clone(&sync_manager);
1563 let purgatory_sync_shutdown = shutdown_tx.subscribe();
1564 tokio::spawn(async move {
1565 run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await;
1566 });
1567
1568 // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1490 loop { 1569 loop {
1491 // Wait for an event without holding the lock 1570 // Wait for an event without holding the lock
1492 tokio::select! { 1571 tokio::select! {
@@ -1719,6 +1798,10 @@ impl SyncManager {
1719 1798
1720 // For sync-triggered events that go to purgatory, trigger immediate sync 1799 // For sync-triggered events that go to purgatory, trigger immediate sync
1721 // (instead of the default 3-minute delay for user-submitted events) 1800 // (instead of the default 3-minute delay for user-submitted events)
1801 //
1802 // Note: announcement events (kind 30617) are registered in repo_sync_index
1803 // by the purgatory announcement sync timer (run_purgatory_announcement_sync)
1804 // rather than inline here.
1722 if result == ProcessResult::Purgatory { 1805 if result == ProcessResult::Purgatory {
1723 // State events (kind 30618) - extract identifier and trigger immediate sync 1806 // State events (kind 30618) - extract identifier and trigger immediate sync
1724 if event.kind.as_u16() == 30618 { 1807 if event.kind.as_u16() == 30618 {
@@ -2303,6 +2386,80 @@ impl SyncManager {
2303 } 2386 }
2304 } 2387 }
2305 2388
2389 /// Sync purgatory announcements into repo_sync_index as StateOnly entries.
2390 ///
2391 /// Called periodically by the purgatory announcement sync timer (every 5s).
2392 /// For each announcement currently in purgatory, ensures a `StateOnly` entry
2393 /// exists in `repo_sync_index`. New entries are then picked up by
2394 /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes
2395 /// to state events for that repo.
2396 ///
2397 /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full).
2398 async fn sync_purgatory_announcements_to_index(&mut self) {
2399 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2400
2401 // Collect all purgatory announcements (snapshot - no async holds)
2402 let announcements = self.purgatory.announcements_for_sync();
2403
2404 if announcements.is_empty() {
2405 return;
2406 }
2407
2408 // Register any new entries in repo_sync_index as StateOnly
2409 let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
2410 {
2411 let mut index = self.repo_sync_index.write().await;
2412 for (repo_id, relays) in &announcements {
2413 let entry = index.entry(repo_id.clone()).or_insert_with(|| {
2414 tracing::debug!(
2415 repo_id = %repo_id,
2416 "Registering purgatory announcement in repo_sync_index as StateOnly"
2417 );
2418 RepoSyncNeeds {
2419 relays: std::collections::HashSet::new(),
2420 root_events: std::collections::HashSet::new(),
2421 sync_level: SyncLevel::StateOnly,
2422 }
2423 });
2424 // Don't downgrade an already-Full entry
2425 // Add any new relay URLs
2426 for relay in relays {
2427 if entry.relays.insert(relay.clone()) {
2428 new_relay_urls.insert(relay.clone());
2429 }
2430 }
2431 }
2432 }
2433
2434 if new_relay_urls.is_empty() {
2435 return;
2436 }
2437
2438 // For any relay URLs that are new, compute and send AddFilters actions
2439 let all_targets = {
2440 let repo_index = self.repo_sync_index.read().await;
2441 derive_relay_targets(&repo_index)
2442 };
2443
2444 let actions = {
2445 let pending_index = self.pending_sync_index.read().await;
2446 let relay_index = self.relay_sync_index.read().await;
2447 compute_actions(&all_targets, &pending_index, &relay_index)
2448 };
2449
2450 for action in actions {
2451 // Only act on relays that have new URLs (avoids redundant work)
2452 if new_relay_urls.contains(&action.relay_url) {
2453 tracing::info!(
2454 relay = %action.relay_url,
2455 repos = action.items.repos.len(),
2456 "Purgatory sync timer: connecting to new relay from purgatory announcement"
2457 );
2458 self.handle_new_sync_filters(action).await;
2459 }
2460 }
2461 }
2462
2306 /// Handle a relay disconnection 2463 /// Handle a relay disconnection
2307 /// 2464 ///
2308 /// This method is called when the event loop terminates and sends a disconnect notification. 2465 /// This method is called when the event loop terminates and sends a disconnect notification.
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 86e4583..4d69c9a 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -18,7 +18,7 @@ use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase; 19use crate::nostr::builder::SharedDatabase;
20 20
21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel};
22 22
23// ============================================================================= 23// =============================================================================
24// LoopControl - Result of notification processing 24// LoopControl - Result of notification processing
@@ -60,6 +60,7 @@ impl PendingUpdates {
60 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { 60 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
61 relays: HashSet::new(), 61 relays: HashSet::new(),
62 root_events: HashSet::new(), 62 root_events: HashSet::new(),
63 sync_level: SyncLevel::Full,
63 }); 64 });
64 entry.relays.extend(relays); 65 entry.relays.extend(relays);
65 entry.root_events.extend(root_events); 66 entry.root_events.extend(root_events);
@@ -132,14 +133,14 @@ impl SelfSubscriber {
132 133
133 /// Get batch window from environment or use default 134 /// Get batch window from environment or use default
134 /// 135 ///
135 /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 136 /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution.
136 /// Default: 5000ms (5 seconds) 137 /// Default: 5000ms (5 seconds)
137 fn get_batch_window() -> Duration { 138 fn get_batch_window() -> Duration {
138 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") 139 if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
139 .ok() 140 Duration::from_millis(200)
140 .and_then(|s| s.parse::<u64>().ok()) 141 } else {
141 .map(Duration::from_millis) 142 Duration::from_millis(5000)
142 .unwrap_or(Duration::from_millis(5000)) 143 }
143 } 144 }
144 145
145 /// Load existing events from database on startup 146 /// Load existing events from database on startup
@@ -197,6 +198,7 @@ impl SelfSubscriber {
197 .or_insert_with(|| RepoSyncNeeds { 198 .or_insert_with(|| RepoSyncNeeds {
198 relays: HashSet::new(), 199 relays: HashSet::new(),
199 root_events: HashSet::new(), 200 root_events: HashSet::new(),
201 sync_level: SyncLevel::StateOnly,
200 }); 202 });
201 entry.relays.extend(needs.relays.clone()); 203 entry.relays.extend(needs.relays.clone());
202 } 204 }
@@ -570,7 +572,12 @@ impl SelfSubscriber {
570 .or_insert_with(|| RepoSyncNeeds { 572 .or_insert_with(|| RepoSyncNeeds {
571 relays: HashSet::new(), 573 relays: HashSet::new(),
572 root_events: HashSet::new(), 574 root_events: HashSet::new(),
575 sync_level: SyncLevel::Full,
573 }); 576 });
577 // Upgrade sync_level to Full - this handles the case where the entry
578 // already exists as StateOnly (purgatory announcement) and is now being
579 // promoted (git data arrived and the event was broadcast via notify_event).
580 entry.sync_level = SyncLevel::Full;
574 entry.relays.extend(needs.relays); 581 entry.relays.extend(needs.relays);
575 entry.root_events.extend(needs.root_events); 582 entry.root_events.extend(needs.root_events);
576 583
@@ -594,21 +601,26 @@ impl SelfSubscriber {
594 continue; 601 continue;
595 } 602 }
596 603
597 // Build filters for these repos 604 // Build filters for these repos (sync-level-aware)
598 let filters = crate::sync::filters::build_layer2_and_layer3_filters( 605 let filters = crate::sync::filters::build_sync_level_aware_filters(
599 &needs.repos, 606 &needs.repos,
607 &needs.state_only_repos,
600 &needs.root_events, 608 &needs.root_events,
601 None, 609 None,
602 ); 610 );
603 611
604 // Log before moving values 612 // Log before moving values
605 let repo_count = needs.repos.len(); 613 let repo_count = needs.repos.len() + needs.state_only_repos.len();
606 let event_count = needs.root_events.len(); 614 let event_count = needs.root_events.len();
607 615
616 // Combine all repos into pending items
617 let mut all_repos = needs.repos;
618 all_repos.extend(needs.state_only_repos);
619
608 let action = AddFilters { 620 let action = AddFilters {
609 relay_url: relay_url.clone(), 621 relay_url: relay_url.clone(),
610 items: crate::sync::PendingItems { 622 items: crate::sync::PendingItems {
611 repos: needs.repos, 623 repos: all_repos,
612 root_events: needs.root_events, 624 root_events: needs.root_events,
613 }, 625 },
614 filters, 626 filters,