upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/git/handlers.rs1
-rw-r--r--src/git/sync.rs21
-rw-r--r--src/main.rs20
-rw-r--r--src/nostr/builder.rs118
-rw-r--r--src/nostr/policy/mod.rs37
-rw-r--r--src/purgatory/sync/context.rs116
6 files changed, 313 insertions, 0 deletions
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 017eee4..129ca2c 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -307,6 +307,7 @@ pub async fn handle_receive_pack(
307 Some(&relay), 307 Some(&relay),
308 &purgatory, 308 &purgatory,
309 git_data_path_buf, 309 git_data_path_buf,
310 None,
310 ) 311 )
311 .await 312 .await
312 { 313 {
diff --git a/src/git/sync.rs b/src/git/sync.rs
index 4b35023..b3fa11a 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -44,6 +44,7 @@ use crate::git::{self, oid_exists};
44use crate::nostr::builder::SharedDatabase; 44use crate::nostr::builder::SharedDatabase;
45use crate::nostr::events::RepositoryState; 45use crate::nostr::events::RepositoryState;
46use crate::purgatory::{can_apply_state, Purgatory}; 46use crate::purgatory::{can_apply_state, Purgatory};
47use crate::sync::{RepoSyncIndex, SyncLevel};
47 48
48/// Result of processing newly available git data. 49/// Result of processing newly available git data.
49/// 50///
@@ -809,6 +810,7 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option<String> {
809/// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) 810/// * `local_relay` - Local relay for notifying WebSocket subscribers (optional)
810/// * `purgatory` - Purgatory instance to check for satisfiable events 811/// * `purgatory` - Purgatory instance to check for satisfiable events
811/// * `git_data_path` - Base path for git repositories 812/// * `git_data_path` - Base path for git repositories
813/// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion
812/// 814///
813/// # Returns 815/// # Returns
814/// A `ProcessResult` describing what was processed 816/// A `ProcessResult` describing what was processed
@@ -819,6 +821,7 @@ pub async fn process_newly_available_git_data(
819 local_relay: Option<&nostr_relay_builder::LocalRelay>, 821 local_relay: Option<&nostr_relay_builder::LocalRelay>,
820 purgatory: &Purgatory, 822 purgatory: &Purgatory,
821 git_data_path: &Path, 823 git_data_path: &Path,
824 repo_sync_index: Option<RepoSyncIndex>,
822) -> anyhow::Result<ProcessResult> { 825) -> anyhow::Result<ProcessResult> {
823 let mut result = ProcessResult::default(); 826 let mut result = ProcessResult::default();
824 827
@@ -848,6 +851,7 @@ pub async fn process_newly_available_git_data(
848 local_relay, 851 local_relay,
849 purgatory, 852 purgatory,
850 git_data_path, 853 git_data_path,
854 repo_sync_index.as_ref(),
851 ) 855 )
852 .await; 856 .await;
853 result.merge(announcement_result); 857 result.merge(announcement_result);
@@ -1284,6 +1288,7 @@ async fn process_purgatory_announcements(
1284 local_relay: Option<&nostr_relay_builder::LocalRelay>, 1288 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1285 purgatory: &Purgatory, 1289 purgatory: &Purgatory,
1286 git_data_path: &Path, 1290 git_data_path: &Path,
1291 repo_sync_index: Option<&RepoSyncIndex>,
1287) -> ProcessResult { 1292) -> ProcessResult {
1288 let mut result = ProcessResult::default(); 1293 let mut result = ProcessResult::default();
1289 1294
@@ -1338,6 +1343,22 @@ async fn process_purgatory_announcements(
1338 } 1343 }
1339 } 1344 }
1340 1345
1346 // Upgrade sync level to Full in repo_sync_index
1347 if let Some(index) = repo_sync_index {
1348 let mut index = index.write().await;
1349 // Use hex pubkey format to match how repo_sync_index keys are built
1350 // (sync/mod.rs uses event.pubkey which is hex, not bech32)
1351 let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier);
1352 if let Some(entry) = index.get_mut(&repo_id) {
1353 entry.sync_level = SyncLevel::Full;
1354 debug!(
1355 identifier = %identifier,
1356 repo_id = %repo_id,
1357 "Upgraded sync level to Full after announcement promotion"
1358 );
1359 }
1360 }
1361
1341 result.announcements_released += 1; 1362 result.announcements_released += 1;
1342 } 1363 }
1343 Err(e) => { 1364 Err(e) => {
diff --git a/src/main.rs b/src/main.rs
index ab6ede7..3ff30fb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -132,6 +132,24 @@ async fn main() -> Result<()> {
132 // Get a reference to the rejected events index for shutdown persistence 132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index(); 133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134 134
135 // Get a reference to the repo sync index for upgrading sync levels on promotion
136 let repo_sync_index = sync_manager.repo_sync_index();
137
138 // Set the repo sync index on the write policy so user-submitted purgatory
139 // announcements can trigger relay discovery (connect to relays in announcement tags)
140 relay_with_db
141 .write_policy
142 .set_repo_sync_index(repo_sync_index.clone());
143
144 // Get the action sender BEFORE consuming sync_manager with spawn
145 let action_tx = sync_manager.action_tx();
146
147 // Set the sync action sender so the write policy can trigger relay connections
148 // when user-submitted purgatory announcements are registered with StateOnly level
149 if let Some(tx) = action_tx.clone() {
150 relay_with_db.write_policy.set_sync_action_tx(tx);
151 }
152
135 tokio::spawn(async move { 153 tokio::spawn(async move {
136 sync_manager.run().await; 154 sync_manager.run().await;
137 }); 155 });
@@ -184,6 +202,8 @@ async fn main() -> Result<()> {
184 Some(config.domain.clone()), 202 Some(config.domain.clone()),
185 Some(relay_with_db.relay.clone()), 203 Some(relay_with_db.relay.clone()),
186 git_naughty_list.clone(), 204 git_naughty_list.clone(),
205 Some(repo_sync_index),
206 action_tx,
187 )); 207 ));
188 208
189 // Create throttle manager for rate limiting remote git servers 209 // Create throttle manager for rate limiting remote git servers
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index aff12a6..4c66f6d 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -98,6 +98,24 @@ impl Nip34WritePolicy {
98 self.ctx.set_local_relay(relay); 98 self.ctx.set_local_relay(relay);
99 } 99 }
100 100
101 /// Set the repo sync index for relay discovery from user-submitted purgatory announcements.
102 ///
103 /// When a user submits an announcement that goes to purgatory (no git data yet),
104 /// the relay needs to discover and connect to relays listed in the announcement's
105 /// `relays` and `clone` tags. This index is updated when the announcement is accepted
106 /// into purgatory, triggering the sync system to connect and sync state events.
107 pub fn set_repo_sync_index(&self, index: crate::sync::RepoSyncIndex) {
108 self.ctx.set_repo_sync_index(index);
109 }
110
111 /// Set the sync action sender for sending AddFilters actions to SyncManager.
112 ///
113 /// This allows the write policy to notify the SyncManager when user-submitted
114 /// purgatory announcements need relay discovery (triggering new connections).
115 pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender<crate::sync::AddFilters>) {
116 self.ctx.set_sync_action_tx(tx);
117 }
118
101 /// Handle repository announcement event 119 /// Handle repository announcement event
102 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { 120 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult {
103 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 121 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
@@ -146,6 +164,106 @@ impl Nip34WritePolicy {
146 "Accepted announcement to purgatory: {} (waiting for git data)", 164 "Accepted announcement to purgatory: {} (waiting for git data)",
147 event_id_str 165 event_id_str
148 ); 166 );
167
168 // Register in repo_sync_index with StateOnly level so the sync
169 // system discovers and connects to relays listed in this announcement.
170 // This is needed for user-submitted announcements (not via sync path)
171 // to trigger relay discovery and state event sync.
172 if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() {
173 if let Some(identifier) = event.tags.iter().find_map(|tag| {
174 let tag_vec = tag.as_slice();
175 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
176 Some(tag_vec[1].to_string())
177 } else {
178 None
179 }
180 }) {
181 let repo_id =
182 format!("30617:{}:{}", event.pubkey, identifier);
183
184 // Get relay URLs stored in purgatory for this announcement
185 let relays = self
186 .ctx
187 .purgatory
188 .find_announcement(&event.pubkey, &identifier)
189 .map(|entry| entry.relays)
190 .unwrap_or_default();
191
192 if !relays.is_empty() {
193 use crate::sync::{
194 AddFilters, PendingItems, RepoSyncNeeds, SyncLevel,
195 };
196
197 // Update repo_sync_index with StateOnly for this repo
198 let new_repos = {
199 let mut index = repo_sync_index.write().await;
200 let entry =
201 index.entry(repo_id.clone()).or_insert_with(|| {
202 RepoSyncNeeds {
203 relays: std::collections::HashSet::new(),
204 root_events: std::collections::HashSet::new(),
205 sync_level: SyncLevel::StateOnly,
206 }
207 });
208 entry.relays.extend(relays.iter().cloned());
209 // Don't upgrade if already Full
210 tracing::info!(
211 repo_id = %repo_id,
212 relay_count = entry.relays.len(),
213 "Registered user-submitted purgatory announcement in \
214 RepoSyncIndex with StateOnly level for relay discovery"
215 );
216 // Return cloned relays for AddFilters
217 relays.clone()
218 };
219
220 // Send AddFilters to SyncManager so it connects to these relays
221 if let Some(tx) = self.ctx.get_sync_action_tx() {
222 // Build state-only filters for this repo
223 let state_only_repos: std::collections::HashSet<String> =
224 std::iter::once(repo_id.clone()).collect();
225 let filters =
226 crate::sync::filters::build_sync_level_aware_filters(
227 &std::collections::HashSet::new(),
228 &state_only_repos,
229 &std::collections::HashSet::new(),
230 None,
231 );
232
233 for relay_url in new_repos {
234 // Skip our own domain
235 if relay_url.contains(&self.ctx.domain) {
236 continue;
237 }
238 let action = AddFilters {
239 relay_url: relay_url.clone(),
240 items: PendingItems {
241 repos: state_only_repos.clone(),
242 root_events: std::collections::HashSet::new(),
243 },
244 filters: filters.clone(),
245 };
246 if let Err(e) = tx.send(action).await {
247 tracing::warn!(
248 relay = %relay_url,
249 error = %e,
250 "Failed to send AddFilters action for \
251 user-submitted purgatory announcement"
252 );
253 } else {
254 tracing::info!(
255 relay = %relay_url,
256 repo_id = %repo_id,
257 "Sent AddFilters to SyncManager for \
258 user-submitted purgatory announcement relay"
259 );
260 }
261 }
262 }
263 }
264 }
265 }
266
149 WritePolicyResult::Reject { 267 WritePolicyResult::Reject {
150 status: true, // Client sees OK 268 status: true, // Client sees OK
151 message: "purgatory: won't be served until git data arrives".into(), 269 message: "purgatory: won't be served until git data arrives".into(),
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs
index 1566b6c..78a09fc 100644
--- a/src/nostr/policy/mod.rs
+++ b/src/nostr/policy/mod.rs
@@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult;
20 20
21use super::SharedDatabase; 21use super::SharedDatabase;
22use crate::purgatory::Purgatory; 22use crate::purgatory::Purgatory;
23use crate::sync::{AddFilters, RepoSyncIndex};
23use nostr_relay_builder::LocalRelay; 24use nostr_relay_builder::LocalRelay;
24use std::sync::Arc; 25use std::sync::Arc;
25 26
@@ -34,6 +35,16 @@ pub struct PolicyContext {
34 pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, 35 pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>,
35 /// Configuration reference for policy settings (includes blacklists) 36 /// Configuration reference for policy settings (includes blacklists)
36 pub config: crate::config::Config, 37 pub config: crate::config::Config,
38 /// Optional repo sync index for triggering relay discovery when announcements
39 /// go to purgatory via user submission (not via the sync path).
40 /// Wrapped in Arc<RwLock> for interior mutability (PolicyContext is Clone).
41 pub repo_sync_index: Arc<std::sync::RwLock<Option<RepoSyncIndex>>>,
42 /// Optional sender for AddFilters actions to SyncManager.
43 /// Used to trigger relay discovery when user-submitted purgatory announcements
44 /// are registered with StateOnly sync level.
45 /// Wrapped in Arc<RwLock> for interior mutability (PolicyContext is Clone).
46 pub sync_action_tx:
47 Arc<std::sync::RwLock<Option<tokio::sync::mpsc::Sender<AddFilters>>>>,
37} 48}
38 49
39impl PolicyContext { 50impl PolicyContext {
@@ -51,6 +62,8 @@ impl PolicyContext {
51 purgatory, 62 purgatory,
52 local_relay: Arc::new(std::sync::RwLock::new(None)), 63 local_relay: Arc::new(std::sync::RwLock::new(None)),
53 config, 64 config,
65 repo_sync_index: Arc::new(std::sync::RwLock::new(None)),
66 sync_action_tx: Arc::new(std::sync::RwLock::new(None)),
54 } 67 }
55 } 68 }
56 69
@@ -68,4 +81,28 @@ impl PolicyContext {
68 let guard = self.local_relay.read().unwrap(); 81 let guard = self.local_relay.read().unwrap();
69 guard.clone() 82 guard.clone()
70 } 83 }
84
85 /// Set the repo sync index for relay discovery from user-submitted purgatory announcements.
86 pub fn set_repo_sync_index(&self, index: RepoSyncIndex) {
87 let mut guard = self.repo_sync_index.write().unwrap();
88 *guard = Some(index);
89 }
90
91 /// Get a clone of the repo sync index if it's been set.
92 pub fn get_repo_sync_index(&self) -> Option<RepoSyncIndex> {
93 let guard = self.repo_sync_index.read().unwrap();
94 guard.clone()
95 }
96
97 /// Set the sync action sender for sending AddFilters actions to SyncManager.
98 pub fn set_sync_action_tx(&self, tx: tokio::sync::mpsc::Sender<AddFilters>) {
99 let mut guard = self.sync_action_tx.write().unwrap();
100 *guard = Some(tx);
101 }
102
103 /// Get a clone of the sync action sender if it's been set.
104 pub fn get_sync_action_tx(&self) -> Option<tokio::sync::mpsc::Sender<AddFilters>> {
105 let guard = self.sync_action_tx.read().unwrap();
106 guard.clone()
107 }
71} 108}
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 3568e89..4dbb402 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -193,6 +193,7 @@ use crate::nostr::builder::SharedDatabase;
193use crate::nostr::events::RepositoryState; 193use crate::nostr::events::RepositoryState;
194use crate::purgatory::Purgatory; 194use crate::purgatory::Purgatory;
195use crate::sync::naughty_list::NaughtyListTracker; 195use crate::sync::naughty_list::NaughtyListTracker;
196use crate::sync::RepoSyncIndex;
196 197
197use super::functions::extract_domain; 198use super::functions::extract_domain;
198 199
@@ -221,6 +222,13 @@ pub struct RealSyncContext {
221 222
222 /// Naughty list tracker for git remote domains with persistent errors 223 /// Naughty list tracker for git remote domains with persistent errors
223 git_naughty_list: Arc<NaughtyListTracker>, 224 git_naughty_list: Arc<NaughtyListTracker>,
225
226 /// Optional repo sync index for upgrading sync level on promotion
227 repo_sync_index: Option<RepoSyncIndex>,
228
229 /// Optional sender for AddFilters actions to SyncManager.
230 /// Used after announcement promotion to trigger PR event subscription on connected relays.
231 sync_action_tx: Option<tokio::sync::mpsc::Sender<crate::sync::AddFilters>>,
224} 232}
225 233
226impl RealSyncContext { 234impl RealSyncContext {
@@ -233,6 +241,9 @@ impl RealSyncContext {
233 /// * `our_domain` - Our domain to exclude from clone URLs 241 /// * `our_domain` - Our domain to exclude from clone URLs
234 /// * `local_relay` - Local relay for WebSocket notifications 242 /// * `local_relay` - Local relay for WebSocket notifications
235 /// * `git_naughty_list` - Naughty list tracker for git remote domains 243 /// * `git_naughty_list` - Naughty list tracker for git remote domains
244 /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion
245 /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion
246 #[allow(clippy::too_many_arguments)]
236 pub fn new( 247 pub fn new(
237 purgatory: Arc<Purgatory>, 248 purgatory: Arc<Purgatory>,
238 database: SharedDatabase, 249 database: SharedDatabase,
@@ -240,6 +251,8 @@ impl RealSyncContext {
240 our_domain: Option<String>, 251 our_domain: Option<String>,
241 local_relay: Option<LocalRelay>, 252 local_relay: Option<LocalRelay>,
242 git_naughty_list: Arc<NaughtyListTracker>, 253 git_naughty_list: Arc<NaughtyListTracker>,
254 repo_sync_index: Option<RepoSyncIndex>,
255 sync_action_tx: Option<tokio::sync::mpsc::Sender<crate::sync::AddFilters>>,
243 ) -> Self { 256 ) -> Self {
244 Self { 257 Self {
245 purgatory, 258 purgatory,
@@ -248,9 +261,23 @@ impl RealSyncContext {
248 our_domain_value: our_domain, 261 our_domain_value: our_domain,
249 local_relay, 262 local_relay,
250 git_naughty_list, 263 git_naughty_list,
264 repo_sync_index,
265 sync_action_tx,
251 } 266 }
252 } 267 }
253 268
269 /// Set the sync action sender for triggering filter recomputation after announcement promotion.
270 ///
271 /// When an announcement is promoted from purgatory to Full sync level, the SyncManager
272 /// needs to subscribe to PR events for that repo on all connected relays. This sender
273 /// is used to trigger that subscription.
274 pub fn set_sync_action_tx(
275 &mut self,
276 tx: tokio::sync::mpsc::Sender<crate::sync::AddFilters>,
277 ) {
278 self.sync_action_tx = Some(tx);
279 }
280
254 /// Get reference to the git naughty list tracker 281 /// Get reference to the git naughty list tracker
255 pub fn git_naughty_list(&self) -> &Arc<NaughtyListTracker> { 282 pub fn git_naughty_list(&self) -> &Arc<NaughtyListTracker> {
256 &self.git_naughty_list 283 &self.git_naughty_list
@@ -482,9 +509,98 @@ impl SyncContext for RealSyncContext {
482 self.local_relay.as_ref(), 509 self.local_relay.as_ref(),
483 &self.purgatory, 510 &self.purgatory,
484 &self.git_data_path, 511 &self.git_data_path,
512 self.repo_sync_index.clone(),
485 ) 513 )
486 .await?; 514 .await?;
487 515
516 // If announcements were promoted (now Full sync level), notify SyncManager to
517 // recompute filters so PR event subscriptions are created on connected relays.
518 if result.announcements_released > 0 {
519 if let (Some(ref tx), Some(ref repo_sync_index)) =
520 (&self.sync_action_tx, &self.repo_sync_index)
521 {
522 let index = repo_sync_index.read().await;
523 for (repo_id, needs) in index.iter() {
524 if needs.sync_level == crate::sync::SyncLevel::Full
525 && !needs.root_events.is_empty()
526 {
527 // Send AddFilters for Full repos with root events
528 for relay_url in &needs.relays {
529 if let Some(ref domain) = self.our_domain_value {
530 if relay_url.contains(domain.as_str()) {
531 continue;
532 }
533 }
534 let full_repos: std::collections::HashSet<String> =
535 std::iter::once(repo_id.clone()).collect();
536 let filters =
537 crate::sync::filters::build_sync_level_aware_filters(
538 &full_repos,
539 &std::collections::HashSet::new(),
540 &needs.root_events,
541 None,
542 );
543 let action = crate::sync::AddFilters {
544 relay_url: relay_url.clone(),
545 items: crate::sync::PendingItems {
546 repos: full_repos.clone(),
547 root_events: needs.root_events.clone(),
548 },
549 filters,
550 };
551 if let Err(e) = tx.send(action).await {
552 debug!(
553 relay = %relay_url,
554 error = %e,
555 "Failed to send AddFilters after announcement promotion"
556 );
557 } else {
558 debug!(
559 relay = %relay_url,
560 repo_id = %repo_id,
561 "Sent AddFilters to SyncManager after announcement promotion"
562 );
563 }
564 }
565 } else if needs.sync_level == crate::sync::SyncLevel::Full {
566 // Even without root_events, send empty repo filter to ensure
567 // Layer 2 subscriptions (PR events) are set up
568 for relay_url in &needs.relays {
569 if let Some(ref domain) = self.our_domain_value {
570 if relay_url.contains(domain.as_str()) {
571 continue;
572 }
573 }
574 let full_repos: std::collections::HashSet<String> =
575 std::iter::once(repo_id.clone()).collect();
576 let filters =
577 crate::sync::filters::build_sync_level_aware_filters(
578 &full_repos,
579 &std::collections::HashSet::new(),
580 &std::collections::HashSet::new(),
581 None,
582 );
583 let action = crate::sync::AddFilters {
584 relay_url: relay_url.clone(),
585 items: crate::sync::PendingItems {
586 repos: full_repos.clone(),
587 root_events: std::collections::HashSet::new(),
588 },
589 filters,
590 };
591 if let Err(e) = tx.send(action).await {
592 debug!(
593 relay = %relay_url,
594 error = %e,
595 "Failed to send AddFilters (no root_events) after announcement promotion"
596 );
597 }
598 }
599 }
600 }
601 }
602 }
603
488 // Convert from git::sync::ProcessResult to our ProcessResult 604 // Convert from git::sync::ProcessResult to our ProcessResult
489 Ok(ProcessResult { 605 Ok(ProcessResult {
490 states_released: result.states_released, 606 states_released: result.states_released,