upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-18 20:32:13 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-18 20:32:13 +0000
commitee113a654e2971a6ebdb07398cc5638dbe59b48c (patch)
tree6e4aacd207553c367d9b533fd6d4824d34994c82 /src/sync/mod.rs
parente7e61d1abfb3609c6818e6040294c6be19ba805f (diff)
fix: replace repo_sync_index wiring with purgatory announcement sync timer
Instead of threading repo_sync_index through PolicyContext/builder.rs/main.rs to handle user-submitted purgatory announcements, add a simple background timer (run_purgatory_announcement_sync, every 5s) that scans the purgatory for announcement entries and registers them in repo_sync_index as StateOnly. This is simpler and covers both flows: - Sync-path announcements: inline registration still happens during event processing (sync/mod.rs:1839+), timer provides a safety net - User-submitted announcements: SelfSubscriber never sees them (rejected from DB), timer is the primary registration path The timer calls sync_purgatory_announcements_to_index() which: 1. Snapshots purgatory via new announcements_for_sync() public method 2. Or_inserts StateOnly entries (never downgrades Full entries) 3. Detects newly added relay URLs and calls handle_new_sync_filters to connect and subscribe - fixing the failing test that expected relay discovery from a user-submitted purgatory announcement Removes: repo_sync_index field from PolicyContext, set/get_repo_sync_index methods, set_repo_sync_index on Nip34WritePolicy, wiring in main.rs, and the inline AcceptPurgatory registration block in builder.rs.
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs125
1 files changed, 116 insertions, 9 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 916e2b0..ed5b6e7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -397,6 +397,37 @@ async fn run_daily_timer(
397 } 397 }
398} 398}
399 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401///
402/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there
403/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters`
404/// which connects to the relay URLs listed in the announcement and subscribes to state
405/// events (kind 30618).
406///
407/// This covers two cases:
408/// - Sync-path announcements: registered inline during event processing, but this
409/// provides a safety net in case the inline registration was missed.
410/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
411/// (they're rejected from DB), so this timer is the primary registration path.
412async fn run_purgatory_announcement_sync(
413 sync_manager: Arc<Mutex<SyncManager>>,
414 mut shutdown_rx: broadcast::Receiver<()>,
415) {
416 let interval = Duration::from_secs(5);
417 loop {
418 tokio::select! {
419 _ = tokio::time::sleep(interval) => {
420 let mut manager = sync_manager.lock().await;
421 manager.sync_purgatory_announcements_to_index().await;
422 }
423 _ = shutdown_rx.recv() => {
424 tracing::debug!("Purgatory announcement sync timer received shutdown signal");
425 break;
426 }
427 }
428 }
429}
430
400// Combined Health and Metrics Checker 431// Combined Health and Metrics Checker
401 432
402/// Background task for cleaning up expired entries from the rejected events index 433/// Background task for cleaning up expired entries from the rejected events index
@@ -700,14 +731,6 @@ impl SyncManager {
700 self.rejected_events_index.save_to_disk(path) 731 self.rejected_events_index.save_to_disk(path)
701 } 732 }
702 733
703 /// Get a clone of the repo sync index Arc.
704 ///
705 /// This allows the write policy to register user-submitted purgatory announcements
706 /// in the sync index so that state event sync starts promptly.
707 pub fn repo_sync_index(&self) -> RepoSyncIndex {
708 self.repo_sync_index.clone()
709 }
710
711 /// Handle EOSE (End Of Stored Events) for a subscription 734 /// Handle EOSE (End Of Stored Events) for a subscription
712 /// 735 ///
713 /// This method: 736 /// This method:
@@ -1560,7 +1583,17 @@ impl SyncManager {
1560 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; 1583 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1561 }); 1584 });
1562 1585
1563 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1586 // 11. Spawn purgatory announcement sync timer (every 5s)
1587 // Ensures purgatory announcements (including user-submitted ones that never
1588 // touch the DB) are registered in repo_sync_index as StateOnly so that
1589 // state event subscriptions are established on their listed relay URLs.
1590 let purgatory_sync_manager = Arc::clone(&sync_manager);
1591 let purgatory_sync_shutdown = shutdown_tx.subscribe();
1592 tokio::spawn(async move {
1593 run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await;
1594 });
1595
1596 // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1564 loop { 1597 loop {
1565 // Wait for an event without holding the lock 1598 // Wait for an event without holding the lock
1566 tokio::select! { 1599 tokio::select! {
@@ -2419,6 +2452,80 @@ impl SyncManager {
2419 } 2452 }
2420 } 2453 }
2421 2454
2455 /// Sync purgatory announcements into repo_sync_index as StateOnly entries.
2456 ///
2457 /// Called periodically by the purgatory announcement sync timer (every 5s).
2458 /// For each announcement currently in purgatory, ensures a `StateOnly` entry
2459 /// exists in `repo_sync_index`. New entries are then picked up by
2460 /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes
2461 /// to state events for that repo.
2462 ///
2463 /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full).
2464 async fn sync_purgatory_announcements_to_index(&mut self) {
2465 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2466
2467 // Collect all purgatory announcements (snapshot - no async holds)
2468 let announcements = self.purgatory.announcements_for_sync();
2469
2470 if announcements.is_empty() {
2471 return;
2472 }
2473
2474 // Register any new entries in repo_sync_index as StateOnly
2475 let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
2476 {
2477 let mut index = self.repo_sync_index.write().await;
2478 for (repo_id, relays) in &announcements {
2479 let entry = index.entry(repo_id.clone()).or_insert_with(|| {
2480 tracing::debug!(
2481 repo_id = %repo_id,
2482 "Registering purgatory announcement in repo_sync_index as StateOnly"
2483 );
2484 RepoSyncNeeds {
2485 relays: std::collections::HashSet::new(),
2486 root_events: std::collections::HashSet::new(),
2487 sync_level: SyncLevel::StateOnly,
2488 }
2489 });
2490 // Don't downgrade an already-Full entry
2491 // Add any new relay URLs
2492 for relay in relays {
2493 if entry.relays.insert(relay.clone()) {
2494 new_relay_urls.insert(relay.clone());
2495 }
2496 }
2497 }
2498 }
2499
2500 if new_relay_urls.is_empty() {
2501 return;
2502 }
2503
2504 // For any relay URLs that are new, compute and send AddFilters actions
2505 let all_targets = {
2506 let repo_index = self.repo_sync_index.read().await;
2507 derive_relay_targets(&repo_index)
2508 };
2509
2510 let actions = {
2511 let pending_index = self.pending_sync_index.read().await;
2512 let relay_index = self.relay_sync_index.read().await;
2513 compute_actions(&all_targets, &pending_index, &relay_index)
2514 };
2515
2516 for action in actions {
2517 // Only act on relays that have new URLs (avoids redundant work)
2518 if new_relay_urls.contains(&action.relay_url) {
2519 tracing::info!(
2520 relay = %action.relay_url,
2521 repos = action.items.repos.len(),
2522 "Purgatory sync timer: connecting to new relay from purgatory announcement"
2523 );
2524 self.handle_new_sync_filters(action).await;
2525 }
2526 }
2527 }
2528
2422 /// Handle a relay disconnection 2529 /// Handle a relay disconnection
2423 /// 2530 ///
2424 /// This method is called when the event loop terminates and sends a disconnect notification. 2531 /// This method is called when the event loop terminates and sends a disconnect notification.