upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-05 13:05:17 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-05 13:05:17 +0000
commit: 623cae575f8c9ce33f2d7fdc2526db495f846acb (patch)
tree: 41b346a136a6c3d56fcd7b8e522fab96a3da7b67
parent: 11b1655638b5a328662187a27f85a84df60fc759 (diff)
purgatory: add state git data sync
-rw-r--r-- src/git/authorization.rs 14
-rw-r--r-- src/main.rs 6
-rw-r--r-- src/nostr/policy/state.rs 8
-rw-r--r-- src/purgatory/mod.rs 399
-rw-r--r-- src/sync/mod.rs 2
5 files changed, 415 insertions, 14 deletions
diff --git a/src/git/authorization.rs b/src/git/authorization.rs
index 6b997d8..e7ea99b 100644
--- a/src/git/authorization.rs
+++ b/src/git/authorization.rs
@@ -287,6 +287,20 @@ pub async fn fetch_repository_data(
287 }) 287 })
288} 288}
289 289
290pub fn pubkey_authorised_for_repo_owners(
291 pubkey: &PublicKey,
292 db_repo_data: &RepositoryData,
293) -> Vec<String> {
294 let mut repo_owners_authorising_pubkey = HashSet::new();
295 let collections = collect_authorized_maintainers(&db_repo_data.announcements);
296 for (owner, authoised) in collections {
297 if authoised.contains(&pubkey.to_hex()) {
298 repo_owners_authorising_pubkey.insert(owner.to_string());
299 }
300 }
301 repo_owners_authorising_pubkey.iter().cloned().collect()
302}
303
290/// Collect authorized maintainers grouped by owner from a set of announcements 304/// Collect authorized maintainers grouped by owner from a set of announcements
291/// 305///
292/// For each announcement, returns a map from owner pubkey to authorized maintainers: 306/// For each announcement, returns a map from owner pubkey to authorized maintainers:
diff --git a/src/main.rs b/src/main.rs
index d382462..fbe3e34 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,5 @@
1use std::sync::Arc;
2use std::time::Duration; 1use std::time::Duration;
2use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
@@ -49,7 +49,9 @@ async fn main() -> Result<()> {
49 }; 49 };
50 50
51 // Create purgatory for event/git coordination 51 // Create purgatory for event/git coordination
52 let purgatory = Arc::new(Purgatory::new()); 52 let purgatory = Arc::new(Purgatory::new(PathBuf::from(
53 config.effective_git_data_path(),
54 )));
53 info!("Purgatory initialized for event coordination"); 55 info!("Purgatory initialized for event coordination");
54 56
55 // Create Nostr relay with NIP-34 validation 57 // Create Nostr relay with NIP-34 validation
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index 48435ea..9ca3ee6 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -173,6 +173,14 @@ impl StatePolicy {
173 self.ctx 173 self.ctx
174 .purgatory 174 .purgatory
175 .add_state(event.clone(), state.identifier.clone(), event.pubkey); 175 .add_state(event.clone(), state.identifier.clone(), event.pubkey);
176
177 // Trigger background git data sync from remote servers
178 self.ctx.purgatory.start_state_sync(
179 state.clone(),
180 self.ctx.database.clone(),
181 Some(self.ctx.domain.clone()),
182 );
183
176 tracing::info!( 184 tracing::info!(
177 "state event added to purgatory: eventid: {}, identifier: {}", 185 "state event added to purgatory: eventid: {}, identifier: {}",
178 state.event.id, 186 state.event.id,
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 2987f15..f0a9ac5 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -14,15 +14,25 @@
14mod helpers; 14mod helpers;
15mod types; 15mod types;
16 16
17use anyhow::{bail, Result};
17pub use helpers::{can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 18pub use helpers::{can_satisfy_state, extract_refs_from_state, get_unpushed_refs};
18pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 19pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
19 20
20use dashmap::DashMap; 21use dashmap::DashMap;
21use nostr_sdk::prelude::*; 22use nostr_sdk::prelude::*;
22use std::collections::HashSet; 23use std::collections::HashSet;
24use std::path::{Path, PathBuf};
25use std::process::Command;
23use std::sync::Arc; 26use std::sync::Arc;
24use std::time::{Duration, Instant}; 27use std::time::{Duration, Instant};
25 28
29use crate::git::authorization::{
30 fetch_repository_data, pubkey_authorised_for_repo_owners, RepositoryData,
31};
32use crate::git::oid_exists;
33use crate::nostr::builder::SharedDatabase;
34use crate::nostr::events::RepositoryState;
35
26/// Default expiry duration for purgatory entries (30 minutes) 36/// Default expiry duration for purgatory entries (30 minutes)
27const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); 37const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800);
28 38
@@ -40,14 +50,17 @@ pub struct Purgatory {
40 /// PR events (kind 1617/1618) or placeholders indexed by event ID (hex string). 50 /// PR events (kind 1617/1618) or placeholders indexed by event ID (hex string).
41 /// Event ID is from the 'e' tag in the PR event itself. 51 /// Event ID is from the 'e' tag in the PR event itself.
42 pr_events: Arc<DashMap<String, PrPurgatoryEntry>>, 52 pr_events: Arc<DashMap<String, PrPurgatoryEntry>>,
53
54 git_data_path: PathBuf,
43} 55}
44 56
45impl Purgatory { 57impl Purgatory {
46 /// Create a new empty purgatory. 58 /// Create a new empty purgatory.
47 pub fn new() -> Self { 59 pub fn new(git_data_path: impl Into<PathBuf>) -> Self {
48 Self { 60 Self {
49 state_events: Arc::new(DashMap::new()), 61 state_events: Arc::new(DashMap::new()),
50 pr_events: Arc::new(DashMap::new()), 62 pr_events: Arc::new(DashMap::new()),
63 git_data_path: git_data_path.into(),
51 } 64 }
52 } 65 }
53 66
@@ -73,6 +86,46 @@ impl Purgatory {
73 self.state_events.entry(identifier).or_default().push(entry); 86 self.state_events.entry(identifier).or_default().push(entry);
74 } 87 }
75 88
89 /// Trigger a background git data sync for a state event.
90 ///
91 /// This method spawns a background task to attempt fetching missing git data
92 /// from remote servers listed in the repository announcements. It's called
93 /// when a state event arrives but the required git data isn't available locally.
94 ///
95 /// # Arguments
96 /// * `state` - The parsed repository state event
97 /// * `database` - Database to query for repository announcements
98 /// * `our_domain` - Our service domain to exclude from fetch targets
99 pub fn start_state_sync(
100 &self,
101 state: RepositoryState,
102 database: SharedDatabase,
103 our_domain: Option<String>,
104 ) {
105 let git_data_path = self.git_data_path.clone();
106 let identifier = state.identifier.clone();
107 let event_id = state.event.id;
108
109 tokio::spawn(async move {
110 tracing::debug!(
111 identifier = %identifier,
112 event_id = %event_id,
113 "Starting background git data sync for purgatory state event"
114 );
115
116 if let Err(e) =
117 sync_state_git_data(state, &database, &git_data_path, our_domain.as_deref()).await
118 {
119 tracing::warn!(
120 identifier = %identifier,
121 event_id = %event_id,
122 error = %e,
123 "Failed to sync git data for purgatory state event"
124 );
125 }
126 });
127 }
128
76 /// Add a PR event to purgatory. 129 /// Add a PR event to purgatory.
77 /// 130 ///
78 /// The event will expire after the default duration unless matched with git data. 131 /// The event will expire after the default duration unless matched with git data.
@@ -366,10 +419,332 @@ impl Purgatory {
366 } 419 }
367} 420}
368 421
369impl Default for Purgatory { 422/// Async function to sync git data for a state event from remote servers.
370 fn default() -> Self { 423///
371 Self::new() 424/// This function:
425/// 1. Fetches repository data from the database
426/// 2. Identifies which owners authorize the state event author
427/// 3. Collects clone URLs from authorized announcements
428/// 4. Finds the most complete local repo to fetch into
429/// 5. Identifies missing OIDs and fetches them from remote servers
430async fn sync_state_git_data(
431 state: RepositoryState,
432 database: &SharedDatabase,
433 git_data_path: &Path,
434 our_domain: Option<&str>,
435) -> Result<()> {
436 // Fetch repository data from database
437 let db_repo_data = fetch_repository_data(database, &state.identifier).await?;
438
439 if db_repo_data.announcements.is_empty() {
440 bail!(
441 "No announcements found for identifier: {}",
442 state.identifier
443 );
444 }
445
446 // Find owners that authorize this pubkey as a maintainer
447 let repo_owners_authorising_pubkey =
448 pubkey_authorised_for_repo_owners(&state.event.pubkey, &db_repo_data);
449
450 if repo_owners_authorising_pubkey.is_empty() {
451 bail!(
452 "No owners authorize pubkey {} for identifier {}",
453 state.event.pubkey,
454 state.identifier
455 );
456 }
457
458 // Collect clone URLs from authorized announcements, excluding our own service
459 let servers: HashSet<String> = db_repo_data
460 .announcements
461 .iter()
462 .filter(|a| repo_owners_authorising_pubkey.contains(&a.event.pubkey.to_hex()))
463 .flat_map(|a| a.clone_urls.iter().cloned())
464 .filter(|url| {
465 // Exclude our own domain if specified
466 if let Some(domain) = our_domain {
467 !url.contains(domain)
468 } else {
469 true
470 }
471 })
472 .collect();
473
474 if servers.is_empty() {
475 bail!(
476 "No external clone URLs found for identifier: {}",
477 state.identifier
478 );
479 }
480
481 tracing::debug!(
482 identifier = %state.identifier,
483 servers = ?servers,
484 "Found {} external servers for git data sync",
485 servers.len()
486 );
487
488 // Find the most complete local repo to fetch into
489 let (repo_path, missing_oids) =
490 get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?;
491
492 if missing_oids.is_empty() {
493 tracing::debug!(
494 identifier = %state.identifier,
495 repo_path = %repo_path.display(),
496 "No missing OIDs - git data is already complete"
497 );
498 return Ok(());
499 }
500
501 tracing::info!(
502 identifier = %state.identifier,
503 repo_path = %repo_path.display(),
504 missing_oids = ?missing_oids,
505 "Attempting to fetch {} missing OIDs from remote servers",
506 missing_oids.len()
507 );
508
509 // Try to fetch from each server until we get all missing OIDs
510 let mut last_error: Option<String> = None;
511 for server_url in &servers {
512 match fetch_missing_oids_from_server(&repo_path, server_url, &missing_oids).await {
513 Ok(fetched) => {
514 if fetched > 0 {
515 tracing::info!(
516 identifier = %state.identifier,
517 server = %server_url,
518 fetched = %fetched,
519 "Successfully fetched git data"
520 );
521 }
522
523 // Check if all OIDs are now available
524 let still_missing: Vec<_> = missing_oids
525 .iter()
526 .filter(|oid| !oid_exists(&repo_path, oid))
527 .collect();
528
529 if still_missing.is_empty() {
530 tracing::info!(
531 identifier = %state.identifier,
532 "All missing OIDs fetched successfully"
533 );
534 return Ok(());
535 }
536 }
537 Err(e) => {
538 tracing::debug!(
539 identifier = %state.identifier,
540 server = %server_url,
541 error = %e,
542 "Failed to fetch from server"
543 );
544 last_error = Some(e.to_string());
545 }
546 }
547 }
548
549 // Check final state
550 let still_missing: Vec<_> = missing_oids
551 .iter()
552 .filter(|oid| !oid_exists(&repo_path, oid))
553 .collect();
554
555 if still_missing.is_empty() {
556 Ok(())
557 } else {
558 bail!(
559 "Failed to fetch {} OIDs from any server. Last error: {:?}",
560 still_missing.len(),
561 last_error
562 )
563 }
564}
565
566/// Fetch missing OIDs from a remote git server.
567///
568/// Uses `git fetch` to retrieve specific commits from the server.
569async fn fetch_missing_oids_from_server(
570 repo_path: &Path,
571 server_url: &str,
572 missing_oids: &[String],
573) -> Result<usize> {
574 if missing_oids.is_empty() {
575 return Ok(0);
576 }
577
578 // Use tokio::task::spawn_blocking for the git operations since they're blocking
579 let repo_path = repo_path.to_path_buf();
580 let server_url = server_url.to_string();
581 let oids = missing_oids.to_vec();
582
583 tokio::task::spawn_blocking(move || {
584 let mut fetched_count = 0;
585
586 // Try to fetch each missing OID individually
587 // This uses git's ability to fetch specific commits
588 for oid in &oids {
589 // Skip if already exists
590 if oid_exists(&repo_path, oid) {
591 continue;
592 }
593
594 // git fetch <remote> <sha1>
595 let output = Command::new("git")
596 .args(["fetch", "--depth=1", &server_url, oid])
597 .current_dir(&repo_path)
598 .output();
599
600 match output {
601 Ok(result) if result.status.success() => {
602 fetched_count += 1;
603 tracing::debug!(
604 oid = %oid,
605 server = %server_url,
606 "Successfully fetched OID"
607 );
608 }
609 Ok(result) => {
610 let stderr = String::from_utf8_lossy(&result.stderr);
611 tracing::debug!(
612 oid = %oid,
613 server = %server_url,
614 stderr = %stderr,
615 "git fetch failed for OID"
616 );
617 }
618 Err(e) => {
619 tracing::debug!(
620 oid = %oid,
621 server = %server_url,
622 error = %e,
623 "git fetch command error"
624 );
625 }
626 }
627 }
628
629 // If individual fetches didn't work, try a broader fetch
630 if fetched_count == 0 {
631 // Try fetching all refs - this might get us the commits we need
632 let output = Command::new("git")
633 .args(["fetch", "--all", "--tags", &server_url])
634 .current_dir(&repo_path)
635 .output();
636
637 if let Ok(result) = output {
638 if result.status.success() {
639 // Count how many OIDs we now have
640 for oid in &oids {
641 if oid_exists(&repo_path, oid) {
642 fetched_count += 1;
643 }
644 }
645 }
646 }
647 }
648
649 Ok(fetched_count)
650 })
651 .await?
652}
653
654fn get_most_complete_local_repo(
655 db_repo_data: &RepositoryData,
656 state: &RepositoryState,
657 git_path: &Path,
658) -> Result<(PathBuf, Vec<String>)> {
659 // should we filter for those where pubkey is authorised?
660
661 let repo_onwers_authorising_pubkey =
662 pubkey_authorised_for_repo_owners(&state.event.pubkey, db_repo_data);
663
664 let mut res: Option<(Timestamp, PathBuf, Vec<String>)> = None;
665 for announcement in &db_repo_data.announcements {
666 if !repo_onwers_authorising_pubkey.contains(&announcement.event.pubkey.to_hex()) {
667 continue; // skip where event author isn't a maintainer
668 }
669 let repo_path = git_path.join(announcement.repo_path().clone());
670 if let Ok(missing_oids) = identify_missing_oids(state, &repo_path) {
671 let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path)
672 .unwrap_or(Timestamp::zero());
673 let newest_commmit_date = if let Some((d, _, _)) = &res {
674 d
675 } else {
676 &Timestamp::zero()
677 };
678 if commit_date.gt(newest_commmit_date) {
679 res = Some((commit_date, repo_path, missing_oids));
680 }
681 }
682 }
683 if let Some((_newest_commit_date, repo_path, missing_oids)) = res {
684 Ok((repo_path, missing_oids))
685 } else {
686 bail!("no repo directories exists yet");
687 }
688}
689
690fn identify_missing_oids(state: &RepositoryState, git_repo_path: &Path) -> Result<Vec<String>> {
691 if !git_repo_path.exists() {
692 bail!("repo directory doesn't exists");
372 } 693 }
694 let mut missing_oids = vec![];
695 for branch_state in &state.branches {
696 if !branch_state.commit.starts_with("ref: ")
697 && !oid_exists(git_repo_path, &branch_state.commit)
698 {
699 missing_oids.push(branch_state.commit.clone());
700 }
701 }
702 for tag_state in &state.tags {
703 if !tag_state.commit.starts_with("ref: ") && !oid_exists(git_repo_path, &tag_state.commit) {
704 missing_oids.push(tag_state.commit.clone());
705 }
706 }
707 Ok(missing_oids)
708}
709
710fn get_date_of_most_recent_commit_on_default_branch(git_repo_path: &Path) -> Result<Timestamp> {
711 if !git_repo_path.exists() {
712 bail!("repo directory doesn't exists");
713 }
714
715 // Get the default branch (HEAD)
716 let head_output = std::process::Command::new("git")
717 .args(["symbolic-ref", "HEAD"])
718 .current_dir(git_repo_path)
719 .output()?;
720
721 if !head_output.status.success() {
722 bail!("Failed to get repository HEAD");
723 }
724
725 let head_ref = String::from_utf8_lossy(&head_output.stdout)
726 .trim()
727 .to_string();
728
729 // Get the most recent commit timestamp on the default branch
730 // Use %ct to get the committer date as Unix timestamp
731 let log_output = std::process::Command::new("git")
732 .args(["log", "-1", "--format=%ct", &head_ref])
733 .current_dir(git_repo_path)
734 .output()?;
735
736 if !log_output.status.success() {
737 bail!("Failed to get commit timestamp for {}", head_ref);
738 }
739
740 let timestamp_str = String::from_utf8_lossy(&log_output.stdout)
741 .trim()
742 .to_string();
743 let unix_timestamp: u64 = timestamp_str
744 .parse()
745 .map_err(|_| anyhow::anyhow!("Failed to parse timestamp: {}", timestamp_str))?;
746
747 Ok(Timestamp::from(unix_timestamp))
373} 748}
374 749
375#[cfg(test)] 750#[cfg(test)]
@@ -378,7 +753,7 @@ mod tests {
378 753
379 #[test] 754 #[test]
380 fn test_purgatory_creation() { 755 fn test_purgatory_creation() {
381 let purgatory = Purgatory::new(); 756 let purgatory = Purgatory::new(PathBuf::new());
382 let (state_count, pr_count) = purgatory.count(); 757 let (state_count, pr_count) = purgatory.count();
383 assert_eq!(state_count, 0); 758 assert_eq!(state_count, 0);
384 assert_eq!(pr_count, 0); 759 assert_eq!(pr_count, 0);
@@ -386,7 +761,7 @@ mod tests {
386 761
387 #[test] 762 #[test]
388 fn test_purgatory_count() { 763 fn test_purgatory_count() {
389 let purgatory = Purgatory::new(); 764 let purgatory = Purgatory::new(PathBuf::new());
390 765
391 // Add some test data 766 // Add some test data
392 let keys = Keys::generate(); 767 let keys = Keys::generate();
@@ -405,7 +780,7 @@ mod tests {
405 780
406#[test] 781#[test]
407fn test_pr_event_vs_placeholder() { 782fn test_pr_event_vs_placeholder() {
408 let purgatory = Purgatory::new(); 783 let purgatory = Purgatory::new(PathBuf::new());
409 let keys = Keys::generate(); 784 let keys = Keys::generate();
410 let event = EventBuilder::text_note("test PR") 785 let event = EventBuilder::text_note("test PR")
411 .sign_with_keys(&keys) 786 .sign_with_keys(&keys)
@@ -435,7 +810,7 @@ fn test_pr_event_vs_placeholder() {
435 810
436#[test] 811#[test]
437fn test_pr_placeholder_creation_and_retrieval() { 812fn test_pr_placeholder_creation_and_retrieval() {
438 let purgatory = Purgatory::new(); 813 let purgatory = Purgatory::new(PathBuf::new());
439 814
440 // Add a placeholder 815 // Add a placeholder
441 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-123".to_string()); 816 purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-123".to_string());
@@ -456,7 +831,7 @@ fn test_pr_placeholder_creation_and_retrieval() {
456fn test_cleanup_removes_expired_entries() { 831fn test_cleanup_removes_expired_entries() {
457 use std::time::Duration; 832 use std::time::Duration;
458 833
459 let purgatory = Purgatory::new(); 834 let purgatory = Purgatory::new(PathBuf::new());
460 let keys = Keys::generate(); 835 let keys = Keys::generate();
461 836
462 // Create events 837 // Create events
@@ -509,7 +884,7 @@ fn test_cleanup_removes_expired_entries() {
509 884
510#[test] 885#[test]
511fn test_cleanup_preserves_non_expired_entries() { 886fn test_cleanup_preserves_non_expired_entries() {
512 let purgatory = Purgatory::new(); 887 let purgatory = Purgatory::new(PathBuf::new());
513 let keys = Keys::generate(); 888 let keys = Keys::generate();
514 889
515 let state_event = EventBuilder::text_note("state event") 890 let state_event = EventBuilder::text_note("state event")
@@ -540,7 +915,7 @@ fn test_cleanup_preserves_non_expired_entries() {
540fn test_cleanup_mixed_expired_and_fresh() { 915fn test_cleanup_mixed_expired_and_fresh() {
541 use std::time::Duration; 916 use std::time::Duration;
542 917
543 let purgatory = Purgatory::new(); 918 let purgatory = Purgatory::new(PathBuf::new());
544 let keys = Keys::generate(); 919 let keys = Keys::generate();
545 920
546 // Add multiple state events for same repo 921 // Add multiple state events for same repo
@@ -594,7 +969,7 @@ fn test_cleanup_mixed_expired_and_fresh() {
594fn test_remove_expired_legacy_method() { 969fn test_remove_expired_legacy_method() {
595 use std::time::Duration; 970 use std::time::Duration;
596 971
597 let purgatory = Purgatory::new(); 972 let purgatory = Purgatory::new(PathBuf::new());
598 let keys = Keys::generate(); 973 let keys = Keys::generate();
599 974
600 let state_event = EventBuilder::text_note("state") 975 let state_event = EventBuilder::text_note("state")
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index dcdbe3a..b56b6b7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1661,6 +1661,8 @@ impl SyncManager {
1661 reason = %message, 1661 reason = %message,
1662 "Event added to purgatory" 1662 "Event added to purgatory"
1663 ); 1663 );
1664 // Note: git data sync for state events is triggered by the policy
1665 // layer when adding to purgatory (via start_state_sync)
1664 ProcessResult::Purgatory 1666 ProcessResult::Purgatory
1665 } else { 1667 } else {
1666 tracing::debug!( 1668 tracing::debug!(