From 623cae575f8c9ce33f2d7fdc2526db495f846acb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 5 Jan 2026 13:05:17 +0000 Subject: purgatory: add state git data sync --- src/git/authorization.rs | 14 ++ src/main.rs | 6 +- src/nostr/policy/state.rs | 8 + src/purgatory/mod.rs | 399 ++++++++++++++++++++++++++++++++++++++++++++-- src/sync/mod.rs | 2 + 5 files changed, 415 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index 6b997d8..e7ea99b 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -287,6 +287,20 @@ pub async fn fetch_repository_data( }) } +pub fn pubkey_authorised_for_repo_owners( + pubkey: &PublicKey, + db_repo_data: &RepositoryData, +) -> Vec { + let mut repo_owners_authorising_pubkey = HashSet::new(); + let collections = collect_authorized_maintainers(&db_repo_data.announcements); + for (owner, authoised) in collections { + if authoised.contains(&pubkey.to_hex()) { + repo_owners_authorising_pubkey.insert(owner.to_string()); + } + } + repo_owners_authorising_pubkey.iter().cloned().collect() +} + /// Collect authorized maintainers grouped by owner from a set of announcements /// /// For each announcement, returns a map from owner pubkey to authorized maintainers: diff --git a/src/main.rs b/src/main.rs index d382462..fbe3e34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::time::Duration; +use std::{path::PathBuf, sync::Arc}; use anyhow::Result; use tokio::signal; @@ -49,7 +49,9 @@ async fn main() -> Result<()> { }; // Create purgatory for event/git coordination - let purgatory = Arc::new(Purgatory::new()); + let purgatory = Arc::new(Purgatory::new(PathBuf::from( + config.effective_git_data_path(), + ))); info!("Purgatory initialized for event coordination"); // Create Nostr relay with NIP-34 validation diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 48435ea..9ca3ee6 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -173,6 +173,14 @@ impl StatePolicy { self.ctx .purgatory .add_state(event.clone(), state.identifier.clone(), event.pubkey); + + // Trigger background git data sync from remote servers + self.ctx.purgatory.start_state_sync( + state.clone(), + self.ctx.database.clone(), + Some(self.ctx.domain.clone()), + ); + tracing::info!( "state event added to purgatory: eventid: {}, identifier: {}", state.event.id, diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 2987f15..f0a9ac5 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -14,15 +14,25 @@ mod helpers; mod types; +use anyhow::{bail, Result}; pub use helpers::{can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; use dashmap::DashMap; use nostr_sdk::prelude::*; use std::collections::HashSet; +use std::path::{Path, PathBuf}; +use std::process::Command; use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::git::authorization::{ + fetch_repository_data, pubkey_authorised_for_repo_owners, RepositoryData, +}; +use crate::git::oid_exists; +use crate::nostr::builder::SharedDatabase; +use crate::nostr::events::RepositoryState; + /// Default expiry duration for purgatory entries (30 minutes) const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); @@ -40,14 +50,17 @@ pub struct Purgatory { /// PR events (kind 1617/1618) or placeholders indexed by event ID (hex string). /// Event ID is from the 'e' tag in the PR event itself. pr_events: Arc>, + + git_data_path: PathBuf, } impl Purgatory { /// Create a new empty purgatory. - pub fn new() -> Self { + pub fn new(git_data_path: impl Into) -> Self { Self { state_events: Arc::new(DashMap::new()), pr_events: Arc::new(DashMap::new()), + git_data_path: git_data_path.into(), } } @@ -73,6 +86,46 @@ impl Purgatory { self.state_events.entry(identifier).or_default().push(entry); } + /// Trigger a background git data sync for a state event. + /// + /// This method spawns a background task to attempt fetching missing git data + /// from remote servers listed in the repository announcements. It's called + /// when a state event arrives but the required git data isn't available locally. + /// + /// # Arguments + /// * `state` - The parsed repository state event + /// * `database` - Database to query for repository announcements + /// * `our_domain` - Our service domain to exclude from fetch targets + pub fn start_state_sync( + &self, + state: RepositoryState, + database: SharedDatabase, + our_domain: Option, + ) { + let git_data_path = self.git_data_path.clone(); + let identifier = state.identifier.clone(); + let event_id = state.event.id; + + tokio::spawn(async move { + tracing::debug!( + identifier = %identifier, + event_id = %event_id, + "Starting background git data sync for purgatory state event" + ); + + if let Err(e) = + sync_state_git_data(state, &database, &git_data_path, our_domain.as_deref()).await + { + tracing::warn!( + identifier = %identifier, + event_id = %event_id, + error = %e, + "Failed to sync git data for purgatory state event" + ); + } + }); + } + /// Add a PR event to purgatory. /// /// The event will expire after the default duration unless matched with git data. @@ -366,10 +419,332 @@ impl Purgatory { } } -impl Default for Purgatory { - fn default() -> Self { - Self::new() +/// Async function to sync git data for a state event from remote servers. +/// +/// This function: +/// 1. Fetches repository data from the database +/// 2. Identifies which owners authorize the state event author +/// 3. Collects clone URLs from authorized announcements +/// 4. Finds the most complete local repo to fetch into +/// 5. Identifies missing OIDs and fetches them from remote servers +async fn sync_state_git_data( + state: RepositoryState, + database: &SharedDatabase, + git_data_path: &Path, + our_domain: Option<&str>, +) -> Result<()> { + // Fetch repository data from database + let db_repo_data = fetch_repository_data(database, &state.identifier).await?; + + if db_repo_data.announcements.is_empty() { + bail!( + "No announcements found for identifier: {}", + state.identifier + ); + } + + // Find owners that authorize this pubkey as a maintainer + let repo_owners_authorising_pubkey = + pubkey_authorised_for_repo_owners(&state.event.pubkey, &db_repo_data); + + if repo_owners_authorising_pubkey.is_empty() { + bail!( + "No owners authorize pubkey {} for identifier {}", + state.event.pubkey, + state.identifier + ); + } + + // Collect clone URLs from authorized announcements, excluding our own service + let servers: HashSet = db_repo_data + .announcements + .iter() + .filter(|a| repo_owners_authorising_pubkey.contains(&a.event.pubkey.to_hex())) + .flat_map(|a| a.clone_urls.iter().cloned()) + .filter(|url| { + // Exclude our own domain if specified + if let Some(domain) = our_domain { + !url.contains(domain) + } else { + true + } + }) + .collect(); + + if servers.is_empty() { + bail!( + "No external clone URLs found for identifier: {}", + state.identifier + ); + } + + tracing::debug!( + identifier = %state.identifier, + servers = ?servers, + "Found {} external servers for git data sync", + servers.len() + ); + + // Find the most complete local repo to fetch into + let (repo_path, missing_oids) = + get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?; + + if missing_oids.is_empty() { + tracing::debug!( + identifier = %state.identifier, + repo_path = %repo_path.display(), + "No missing OIDs - git data is already complete" + ); + return Ok(()); + } + + tracing::info!( + identifier = %state.identifier, + repo_path = %repo_path.display(), + missing_oids = ?missing_oids, + "Attempting to fetch {} missing OIDs from remote servers", + missing_oids.len() + ); + + // Try to fetch from each server until we get all missing OIDs + let mut last_error: Option = None; + for server_url in &servers { + match fetch_missing_oids_from_server(&repo_path, server_url, &missing_oids).await { + Ok(fetched) => { + if fetched > 0 { + tracing::info!( + identifier = %state.identifier, + server = %server_url, + fetched = %fetched, + "Successfully fetched git data" + ); + } + + // Check if all OIDs are now available + let still_missing: Vec<_> = missing_oids + .iter() + .filter(|oid| !oid_exists(&repo_path, oid)) + .collect(); + + if still_missing.is_empty() { + tracing::info!( + identifier = %state.identifier, + "All missing OIDs fetched successfully" + ); + return Ok(()); + } + } + Err(e) => { + tracing::debug!( + identifier = %state.identifier, + server = %server_url, + error = %e, + "Failed to fetch from server" + ); + last_error = Some(e.to_string()); + } + } + } + + // Check final state + let still_missing: Vec<_> = missing_oids + .iter() + .filter(|oid| !oid_exists(&repo_path, oid)) + .collect(); + + if still_missing.is_empty() { + Ok(()) + } else { + bail!( + "Failed to fetch {} OIDs from any server. Last error: {:?}", + still_missing.len(), + last_error + ) + } +} + +/// Fetch missing OIDs from a remote git server. +/// +/// Uses `git fetch` to retrieve specific commits from the server. +async fn fetch_missing_oids_from_server( + repo_path: &Path, + server_url: &str, + missing_oids: &[String], +) -> Result { + if missing_oids.is_empty() { + return Ok(0); + } + + // Use tokio::task::spawn_blocking for the git operations since they're blocking + let repo_path = repo_path.to_path_buf(); + let server_url = server_url.to_string(); + let oids = missing_oids.to_vec(); + + tokio::task::spawn_blocking(move || { + let mut fetched_count = 0; + + // Try to fetch each missing OID individually + // This uses git's ability to fetch specific commits + for oid in &oids { + // Skip if already exists + if oid_exists(&repo_path, oid) { + continue; + } + + // git fetch + let output = Command::new("git") + .args(["fetch", "--depth=1", &server_url, oid]) + .current_dir(&repo_path) + .output(); + + match output { + Ok(result) if result.status.success() => { + fetched_count += 1; + tracing::debug!( + oid = %oid, + server = %server_url, + "Successfully fetched OID" + ); + } + Ok(result) => { + let stderr = String::from_utf8_lossy(&result.stderr); + tracing::debug!( + oid = %oid, + server = %server_url, + stderr = %stderr, + "git fetch failed for OID" + ); + } + Err(e) => { + tracing::debug!( + oid = %oid, + server = %server_url, + error = %e, + "git fetch command error" + ); + } + } + } + + // If individual fetches didn't work, try a broader fetch + if fetched_count == 0 { + // Try fetching all refs - this might get us the commits we need + let output = Command::new("git") + .args(["fetch", "--all", "--tags", &server_url]) + .current_dir(&repo_path) + .output(); + + if let Ok(result) = output { + if result.status.success() { + // Count how many OIDs we now have + for oid in &oids { + if oid_exists(&repo_path, oid) { + fetched_count += 1; + } + } + } + } + } + + Ok(fetched_count) + }) + .await? +} + +fn get_most_complete_local_repo( + db_repo_data: &RepositoryData, + state: &RepositoryState, + git_path: &Path, +) -> Result<(PathBuf, Vec)> { + // should we filter for those where pubkey is authorised? + + let repo_onwers_authorising_pubkey = + pubkey_authorised_for_repo_owners(&state.event.pubkey, db_repo_data); + + let mut res: Option<(Timestamp, PathBuf, Vec)> = None; + for announcement in &db_repo_data.announcements { + if !repo_onwers_authorising_pubkey.contains(&announcement.event.pubkey.to_hex()) { + continue; // skip where event author isn't a maintainer + } + let repo_path = git_path.join(announcement.repo_path().clone()); + if let Ok(missing_oids) = identify_missing_oids(state, &repo_path) { + let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path) + .unwrap_or(Timestamp::zero()); + let newest_commmit_date = if let Some((d, _, _)) = &res { + d + } else { + &Timestamp::zero() + }; + if commit_date.gt(newest_commmit_date) { + res = Some((commit_date, repo_path, missing_oids)); + } + } + } + if let Some((_newest_commit_date, repo_path, missing_oids)) = res { + Ok((repo_path, missing_oids)) + } else { + bail!("no repo directories exists yet"); + } +} + +fn identify_missing_oids(state: &RepositoryState, git_repo_path: &Path) -> Result> { + if !git_repo_path.exists() { + bail!("repo directory doesn't exists"); } + let mut missing_oids = vec![]; + for branch_state in &state.branches { + if !branch_state.commit.starts_with("ref: ") + && !oid_exists(git_repo_path, &branch_state.commit) + { + missing_oids.push(branch_state.commit.clone()); + } + } + for tag_state in &state.tags { + if !tag_state.commit.starts_with("ref: ") && !oid_exists(git_repo_path, &tag_state.commit) { + missing_oids.push(tag_state.commit.clone()); + } + } + Ok(missing_oids) +} + +fn get_date_of_most_recent_commit_on_default_branch(git_repo_path: &Path) -> Result { + if !git_repo_path.exists() { + bail!("repo directory doesn't exists"); + } + + // Get the default branch (HEAD) + let head_output = std::process::Command::new("git") + .args(["symbolic-ref", "HEAD"]) + .current_dir(git_repo_path) + .output()?; + + if !head_output.status.success() { + bail!("Failed to get repository HEAD"); + } + + let head_ref = String::from_utf8_lossy(&head_output.stdout) + .trim() + .to_string(); + + // Get the most recent commit timestamp on the default branch + // Use %ct to get the committer date as Unix timestamp + let log_output = std::process::Command::new("git") + .args(["log", "-1", "--format=%ct", &head_ref]) + .current_dir(git_repo_path) + .output()?; + + if !log_output.status.success() { + bail!("Failed to get commit timestamp for {}", head_ref); + } + + let timestamp_str = String::from_utf8_lossy(&log_output.stdout) + .trim() + .to_string(); + let unix_timestamp: u64 = timestamp_str + .parse() + .map_err(|_| anyhow::anyhow!("Failed to parse timestamp: {}", timestamp_str))?; + + Ok(Timestamp::from(unix_timestamp)) } #[cfg(test)] @@ -378,7 +753,7 @@ mod tests { #[test] fn test_purgatory_creation() { - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); let (state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); @@ -386,7 +761,7 @@ mod tests { #[test] fn test_purgatory_count() { - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); // Add some test data let keys = Keys::generate(); @@ -405,7 +780,7 @@ mod tests { #[test] fn test_pr_event_vs_placeholder() { - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); let keys = Keys::generate(); let event = EventBuilder::text_note("test PR") .sign_with_keys(&keys) @@ -435,7 +810,7 @@ fn test_pr_event_vs_placeholder() { #[test] fn test_pr_placeholder_creation_and_retrieval() { - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); // Add a placeholder purgatory.add_pr_placeholder("placeholder-id".to_string(), "commit-123".to_string()); @@ -456,7 +831,7 @@ fn test_pr_placeholder_creation_and_retrieval() { fn test_cleanup_removes_expired_entries() { use std::time::Duration; - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); let keys = Keys::generate(); // Create events @@ -509,7 +884,7 @@ fn test_cleanup_removes_expired_entries() { #[test] fn test_cleanup_preserves_non_expired_entries() { - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); let keys = Keys::generate(); let state_event = EventBuilder::text_note("state event") @@ -540,7 +915,7 @@ fn test_cleanup_preserves_non_expired_entries() { fn test_cleanup_mixed_expired_and_fresh() { use std::time::Duration; - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); let keys = Keys::generate(); // Add multiple state events for same repo @@ -594,7 +969,7 @@ fn test_cleanup_mixed_expired_and_fresh() { fn test_remove_expired_legacy_method() { use std::time::Duration; - let purgatory = Purgatory::new(); + let purgatory = Purgatory::new(PathBuf::new()); let keys = Keys::generate(); let state_event = EventBuilder::text_note("state") diff --git a/src/sync/mod.rs b/src/sync/mod.rs index dcdbe3a..b56b6b7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1661,6 +1661,8 @@ impl SyncManager { reason = %message, "Event added to purgatory" ); + // Note: git data sync for state events is triggered by the policy + // layer when adding to purgatory (via start_state_sync) ProcessResult::Purgatory } else { tracing::debug!( -- cgit v1.2.3