upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs7
-rw-r--r--src/nostr/builder.rs54
-rw-r--r--src/nostr/policy/mod.rs19
-rw-r--r--src/purgatory/mod.rs17
-rw-r--r--src/sync/mod.rs125
5 files changed, 134 insertions, 88 deletions
diff --git a/src/main.rs b/src/main.rs
index ebe05a3..ab6ede7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -132,13 +132,6 @@ async fn main() -> Result<()> {
132 // Get a reference to the rejected events index for shutdown persistence 132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index(); 133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134 134
135 // Wire repo_sync_index into write policy so user-submitted purgatory announcements
136 // get registered for state event sync immediately (Fix 3).
137 let repo_sync_index = sync_manager.repo_sync_index();
138 relay_with_db
139 .write_policy
140 .set_repo_sync_index(repo_sync_index);
141
142 tokio::spawn(async move { 135 tokio::spawn(async move {
143 sync_manager.run().await; 136 sync_manager.run().await;
144 }); 137 });
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 8d1e461..c2d4939 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -17,7 +17,7 @@ use crate::nostr::policy::{
17 AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, 17 AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult,
18 RelatedEventPolicy, StatePolicy, StateResult, 18 RelatedEventPolicy, StatePolicy, StateResult,
19}; 19};
20use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; 20
21 21
22/// Type alias for the shared database used by the relay 22/// Type alias for the shared database used by the relay
23pub type SharedDatabase = Arc<dyn NostrDatabase>; 23pub type SharedDatabase = Arc<dyn NostrDatabase>;
@@ -99,14 +99,6 @@ impl Nip34WritePolicy {
99 self.ctx.set_local_relay(relay); 99 self.ctx.set_local_relay(relay);
100 } 100 }
101 101
102 /// Set the repo sync index so that user-submitted purgatory announcements can
103 /// be registered for state event sync immediately.
104 ///
105 /// This must be called after SyncManager is created.
106 pub fn set_repo_sync_index(&self, index: RepoSyncIndex) {
107 self.ctx.set_repo_sync_index(index);
108 }
109
110 /// Handle repository announcement event 102 /// Handle repository announcement event
111 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { 103 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult {
112 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 104 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
@@ -156,50 +148,6 @@ impl Nip34WritePolicy {
156 event_id_str 148 event_id_str
157 ); 149 );
158 150
159 // Register repo in repo_sync_index with StateOnly level so that
160 // state event sync starts promptly via the next batch EOSE recompute.
161 // This handles user-submitted purgatory announcements - the SelfSubscriber
162 // only sees DB events, so it won't pick these up automatically.
163 if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() {
164 if let Ok(announcement) =
165 RepositoryAnnouncement::from_event(event.clone())
166 {
167 use std::collections::HashSet;
168 let repo_id = format!(
169 "30617:{}:{}",
170 event.pubkey,
171 announcement.identifier
172 );
173
174 // Extract relay URLs from the announcement event tags
175 let relays: HashSet<String> = event
176 .tags
177 .iter()
178 .flat_map(|tag| {
179 let tag_vec = tag.as_slice();
180 if !tag_vec.is_empty() && tag_vec[0] == "relays" {
181 tag_vec[1..].iter().map(|s| s.to_string()).collect::<Vec<_>>()
182 } else {
183 vec![]
184 }
185 })
186 .collect();
187
188 let mut index = repo_sync_index.write().await;
189 index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds {
190 relays,
191 root_events: HashSet::new(),
192 sync_level: SyncLevel::StateOnly,
193 });
194 drop(index);
195
196 tracing::debug!(
197 repo_id = %repo_id,
198 "Registered purgatory announcement in repo_sync_index as StateOnly"
199 );
200 }
201 }
202
203 WritePolicyResult::Reject { 151 WritePolicyResult::Reject {
204 status: true, // Client sees OK 152 status: true, // Client sees OK
205 message: "purgatory: won't be served until git data arrives".into(), 153 message: "purgatory: won't be served until git data arrives".into(),
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs
index c958586..1566b6c 100644
--- a/src/nostr/policy/mod.rs
+++ b/src/nostr/policy/mod.rs
@@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult;
20 20
21use super::SharedDatabase; 21use super::SharedDatabase;
22use crate::purgatory::Purgatory; 22use crate::purgatory::Purgatory;
23use crate::sync::RepoSyncIndex;
24use nostr_relay_builder::LocalRelay; 23use nostr_relay_builder::LocalRelay;
25use std::sync::Arc; 24use std::sync::Arc;
26 25
@@ -35,8 +34,6 @@ pub struct PolicyContext {
35 pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, 34 pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>,
36 /// Configuration reference for policy settings (includes blacklists) 35 /// Configuration reference for policy settings (includes blacklists)
37 pub config: crate::config::Config, 36 pub config: crate::config::Config,
38 /// Repo sync index for registering purgatory announcements (set after SyncManager creation)
39 pub repo_sync_index: Arc<std::sync::RwLock<Option<RepoSyncIndex>>>,
40} 37}
41 38
42impl PolicyContext { 39impl PolicyContext {
@@ -54,7 +51,6 @@ impl PolicyContext {
54 purgatory, 51 purgatory,
55 local_relay: Arc::new(std::sync::RwLock::new(None)), 52 local_relay: Arc::new(std::sync::RwLock::new(None)),
56 config, 53 config,
57 repo_sync_index: Arc::new(std::sync::RwLock::new(None)),
58 } 54 }
59 } 55 }
60 56
@@ -72,19 +68,4 @@ impl PolicyContext {
72 let guard = self.local_relay.read().unwrap(); 68 let guard = self.local_relay.read().unwrap();
73 guard.clone() 69 guard.clone()
74 } 70 }
75
76 /// Set the repo sync index after SyncManager has been created.
77 ///
78 /// This allows purgatory announcements submitted by users to be registered
79 /// in the sync index so state event sync starts promptly.
80 pub fn set_repo_sync_index(&self, index: RepoSyncIndex) {
81 let mut guard = self.repo_sync_index.write().unwrap();
82 *guard = Some(index);
83 }
84
85 /// Get a clone of the repo sync index if it has been set.
86 pub fn get_repo_sync_index(&self) -> Option<RepoSyncIndex> {
87 let guard = self.repo_sync_index.read().unwrap();
88 guard.clone()
89 }
90} 71}
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 3b5514b..1894738 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -680,6 +680,23 @@ impl Purgatory {
680 self.announcement_purgatory.len() 680 self.announcement_purgatory.len()
681 } 681 }
682 682
683 /// Collect (repo_id, relay_urls) for all announcements currently in purgatory.
684 ///
685 /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable
686 /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory
687 /// announcement sync timer to register StateOnly entries in `repo_sync_index`.
688 pub fn announcements_for_sync(&self) -> Vec<(String, HashSet<String>)> {
689 self.announcement_purgatory
690 .iter()
691 .map(|entry| {
692 let (owner, identifier) = entry.key();
693 let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier);
694 let relays = entry.value().relays.clone();
695 (repo_id, relays)
696 })
697 .collect()
698 }
699
683 /// Get all event IDs currently stored in purgatory AND previously expired events. 700 /// Get all event IDs currently stored in purgatory AND previously expired events.
684 /// 701 ///
685 /// Returns a HashSet of all event IDs for: 702 /// Returns a HashSet of all event IDs for:
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 916e2b0..ed5b6e7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -397,6 +397,37 @@ async fn run_daily_timer(
397 } 397 }
398} 398}
399 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401///
402/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there
403/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters`
404/// which connects to the relay URLs listed in the announcement and subscribes to state
405/// events (kind 30618).
406///
407/// This covers two cases:
408/// - Sync-path announcements: registered inline during event processing, but this
409/// provides a safety net in case the inline registration was missed.
410/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
411/// (they're rejected from DB), so this timer is the primary registration path.
412async fn run_purgatory_announcement_sync(
413 sync_manager: Arc<Mutex<SyncManager>>,
414 mut shutdown_rx: broadcast::Receiver<()>,
415) {
416 let interval = Duration::from_secs(5);
417 loop {
418 tokio::select! {
419 _ = tokio::time::sleep(interval) => {
420 let mut manager = sync_manager.lock().await;
421 manager.sync_purgatory_announcements_to_index().await;
422 }
423 _ = shutdown_rx.recv() => {
424 tracing::debug!("Purgatory announcement sync timer received shutdown signal");
425 break;
426 }
427 }
428 }
429}
430
400// Combined Health and Metrics Checker 431// Combined Health and Metrics Checker
401 432
402/// Background task for cleaning up expired entries from the rejected events index 433/// Background task for cleaning up expired entries from the rejected events index
@@ -700,14 +731,6 @@ impl SyncManager {
700 self.rejected_events_index.save_to_disk(path) 731 self.rejected_events_index.save_to_disk(path)
701 } 732 }
702 733
703 /// Get a clone of the repo sync index Arc.
704 ///
705 /// This allows the write policy to register user-submitted purgatory announcements
706 /// in the sync index so that state event sync starts promptly.
707 pub fn repo_sync_index(&self) -> RepoSyncIndex {
708 self.repo_sync_index.clone()
709 }
710
711 /// Handle EOSE (End Of Stored Events) for a subscription 734 /// Handle EOSE (End Of Stored Events) for a subscription
712 /// 735 ///
713 /// This method: 736 /// This method:
@@ -1560,7 +1583,17 @@ impl SyncManager {
1560 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; 1583 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1561 }); 1584 });
1562 1585
1563 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1586 // 11. Spawn purgatory announcement sync timer (every 5s)
1587 // Ensures purgatory announcements (including user-submitted ones that never
1588 // touch the DB) are registered in repo_sync_index as StateOnly so that
1589 // state event subscriptions are established on their listed relay URLs.
1590 let purgatory_sync_manager = Arc::clone(&sync_manager);
1591 let purgatory_sync_shutdown = shutdown_tx.subscribe();
1592 tokio::spawn(async move {
1593 run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await;
1594 });
1595
1596 // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1564 loop { 1597 loop {
1565 // Wait for an event without holding the lock 1598 // Wait for an event without holding the lock
1566 tokio::select! { 1599 tokio::select! {
@@ -2419,6 +2452,80 @@ impl SyncManager {
2419 } 2452 }
2420 } 2453 }
2421 2454
2455 /// Sync purgatory announcements into repo_sync_index as StateOnly entries.
2456 ///
2457 /// Called periodically by the purgatory announcement sync timer (every 5s).
2458 /// For each announcement currently in purgatory, ensures a `StateOnly` entry
2459 /// exists in `repo_sync_index`. New entries are then picked up by
2460 /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes
2461 /// to state events for that repo.
2462 ///
2463 /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full).
2464 async fn sync_purgatory_announcements_to_index(&mut self) {
2465 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2466
2467 // Collect all purgatory announcements (snapshot - no async holds)
2468 let announcements = self.purgatory.announcements_for_sync();
2469
2470 if announcements.is_empty() {
2471 return;
2472 }
2473
2474 // Register any new entries in repo_sync_index as StateOnly
2475 let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
2476 {
2477 let mut index = self.repo_sync_index.write().await;
2478 for (repo_id, relays) in &announcements {
2479 let entry = index.entry(repo_id.clone()).or_insert_with(|| {
2480 tracing::debug!(
2481 repo_id = %repo_id,
2482 "Registering purgatory announcement in repo_sync_index as StateOnly"
2483 );
2484 RepoSyncNeeds {
2485 relays: std::collections::HashSet::new(),
2486 root_events: std::collections::HashSet::new(),
2487 sync_level: SyncLevel::StateOnly,
2488 }
2489 });
2490 // Don't downgrade an already-Full entry
2491 // Add any new relay URLs
2492 for relay in relays {
2493 if entry.relays.insert(relay.clone()) {
2494 new_relay_urls.insert(relay.clone());
2495 }
2496 }
2497 }
2498 }
2499
2500 if new_relay_urls.is_empty() {
2501 return;
2502 }
2503
2504 // For any relay URLs that are new, compute and send AddFilters actions
2505 let all_targets = {
2506 let repo_index = self.repo_sync_index.read().await;
2507 derive_relay_targets(&repo_index)
2508 };
2509
2510 let actions = {
2511 let pending_index = self.pending_sync_index.read().await;
2512 let relay_index = self.relay_sync_index.read().await;
2513 compute_actions(&all_targets, &pending_index, &relay_index)
2514 };
2515
2516 for action in actions {
2517 // Only act on relays that have new URLs (avoids redundant work)
2518 if new_relay_urls.contains(&action.relay_url) {
2519 tracing::info!(
2520 relay = %action.relay_url,
2521 repos = action.items.repos.len(),
2522 "Purgatory sync timer: connecting to new relay from purgatory announcement"
2523 );
2524 self.handle_new_sync_filters(action).await;
2525 }
2526 }
2527 }
2528
2422 /// Handle a relay disconnection 2529 /// Handle a relay disconnection
2423 /// 2530 ///
2424 /// This method is called when the event loop terminates and sends a disconnect notification. 2531 /// This method is called when the event loop terminates and sends a disconnect notification.