upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 15:41:32 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 15:41:32 +0000
commitc54ce061d6d278cce8362d5af085808ca60c239b (patch)
treeec967d6195d9f7ec4f061449596611afe3a0950f /src/sync/mod.rs
parente0ad39a489b3398f8208713bf728db0cb11475b0 (diff)
parent113928aa84894ea8f65c247d9987527e792b32a9 (diff)
feat: announcement purgatory
Extends purgatory to hold repository announcements until git data arrives, preventing empty repositories from being served to clients. When an announcement is received, a bare repo is created immediately and the announcement is held in purgatory. It is only promoted and served once a git push confirms real content exists. If no push arrives before expiry, the bare repo is deleted and the announcement is silently discarded. Key behaviours: - Soft expiry: announcements are hidden from clients but kept alive while git pushes are in progress, reviving on successful push - Expiry is extended when a matching state event or git push is observed - NIP-09 deletion events remove announcements from purgatory - Purgatory state (announcements, state events, PR events, expired set) is persisted to disk on graceful shutdown and restored on startup, with elapsed downtime subtracted from expiry deadlines - Purgatory announcements drive StateOnly sync in the sync system so state events are fetched from listed relays before promotion - SyncLevel added to RepoSyncIndex to distinguish purgatory repos (StateOnly) from promoted repos (Full L2+L3 sync)
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs167
1 files changed, 162 insertions, 5 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index d6634ff..cd62380 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex;
85// Supporting Data Structures 85// Supporting Data Structures
86// ============================================================================= 86// =============================================================================
87 87
88/// Level of sync needed for a repository
89///
90/// Purgatory announcements only need state events synced (to validate git data).
91/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.).
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
93pub enum SyncLevel {
94 /// Full L2 + L3 sync (promoted repos with git data)
95 #[default]
96 Full,
97 /// Only state events (kind 30618) - for purgatory announcements
98 StateOnly,
99}
100
88/// What repos and root events need to be synced 101/// What repos and root events need to be synced
89#[derive(Debug, Clone, Default)] 102#[derive(Debug, Clone, Default)]
90pub struct RepoSyncNeeds { 103pub struct RepoSyncNeeds {
@@ -92,6 +105,8 @@ pub struct RepoSyncNeeds {
92 pub relays: HashSet<String>, 105 pub relays: HashSet<String>,
93 /// Root event IDs - 1617/1618/1621 - that reference this repo 106 /// Root event IDs - 1617/1618/1621 - that reference this repo
94 pub root_events: HashSet<EventId>, 107 pub root_events: HashSet<EventId>,
108 /// Sync level - StateOnly for purgatory, Full for promoted repos
109 pub sync_level: SyncLevel,
95} 110}
96 111
97/// Connection status for a relay 112/// Connection status for a relay
@@ -382,6 +397,40 @@ async fn run_daily_timer(
382 } 397 }
383} 398}
384 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401///
402/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`).
403/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in
404/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the
405/// relay URLs listed in the announcement and subscribes to state events (kind 30618).
406///
407/// This is the sole registration path for purgatory announcements:
408/// - Sync-path announcements: registered here within one interval of arriving.
409/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
410/// (they're rejected from DB), so this timer is the only registration path.
411async fn run_purgatory_announcement_sync(
412 sync_manager: Arc<Mutex<SyncManager>>,
413 mut shutdown_rx: broadcast::Receiver<()>,
414) {
415 let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
416 Duration::from_millis(200)
417 } else {
418 Duration::from_secs(5)
419 };
420 loop {
421 tokio::select! {
422 _ = tokio::time::sleep(interval) => {
423 let mut manager = sync_manager.lock().await;
424 manager.sync_purgatory_announcements_to_index().await;
425 }
426 _ = shutdown_rx.recv() => {
427 tracing::debug!("Purgatory announcement sync timer received shutdown signal");
428 break;
429 }
430 }
431 }
432}
433
385// Combined Health and Metrics Checker 434// Combined Health and Metrics Checker
386 435
387/// Background task for cleaning up expired entries from the rejected events index 436/// Background task for cleaning up expired entries from the rejected events index
@@ -936,9 +985,29 @@ impl SyncManager {
936 985
937 // Create REQ+EOSE subscriptions using original semantic filters 986 // Create REQ+EOSE subscriptions using original semantic filters
938 // This queries by kind/author/tags instead of by ID, which may 987 // This queries by kind/author/tags instead of by ID, which may
939 // succeed even when ID-based queries fail 988 // succeed even when ID-based queries fail.
940 let fallback_filters = filters::build_layer2_and_layer3_filters( 989 // Split batch_repos by SyncLevel to avoid sending Layer 2 filters
941 &batch_repos, 990 // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be
991 // rejected as orphan and then silently dropped by nostr-sdk deduplication.
992 let (full_repos, state_only_repos) = {
993 let repo_index = self.repo_sync_index.read().await;
994 let mut full = HashSet::new();
995 let mut state_only = HashSet::new();
996 for repo_ref in &batch_repos {
997 match repo_index.get(repo_ref).map(|n| n.sync_level) {
998 Some(SyncLevel::StateOnly) => {
999 state_only.insert(repo_ref.clone());
1000 }
1001 _ => {
1002 full.insert(repo_ref.clone());
1003 }
1004 }
1005 }
1006 (full, state_only)
1007 };
1008 let fallback_filters = filters::build_sync_level_aware_filters(
1009 &full_repos,
1010 &state_only_repos,
942 &batch_root_events, 1011 &batch_root_events,
943 None, 1012 None,
944 ); 1013 );
@@ -1272,7 +1341,7 @@ impl SyncManager {
1272 /// to be batched and create Layer 2/3 filters before we mark sync complete. 1341 /// to be batched and create Layer 2/3 filters before we mark sync complete.
1273 /// 1342 ///
1274 /// The 6-second delay is based on: 1343 /// The 6-second delay is based on:
1275 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) 1344 /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`)
1276 /// - Buffer for processing: 1 second 1345 /// - Buffer for processing: 1 second
1277 /// 1346 ///
1278 /// Called after each batch is confirmed to detect completion. 1347 /// Called after each batch is confirmed to detect completion.
@@ -1486,7 +1555,17 @@ impl SyncManager {
1486 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; 1555 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1487 }); 1556 });
1488 1557
1489 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1558 // 11. Spawn purgatory announcement sync timer (every 5s)
1559 // Ensures purgatory announcements (including user-submitted ones that never
1560 // touch the DB) are registered in repo_sync_index as StateOnly so that
1561 // state event subscriptions are established on their listed relay URLs.
1562 let purgatory_sync_manager = Arc::clone(&sync_manager);
1563 let purgatory_sync_shutdown = shutdown_tx.subscribe();
1564 tokio::spawn(async move {
1565 run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await;
1566 });
1567
1568 // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1490 loop { 1569 loop {
1491 // Wait for an event without holding the lock 1570 // Wait for an event without holding the lock
1492 tokio::select! { 1571 tokio::select! {
@@ -1719,6 +1798,10 @@ impl SyncManager {
1719 1798
1720 // For sync-triggered events that go to purgatory, trigger immediate sync 1799 // For sync-triggered events that go to purgatory, trigger immediate sync
1721 // (instead of the default 3-minute delay for user-submitted events) 1800 // (instead of the default 3-minute delay for user-submitted events)
1801 //
1802 // Note: announcement events (kind 30617) are registered in repo_sync_index
1803 // by the purgatory announcement sync timer (run_purgatory_announcement_sync)
1804 // rather than inline here.
1722 if result == ProcessResult::Purgatory { 1805 if result == ProcessResult::Purgatory {
1723 // State events (kind 30618) - extract identifier and trigger immediate sync 1806 // State events (kind 30618) - extract identifier and trigger immediate sync
1724 if event.kind.as_u16() == 30618 { 1807 if event.kind.as_u16() == 30618 {
@@ -2303,6 +2386,80 @@ impl SyncManager {
2303 } 2386 }
2304 } 2387 }
2305 2388
2389 /// Sync purgatory announcements into repo_sync_index as StateOnly entries.
2390 ///
2391 /// Called periodically by the purgatory announcement sync timer (every 5s).
2392 /// For each announcement currently in purgatory, ensures a `StateOnly` entry
2393 /// exists in `repo_sync_index`. New entries are then picked up by
2394 /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes
2395 /// to state events for that repo.
2396 ///
2397 /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full).
2398 async fn sync_purgatory_announcements_to_index(&mut self) {
2399 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2400
2401 // Collect all purgatory announcements (snapshot - no async holds)
2402 let announcements = self.purgatory.announcements_for_sync();
2403
2404 if announcements.is_empty() {
2405 return;
2406 }
2407
2408 // Register any new entries in repo_sync_index as StateOnly
2409 let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
2410 {
2411 let mut index = self.repo_sync_index.write().await;
2412 for (repo_id, relays) in &announcements {
2413 let entry = index.entry(repo_id.clone()).or_insert_with(|| {
2414 tracing::debug!(
2415 repo_id = %repo_id,
2416 "Registering purgatory announcement in repo_sync_index as StateOnly"
2417 );
2418 RepoSyncNeeds {
2419 relays: std::collections::HashSet::new(),
2420 root_events: std::collections::HashSet::new(),
2421 sync_level: SyncLevel::StateOnly,
2422 }
2423 });
2424 // Don't downgrade an already-Full entry
2425 // Add any new relay URLs
2426 for relay in relays {
2427 if entry.relays.insert(relay.clone()) {
2428 new_relay_urls.insert(relay.clone());
2429 }
2430 }
2431 }
2432 }
2433
2434 if new_relay_urls.is_empty() {
2435 return;
2436 }
2437
2438 // For any relay URLs that are new, compute and send AddFilters actions
2439 let all_targets = {
2440 let repo_index = self.repo_sync_index.read().await;
2441 derive_relay_targets(&repo_index)
2442 };
2443
2444 let actions = {
2445 let pending_index = self.pending_sync_index.read().await;
2446 let relay_index = self.relay_sync_index.read().await;
2447 compute_actions(&all_targets, &pending_index, &relay_index)
2448 };
2449
2450 for action in actions {
2451 // Only act on relays that have new URLs (avoids redundant work)
2452 if new_relay_urls.contains(&action.relay_url) {
2453 tracing::info!(
2454 relay = %action.relay_url,
2455 repos = action.items.repos.len(),
2456 "Purgatory sync timer: connecting to new relay from purgatory announcement"
2457 );
2458 self.handle_new_sync_filters(action).await;
2459 }
2460 }
2461 }
2462
2306 /// Handle a relay disconnection 2463 /// Handle a relay disconnection
2307 /// 2464 ///
2308 /// This method is called when the event loop terminates and sends a disconnect notification. 2465 /// This method is called when the event loop terminates and sends a disconnect notification.