upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs34
1 files changed, 23 insertions, 11 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 86e4583..4d69c9a 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -18,7 +18,7 @@ use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase; 19use crate::nostr::builder::SharedDatabase;
20 20
21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel};
22 22
23// ============================================================================= 23// =============================================================================
24// LoopControl - Result of notification processing 24// LoopControl - Result of notification processing
@@ -60,6 +60,7 @@ impl PendingUpdates {
60 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { 60 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
61 relays: HashSet::new(), 61 relays: HashSet::new(),
62 root_events: HashSet::new(), 62 root_events: HashSet::new(),
63 sync_level: SyncLevel::Full,
63 }); 64 });
64 entry.relays.extend(relays); 65 entry.relays.extend(relays);
65 entry.root_events.extend(root_events); 66 entry.root_events.extend(root_events);
@@ -132,14 +133,14 @@ impl SelfSubscriber {
132 133
133 /// Get batch window from environment or use default 134 /// Get batch window from environment or use default
134 /// 135 ///
135 /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 136 /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution.
136 /// Default: 5000ms (5 seconds) 137 /// Default: 5000ms (5 seconds)
137 fn get_batch_window() -> Duration { 138 fn get_batch_window() -> Duration {
138 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") 139 if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
139 .ok() 140 Duration::from_millis(200)
140 .and_then(|s| s.parse::<u64>().ok()) 141 } else {
141 .map(Duration::from_millis) 142 Duration::from_millis(5000)
142 .unwrap_or(Duration::from_millis(5000)) 143 }
143 } 144 }
144 145
145 /// Load existing events from database on startup 146 /// Load existing events from database on startup
@@ -197,6 +198,7 @@ impl SelfSubscriber {
197 .or_insert_with(|| RepoSyncNeeds { 198 .or_insert_with(|| RepoSyncNeeds {
198 relays: HashSet::new(), 199 relays: HashSet::new(),
199 root_events: HashSet::new(), 200 root_events: HashSet::new(),
201 sync_level: SyncLevel::StateOnly,
200 }); 202 });
201 entry.relays.extend(needs.relays.clone()); 203 entry.relays.extend(needs.relays.clone());
202 } 204 }
@@ -570,7 +572,12 @@ impl SelfSubscriber {
570 .or_insert_with(|| RepoSyncNeeds { 572 .or_insert_with(|| RepoSyncNeeds {
571 relays: HashSet::new(), 573 relays: HashSet::new(),
572 root_events: HashSet::new(), 574 root_events: HashSet::new(),
575 sync_level: SyncLevel::Full,
573 }); 576 });
577 // Upgrade sync_level to Full - this handles the case where the entry
578 // already exists as StateOnly (purgatory announcement) and is now being
579 // promoted (git data arrived and the event was broadcast via notify_event).
580 entry.sync_level = SyncLevel::Full;
574 entry.relays.extend(needs.relays); 581 entry.relays.extend(needs.relays);
575 entry.root_events.extend(needs.root_events); 582 entry.root_events.extend(needs.root_events);
576 583
@@ -594,21 +601,26 @@ impl SelfSubscriber {
594 continue; 601 continue;
595 } 602 }
596 603
597 // Build filters for these repos 604 // Build filters for these repos (sync-level-aware)
598 let filters = crate::sync::filters::build_layer2_and_layer3_filters( 605 let filters = crate::sync::filters::build_sync_level_aware_filters(
599 &needs.repos, 606 &needs.repos,
607 &needs.state_only_repos,
600 &needs.root_events, 608 &needs.root_events,
601 None, 609 None,
602 ); 610 );
603 611
604 // Log before moving values 612 // Log before moving values
605 let repo_count = needs.repos.len(); 613 let repo_count = needs.repos.len() + needs.state_only_repos.len();
606 let event_count = needs.root_events.len(); 614 let event_count = needs.root_events.len();
607 615
616 // Combine all repos into pending items
617 let mut all_repos = needs.repos;
618 all_repos.extend(needs.state_only_repos);
619
608 let action = AddFilters { 620 let action = AddFilters {
609 relay_url: relay_url.clone(), 621 relay_url: relay_url.clone(),
610 items: crate::sync::PendingItems { 622 items: crate::sync::PendingItems {
611 repos: needs.repos, 623 repos: all_repos,
612 root_events: needs.root_events, 624 root_events: needs.root_events,
613 }, 625 },
614 filters, 626 filters,