diff options
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 34 |
1 files changed, 23 insertions, 11 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 86e4583..4d69c9a 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -18,7 +18,7 @@ use tokio::sync::{broadcast, mpsc}; | |||
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | 19 | use crate::nostr::builder::SharedDatabase; |
| 20 | 20 | ||
| 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; |
| 22 | 22 | ||
| 23 | // ============================================================================= | 23 | // ============================================================================= |
| 24 | // LoopControl - Result of notification processing | 24 | // LoopControl - Result of notification processing |
| @@ -60,6 +60,7 @@ impl PendingUpdates { | |||
| 60 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | 60 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { |
| 61 | relays: HashSet::new(), | 61 | relays: HashSet::new(), |
| 62 | root_events: HashSet::new(), | 62 | root_events: HashSet::new(), |
| 63 | sync_level: SyncLevel::Full, | ||
| 63 | }); | 64 | }); |
| 64 | entry.relays.extend(relays); | 65 | entry.relays.extend(relays); |
| 65 | entry.root_events.extend(root_events); | 66 | entry.root_events.extend(root_events); |
| @@ -132,14 +133,14 @@ impl SelfSubscriber { | |||
| 132 | 133 | ||
| 133 | /// Get batch window from environment or use default | 134 | /// Get batch window from environment or use default |
| 134 | /// | 135 | /// |
| 135 | /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. | 136 | /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. |
| 136 | /// Default: 5000ms (5 seconds) | 137 | /// Default: 5000ms (5 seconds) |
| 137 | fn get_batch_window() -> Duration { | 138 | fn get_batch_window() -> Duration { |
| 138 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | 139 | if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 139 | .ok() | 140 | Duration::from_millis(200) |
| 140 | .and_then(|s| s.parse::<u64>().ok()) | 141 | } else { |
| 141 | .map(Duration::from_millis) | 142 | Duration::from_millis(5000) |
| 142 | .unwrap_or(Duration::from_millis(5000)) | 143 | } |
| 143 | } | 144 | } |
| 144 | 145 | ||
| 145 | /// Load existing events from database on startup | 146 | /// Load existing events from database on startup |
| @@ -197,6 +198,7 @@ impl SelfSubscriber { | |||
| 197 | .or_insert_with(|| RepoSyncNeeds { | 198 | .or_insert_with(|| RepoSyncNeeds { |
| 198 | relays: HashSet::new(), | 199 | relays: HashSet::new(), |
| 199 | root_events: HashSet::new(), | 200 | root_events: HashSet::new(), |
| 201 | sync_level: SyncLevel::StateOnly, | ||
| 200 | }); | 202 | }); |
| 201 | entry.relays.extend(needs.relays.clone()); | 203 | entry.relays.extend(needs.relays.clone()); |
| 202 | } | 204 | } |
| @@ -570,7 +572,12 @@ impl SelfSubscriber { | |||
| 570 | .or_insert_with(|| RepoSyncNeeds { | 572 | .or_insert_with(|| RepoSyncNeeds { |
| 571 | relays: HashSet::new(), | 573 | relays: HashSet::new(), |
| 572 | root_events: HashSet::new(), | 574 | root_events: HashSet::new(), |
| 575 | sync_level: SyncLevel::Full, | ||
| 573 | }); | 576 | }); |
| 577 | // Upgrade sync_level to Full - this handles the case where the entry | ||
| 578 | // already exists as StateOnly (purgatory announcement) and is now being | ||
| 579 | // promoted (git data arrived and the event was broadcast via notify_event). | ||
| 580 | entry.sync_level = SyncLevel::Full; | ||
| 574 | entry.relays.extend(needs.relays); | 581 | entry.relays.extend(needs.relays); |
| 575 | entry.root_events.extend(needs.root_events); | 582 | entry.root_events.extend(needs.root_events); |
| 576 | 583 | ||
| @@ -594,21 +601,26 @@ impl SelfSubscriber { | |||
| 594 | continue; | 601 | continue; |
| 595 | } | 602 | } |
| 596 | 603 | ||
| 597 | // Build filters for these repos | 604 | // Build filters for these repos (sync-level-aware) |
| 598 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | 605 | let filters = crate::sync::filters::build_sync_level_aware_filters( |
| 599 | &needs.repos, | 606 | &needs.repos, |
| 607 | &needs.state_only_repos, | ||
| 600 | &needs.root_events, | 608 | &needs.root_events, |
| 601 | None, | 609 | None, |
| 602 | ); | 610 | ); |
| 603 | 611 | ||
| 604 | // Log before moving values | 612 | // Log before moving values |
| 605 | let repo_count = needs.repos.len(); | 613 | let repo_count = needs.repos.len() + needs.state_only_repos.len(); |
| 606 | let event_count = needs.root_events.len(); | 614 | let event_count = needs.root_events.len(); |
| 607 | 615 | ||
| 616 | // Combine all repos into pending items | ||
| 617 | let mut all_repos = needs.repos; | ||
| 618 | all_repos.extend(needs.state_only_repos); | ||
| 619 | |||
| 608 | let action = AddFilters { | 620 | let action = AddFilters { |
| 609 | relay_url: relay_url.clone(), | 621 | relay_url: relay_url.clone(), |
| 610 | items: crate::sync::PendingItems { | 622 | items: crate::sync::PendingItems { |
| 611 | repos: needs.repos, | 623 | repos: all_repos, |
| 612 | root_events: needs.root_events, | 624 | root_events: needs.root_events, |
| 613 | }, | 625 | }, |
| 614 | filters, | 626 | filters, |