diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 20:32:13 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 20:32:13 +0000 |
| commit | ee113a654e2971a6ebdb07398cc5638dbe59b48c (patch) | |
| tree | 6e4aacd207553c367d9b533fd6d4824d34994c82 /src | |
| parent | e7e61d1abfb3609c6818e6040294c6be19ba805f (diff) | |
fix: replace repo_sync_index wiring with purgatory announcement sync timer
Instead of threading repo_sync_index through PolicyContext/builder.rs/main.rs
to handle user-submitted purgatory announcements, add a simple background
timer (run_purgatory_announcement_sync, every 5s) that scans the purgatory
for announcement entries and registers them in repo_sync_index as StateOnly.
This is simpler and covers both flows:
- Sync-path announcements: inline registration still happens during event
processing (sync/mod.rs:1839+), timer provides a safety net
- User-submitted announcements: SelfSubscriber never sees them (rejected
from DB), timer is the primary registration path
The timer calls sync_purgatory_announcements_to_index() which:
1. Snapshots purgatory via new announcements_for_sync() public method
2. Or_inserts StateOnly entries (never downgrades Full entries)
3. Detects newly added relay URLs and calls handle_new_sync_filters to
connect and subscribe - fixing the failing test that expected relay
discovery from a user-submitted purgatory announcement
Removes: repo_sync_index field from PolicyContext, set/get_repo_sync_index
methods, set_repo_sync_index on Nip34WritePolicy, wiring in main.rs, and
the inline AcceptPurgatory registration block in builder.rs.
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 7 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 54 | ||||
| -rw-r--r-- | src/nostr/policy/mod.rs | 19 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 17 | ||||
| -rw-r--r-- | src/sync/mod.rs | 125 |
5 files changed, 134 insertions, 88 deletions
diff --git a/src/main.rs b/src/main.rs index ebe05a3..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -132,13 +132,6 @@ async fn main() -> Result<()> { | |||
| 132 | // Get a reference to the rejected events index for shutdown persistence | 132 | // Get a reference to the rejected events index for shutdown persistence |
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); |
| 134 | 134 | ||
| 135 | // Wire repo_sync_index into write policy so user-submitted purgatory announcements | ||
| 136 | // get registered for state event sync immediately (Fix 3). | ||
| 137 | let repo_sync_index = sync_manager.repo_sync_index(); | ||
| 138 | relay_with_db | ||
| 139 | .write_policy | ||
| 140 | .set_repo_sync_index(repo_sync_index); | ||
| 141 | |||
| 142 | tokio::spawn(async move { | 135 | tokio::spawn(async move { |
| 143 | sync_manager.run().await; | 136 | sync_manager.run().await; |
| 144 | }); | 137 | }); |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 8d1e461..c2d4939 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -17,7 +17,7 @@ use crate::nostr::policy::{ | |||
| 17 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, | 17 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, |
| 18 | RelatedEventPolicy, StatePolicy, StateResult, | 18 | RelatedEventPolicy, StatePolicy, StateResult, |
| 19 | }; | 19 | }; |
| 20 | use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; | 20 | |
| 21 | 21 | ||
| 22 | /// Type alias for the shared database used by the relay | 22 | /// Type alias for the shared database used by the relay |
| 23 | pub type SharedDatabase = Arc<dyn NostrDatabase>; | 23 | pub type SharedDatabase = Arc<dyn NostrDatabase>; |
| @@ -99,14 +99,6 @@ impl Nip34WritePolicy { | |||
| 99 | self.ctx.set_local_relay(relay); | 99 | self.ctx.set_local_relay(relay); |
| 100 | } | 100 | } |
| 101 | 101 | ||
| 102 | /// Set the repo sync index so that user-submitted purgatory announcements can | ||
| 103 | /// be registered for state event sync immediately. | ||
| 104 | /// | ||
| 105 | /// This must be called after SyncManager is created. | ||
| 106 | pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { | ||
| 107 | self.ctx.set_repo_sync_index(index); | ||
| 108 | } | ||
| 109 | |||
| 110 | /// Handle repository announcement event | 102 | /// Handle repository announcement event |
| 111 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { | 103 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { |
| 112 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | 104 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); |
| @@ -156,50 +148,6 @@ impl Nip34WritePolicy { | |||
| 156 | event_id_str | 148 | event_id_str |
| 157 | ); | 149 | ); |
| 158 | 150 | ||
| 159 | // Register repo in repo_sync_index with StateOnly level so that | ||
| 160 | // state event sync starts promptly via the next batch EOSE recompute. | ||
| 161 | // This handles user-submitted purgatory announcements - the SelfSubscriber | ||
| 162 | // only sees DB events, so it won't pick these up automatically. | ||
| 163 | if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { | ||
| 164 | if let Ok(announcement) = | ||
| 165 | RepositoryAnnouncement::from_event(event.clone()) | ||
| 166 | { | ||
| 167 | use std::collections::HashSet; | ||
| 168 | let repo_id = format!( | ||
| 169 | "30617:{}:{}", | ||
| 170 | event.pubkey, | ||
| 171 | announcement.identifier | ||
| 172 | ); | ||
| 173 | |||
| 174 | // Extract relay URLs from the announcement event tags | ||
| 175 | let relays: HashSet<String> = event | ||
| 176 | .tags | ||
| 177 | .iter() | ||
| 178 | .flat_map(|tag| { | ||
| 179 | let tag_vec = tag.as_slice(); | ||
| 180 | if !tag_vec.is_empty() && tag_vec[0] == "relays" { | ||
| 181 | tag_vec[1..].iter().map(|s| s.to_string()).collect::<Vec<_>>() | ||
| 182 | } else { | ||
| 183 | vec![] | ||
| 184 | } | ||
| 185 | }) | ||
| 186 | .collect(); | ||
| 187 | |||
| 188 | let mut index = repo_sync_index.write().await; | ||
| 189 | index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { | ||
| 190 | relays, | ||
| 191 | root_events: HashSet::new(), | ||
| 192 | sync_level: SyncLevel::StateOnly, | ||
| 193 | }); | ||
| 194 | drop(index); | ||
| 195 | |||
| 196 | tracing::debug!( | ||
| 197 | repo_id = %repo_id, | ||
| 198 | "Registered purgatory announcement in repo_sync_index as StateOnly" | ||
| 199 | ); | ||
| 200 | } | ||
| 201 | } | ||
| 202 | |||
| 203 | WritePolicyResult::Reject { | 151 | WritePolicyResult::Reject { |
| 204 | status: true, // Client sees OK | 152 | status: true, // Client sees OK |
| 205 | message: "purgatory: won't be served until git data arrives".into(), | 153 | message: "purgatory: won't be served until git data arrives".into(), |
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index c958586..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs | |||
| @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; | |||
| 20 | 20 | ||
| 21 | use super::SharedDatabase; | 21 | use super::SharedDatabase; |
| 22 | use crate::purgatory::Purgatory; | 22 | use crate::purgatory::Purgatory; |
| 23 | use crate::sync::RepoSyncIndex; | ||
| 24 | use nostr_relay_builder::LocalRelay; | 23 | use nostr_relay_builder::LocalRelay; |
| 25 | use std::sync::Arc; | 24 | use std::sync::Arc; |
| 26 | 25 | ||
| @@ -35,8 +34,6 @@ pub struct PolicyContext { | |||
| 35 | pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, | 34 | pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, |
| 36 | /// Configuration reference for policy settings (includes blacklists) | 35 | /// Configuration reference for policy settings (includes blacklists) |
| 37 | pub config: crate::config::Config, | 36 | pub config: crate::config::Config, |
| 38 | /// Repo sync index for registering purgatory announcements (set after SyncManager creation) | ||
| 39 | pub repo_sync_index: Arc<std::sync::RwLock<Option<RepoSyncIndex>>>, | ||
| 40 | } | 37 | } |
| 41 | 38 | ||
| 42 | impl PolicyContext { | 39 | impl PolicyContext { |
| @@ -54,7 +51,6 @@ impl PolicyContext { | |||
| 54 | purgatory, | 51 | purgatory, |
| 55 | local_relay: Arc::new(std::sync::RwLock::new(None)), | 52 | local_relay: Arc::new(std::sync::RwLock::new(None)), |
| 56 | config, | 53 | config, |
| 57 | repo_sync_index: Arc::new(std::sync::RwLock::new(None)), | ||
| 58 | } | 54 | } |
| 59 | } | 55 | } |
| 60 | 56 | ||
| @@ -72,19 +68,4 @@ impl PolicyContext { | |||
| 72 | let guard = self.local_relay.read().unwrap(); | 68 | let guard = self.local_relay.read().unwrap(); |
| 73 | guard.clone() | 69 | guard.clone() |
| 74 | } | 70 | } |
| 75 | |||
| 76 | /// Set the repo sync index after SyncManager has been created. | ||
| 77 | /// | ||
| 78 | /// This allows purgatory announcements submitted by users to be registered | ||
| 79 | /// in the sync index so state event sync starts promptly. | ||
| 80 | pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { | ||
| 81 | let mut guard = self.repo_sync_index.write().unwrap(); | ||
| 82 | *guard = Some(index); | ||
| 83 | } | ||
| 84 | |||
| 85 | /// Get a clone of the repo sync index if it has been set. | ||
| 86 | pub fn get_repo_sync_index(&self) -> Option<RepoSyncIndex> { | ||
| 87 | let guard = self.repo_sync_index.read().unwrap(); | ||
| 88 | guard.clone() | ||
| 89 | } | ||
| 90 | } | 71 | } |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 3b5514b..1894738 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -680,6 +680,23 @@ impl Purgatory { | |||
| 680 | self.announcement_purgatory.len() | 680 | self.announcement_purgatory.len() |
| 681 | } | 681 | } |
| 682 | 682 | ||
| 683 | /// Collect (repo_id, relay_urls) for all announcements currently in purgatory. | ||
| 684 | /// | ||
| 685 | /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable | ||
| 686 | /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory | ||
| 687 | /// announcement sync timer to register StateOnly entries in `repo_sync_index`. | ||
| 688 | pub fn announcements_for_sync(&self) -> Vec<(String, HashSet<String>)> { | ||
| 689 | self.announcement_purgatory | ||
| 690 | .iter() | ||
| 691 | .map(|entry| { | ||
| 692 | let (owner, identifier) = entry.key(); | ||
| 693 | let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); | ||
| 694 | let relays = entry.value().relays.clone(); | ||
| 695 | (repo_id, relays) | ||
| 696 | }) | ||
| 697 | .collect() | ||
| 698 | } | ||
| 699 | |||
| 683 | /// Get all event IDs currently stored in purgatory AND previously expired events. | 700 | /// Get all event IDs currently stored in purgatory AND previously expired events. |
| 684 | /// | 701 | /// |
| 685 | /// Returns a HashSet of all event IDs for: | 702 | /// Returns a HashSet of all event IDs for: |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 916e2b0..ed5b6e7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -397,6 +397,37 @@ async fn run_daily_timer( | |||
| 397 | } | 397 | } |
| 398 | } | 398 | } |
| 399 | 399 | ||
| 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. | ||
| 401 | /// | ||
| 402 | /// Runs every 5 seconds. For each announcement currently in purgatory, ensures there | ||
| 403 | /// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` | ||
| 404 | /// which connects to the relay URLs listed in the announcement and subscribes to state | ||
| 405 | /// events (kind 30618). | ||
| 406 | /// | ||
| 407 | /// This covers two cases: | ||
| 408 | /// - Sync-path announcements: registered inline during event processing, but this | ||
| 409 | /// provides a safety net in case the inline registration was missed. | ||
| 410 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them | ||
| 411 | /// (they're rejected from DB), so this timer is the primary registration path. | ||
| 412 | async fn run_purgatory_announcement_sync( | ||
| 413 | sync_manager: Arc<Mutex<SyncManager>>, | ||
| 414 | mut shutdown_rx: broadcast::Receiver<()>, | ||
| 415 | ) { | ||
| 416 | let interval = Duration::from_secs(5); | ||
| 417 | loop { | ||
| 418 | tokio::select! { | ||
| 419 | _ = tokio::time::sleep(interval) => { | ||
| 420 | let mut manager = sync_manager.lock().await; | ||
| 421 | manager.sync_purgatory_announcements_to_index().await; | ||
| 422 | } | ||
| 423 | _ = shutdown_rx.recv() => { | ||
| 424 | tracing::debug!("Purgatory announcement sync timer received shutdown signal"); | ||
| 425 | break; | ||
| 426 | } | ||
| 427 | } | ||
| 428 | } | ||
| 429 | } | ||
| 430 | |||
| 400 | // Combined Health and Metrics Checker | 431 | // Combined Health and Metrics Checker |
| 401 | 432 | ||
| 402 | /// Background task for cleaning up expired entries from the rejected events index | 433 | /// Background task for cleaning up expired entries from the rejected events index |
| @@ -700,14 +731,6 @@ impl SyncManager { | |||
| 700 | self.rejected_events_index.save_to_disk(path) | 731 | self.rejected_events_index.save_to_disk(path) |
| 701 | } | 732 | } |
| 702 | 733 | ||
| 703 | /// Get a clone of the repo sync index Arc. | ||
| 704 | /// | ||
| 705 | /// This allows the write policy to register user-submitted purgatory announcements | ||
| 706 | /// in the sync index so that state event sync starts promptly. | ||
| 707 | pub fn repo_sync_index(&self) -> RepoSyncIndex { | ||
| 708 | self.repo_sync_index.clone() | ||
| 709 | } | ||
| 710 | |||
| 711 | /// Handle EOSE (End Of Stored Events) for a subscription | 734 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 712 | /// | 735 | /// |
| 713 | /// This method: | 736 | /// This method: |
| @@ -1560,7 +1583,17 @@ impl SyncManager { | |||
| 1560 | run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; | 1583 | run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; |
| 1561 | }); | 1584 | }); |
| 1562 | 1585 | ||
| 1563 | // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 1586 | // 11. Spawn purgatory announcement sync timer (every 5s) |
| 1587 | // Ensures purgatory announcements (including user-submitted ones that never | ||
| 1588 | // touch the DB) are registered in repo_sync_index as StateOnly so that | ||
| 1589 | // state event subscriptions are established on their listed relay URLs. | ||
| 1590 | let purgatory_sync_manager = Arc::clone(&sync_manager); | ||
| 1591 | let purgatory_sync_shutdown = shutdown_tx.subscribe(); | ||
| 1592 | tokio::spawn(async move { | ||
| 1593 | run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; | ||
| 1594 | }); | ||
| 1595 | |||
| 1596 | // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | ||
| 1564 | loop { | 1597 | loop { |
| 1565 | // Wait for an event without holding the lock | 1598 | // Wait for an event without holding the lock |
| 1566 | tokio::select! { | 1599 | tokio::select! { |
| @@ -2419,6 +2452,80 @@ impl SyncManager { | |||
| 2419 | } | 2452 | } |
| 2420 | } | 2453 | } |
| 2421 | 2454 | ||
| 2455 | /// Sync purgatory announcements into repo_sync_index as StateOnly entries. | ||
| 2456 | /// | ||
| 2457 | /// Called periodically by the purgatory announcement sync timer (every 5s). | ||
| 2458 | /// For each announcement currently in purgatory, ensures a `StateOnly` entry | ||
| 2459 | /// exists in `repo_sync_index`. New entries are then picked up by | ||
| 2460 | /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes | ||
| 2461 | /// to state events for that repo. | ||
| 2462 | /// | ||
| 2463 | /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). | ||
| 2464 | async fn sync_purgatory_announcements_to_index(&mut self) { | ||
| 2465 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | ||
| 2466 | |||
| 2467 | // Collect all purgatory announcements (snapshot - no async holds) | ||
| 2468 | let announcements = self.purgatory.announcements_for_sync(); | ||
| 2469 | |||
| 2470 | if announcements.is_empty() { | ||
| 2471 | return; | ||
| 2472 | } | ||
| 2473 | |||
| 2474 | // Register any new entries in repo_sync_index as StateOnly | ||
| 2475 | let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new(); | ||
| 2476 | { | ||
| 2477 | let mut index = self.repo_sync_index.write().await; | ||
| 2478 | for (repo_id, relays) in &announcements { | ||
| 2479 | let entry = index.entry(repo_id.clone()).or_insert_with(|| { | ||
| 2480 | tracing::debug!( | ||
| 2481 | repo_id = %repo_id, | ||
| 2482 | "Registering purgatory announcement in repo_sync_index as StateOnly" | ||
| 2483 | ); | ||
| 2484 | RepoSyncNeeds { | ||
| 2485 | relays: std::collections::HashSet::new(), | ||
| 2486 | root_events: std::collections::HashSet::new(), | ||
| 2487 | sync_level: SyncLevel::StateOnly, | ||
| 2488 | } | ||
| 2489 | }); | ||
| 2490 | // Don't downgrade an already-Full entry | ||
| 2491 | // Add any new relay URLs | ||
| 2492 | for relay in relays { | ||
| 2493 | if entry.relays.insert(relay.clone()) { | ||
| 2494 | new_relay_urls.insert(relay.clone()); | ||
| 2495 | } | ||
| 2496 | } | ||
| 2497 | } | ||
| 2498 | } | ||
| 2499 | |||
| 2500 | if new_relay_urls.is_empty() { | ||
| 2501 | return; | ||
| 2502 | } | ||
| 2503 | |||
| 2504 | // For any relay URLs that are new, compute and send AddFilters actions | ||
| 2505 | let all_targets = { | ||
| 2506 | let repo_index = self.repo_sync_index.read().await; | ||
| 2507 | derive_relay_targets(&repo_index) | ||
| 2508 | }; | ||
| 2509 | |||
| 2510 | let actions = { | ||
| 2511 | let pending_index = self.pending_sync_index.read().await; | ||
| 2512 | let relay_index = self.relay_sync_index.read().await; | ||
| 2513 | compute_actions(&all_targets, &pending_index, &relay_index) | ||
| 2514 | }; | ||
| 2515 | |||
| 2516 | for action in actions { | ||
| 2517 | // Only act on relays that have new URLs (avoids redundant work) | ||
| 2518 | if new_relay_urls.contains(&action.relay_url) { | ||
| 2519 | tracing::info!( | ||
| 2520 | relay = %action.relay_url, | ||
| 2521 | repos = action.items.repos.len(), | ||
| 2522 | "Purgatory sync timer: connecting to new relay from purgatory announcement" | ||
| 2523 | ); | ||
| 2524 | self.handle_new_sync_filters(action).await; | ||
| 2525 | } | ||
| 2526 | } | ||
| 2527 | } | ||
| 2528 | |||
| 2422 | /// Handle a relay disconnection | 2529 | /// Handle a relay disconnection |
| 2423 | /// | 2530 | /// |
| 2424 | /// This method is called when the event loop terminates and sends a disconnect notification. | 2531 | /// This method is called when the event loop terminates and sends a disconnect notification. |