From e72edbae86affcb9fc0429bd197639bf438ffb6c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 12:24:42 +0000 Subject: Add unified process_newly_available_git_data function Implement the unified function that handles all post-git-data-available processing, regardless of how data arrived (git push or purgatory sync). This function: - Discovers satisfiable events from purgatory (state and PR events) - Syncs OIDs to authorized owner repos - Aligns refs and sets HEAD - Saves events to database - Notifies WebSocket subscribers - Removes from purgatory New additions: - ProcessResult struct for tracking processing outcomes - process_newly_available_git_data async function in src/git/sync.rs - Helper functions: extract_identifier_from_repo_path, extract_identifier_from_pr_event - Purgatory::find_prs_for_identifier method for PR event discovery - Unit tests for all helper functions Also fixes: - Simplified extract_domain to avoid url crate dependency - Removed unused imports in sync/loop.rs --- src/git/sync.rs | 648 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 647 insertions(+), 1 deletion(-) (limited to 'src/git/sync.rs') diff --git a/src/git/sync.rs b/src/git/sync.rs index 9a8af5a..e57a0cc 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -16,6 +16,18 @@ //! repository identifier, and they may share maintainers. When a state event //! authorizes a push, that push should be reflected in ALL owner repositories //! that would authorize the same state. +//! +//! ## Unified Processing +//! +//! The `process_newly_available_git_data` function provides unified processing +//! for newly available git data, regardless of how it arrived (git push or +//! purgatory sync). This ensures consistent behavior for: +//! - Discovering satisfiable events from purgatory +//! - Syncing OIDs to authorized owner repos +//! - Aligning refs (+ setting HEAD) +//! - Saving events to database +//! - Notifying WebSocket subscribers +//! - Removing from purgatory use std::collections::{HashMap, HashSet}; use std::path::Path; @@ -24,9 +36,55 @@ use tracing::{debug, info, warn}; use nostr_sdk::Event; -use crate::git::authorization::{collect_authorized_maintainers, RepositoryData}; +use crate::git::authorization::{ + collect_authorized_maintainers, fetch_repository_data, pubkey_authorised_for_repo_owners, + RepositoryData, +}; use crate::git::{self, oid_exists}; +use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; +use crate::purgatory::{can_satisfy_state, Purgatory}; + +/// Result of processing newly available git data. +/// +/// This struct captures what happened when we tried to release events from +/// purgatory after new git data became available (whether from a git push +/// or from purgatory sync fetching OIDs from remote servers). +#[derive(Debug, Default, Clone)] +pub struct ProcessResult { + /// Number of state events released from purgatory + pub states_released: usize, + /// Number of PR events released from purgatory + pub prs_released: usize, + /// Number of repositories synced (OIDs copied + refs aligned) + pub repos_synced: usize, + /// Number of refs created across all repos + pub refs_created: usize, + /// Number of refs updated across all repos + pub refs_updated: usize, + /// Number of refs deleted across all repos + pub refs_deleted: usize, + /// Errors encountered (non-fatal) + pub errors: Vec, +} + +impl ProcessResult { + /// Check if any events were released + pub fn released_any(&self) -> bool { + self.states_released > 0 || self.prs_released > 0 + } + + /// Merge another ProcessResult into this one + pub fn merge(&mut self, other: ProcessResult) { + self.states_released += other.states_released; + self.prs_released += other.prs_released; + self.repos_synced += other.repos_synced; + self.refs_created += other.refs_created; + self.refs_updated += other.refs_updated; + self.refs_deleted += other.refs_deleted; + self.errors.extend(other.errors); + } +} /// Result of syncing git data to owner repositories #[derive(Debug, Default)] @@ -665,10 +723,598 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) -> result } +// ============================================================================= +// Unified Git Data Processing +// ============================================================================= + +/// Extract repository identifier from a repository path. +/// +/// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the identifier. +/// +/// # Arguments +/// * `repo_path` - Full path to the git repository +/// * `git_data_path` - Base path for git repositories +/// +/// # Returns +/// The identifier if the path matches the expected pattern, None otherwise +pub fn extract_identifier_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Option { + // Get the relative path from git_data_path + let relative = repo_path.strip_prefix(git_data_path).ok()?; + + // Expected structure: {npub}/{identifier}.git + let components: Vec<_> = relative.components().collect(); + if components.len() != 2 { + return None; + } + + // Get the repo directory name (e.g., "my-repo.git") + let repo_name = components[1].as_os_str().to_str()?; + + // Strip the .git suffix + repo_name.strip_suffix(".git").map(|s| s.to_string()) +} + +/// Extract repository identifier from a PR event. +/// +/// PR events reference repositories via `a` tags with format `30617::`. +/// This function extracts the identifier from the first matching `a` tag. +/// +/// # Arguments +/// * `event` - The PR event (kind 1617 or 1618) +/// +/// # Returns +/// The identifier if found, None otherwise +pub fn extract_identifier_from_pr_event(event: &Event) -> Option { + for tag in event.tags.iter() { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { + // Format: 30617:: + let parts: Vec<&str> = tag_vec[1].split(':').collect(); + if parts.len() >= 3 { + return Some(parts[2].to_string()); + } + } + } + None +} + +/// Unified processing of newly available git data. +/// +/// This function is called whenever git data becomes available, whether from: +/// - A successful `git push` (handle_receive_pack) +/// - Purgatory sync fetching OIDs from remote servers +/// +/// It handles all post-git-data-available processing: +/// 1. Discovers satisfiable events from purgatory (state events and PR events) +/// 2. For each satisfiable state event: +/// - Syncs OIDs to authorized owner repos +/// - Aligns refs (+ sets HEAD) +/// - Saves event to database +/// - Notifies WebSocket subscribers +/// - Removes from purgatory +/// 3. For each satisfiable PR event: +/// - Syncs commit to owner repos +/// - Creates refs/nostr/ refs +/// - Saves event to database +/// - Notifies WebSocket subscribers +/// - Removes from purgatory +/// +/// # Arguments +/// * `source_repo_path` - Path to the repository that has the new git data +/// * `new_oids` - Set of OIDs that were just made available (used for logging/debugging) +/// * `database` - Database for saving events and querying repository data +/// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) +/// * `purgatory` - Purgatory instance to check for satisfiable events +/// * `git_data_path` - Base path for git repositories +/// +/// # Returns +/// A `ProcessResult` describing what was processed +pub async fn process_newly_available_git_data( + source_repo_path: &Path, + new_oids: &HashSet, + database: &SharedDatabase, + local_relay: Option<&nostr_relay_builder::LocalRelay>, + purgatory: &Purgatory, + git_data_path: &Path, +) -> anyhow::Result { + let mut result = ProcessResult::default(); + + // Extract identifier from repo path + let identifier = match extract_identifier_from_repo_path(source_repo_path, git_data_path) { + Some(id) => id, + None => { + debug!( + repo_path = %source_repo_path.display(), + "Could not extract identifier from repo path" + ); + return Ok(result); + } + }; + + debug!( + identifier = %identifier, + new_oids_count = new_oids.len(), + "Processing newly available git data" + ); + + // Get current refs from the repository for state matching + let current_refs: HashMap = git::list_refs(source_repo_path) + .unwrap_or_default() + .into_iter() + .collect(); + + // Process state events from purgatory + let state_result = + process_purgatory_state_events(&identifier, source_repo_path, ¤t_refs, database, local_relay, purgatory, git_data_path).await; + result.merge(state_result); + + // Process PR events from purgatory + let pr_result = + process_purgatory_pr_events(&identifier, source_repo_path, database, local_relay, purgatory, git_data_path).await; + result.merge(pr_result); + + if result.released_any() { + info!( + identifier = %identifier, + states_released = result.states_released, + prs_released = result.prs_released, + repos_synced = result.repos_synced, + "Released events from purgatory after git data became available" + ); + } + + Ok(result) +} + +/// Process state events from purgatory that can now be satisfied. +async fn process_purgatory_state_events( + identifier: &str, + source_repo_path: &Path, + current_refs: &HashMap, + database: &SharedDatabase, + local_relay: Option<&nostr_relay_builder::LocalRelay>, + purgatory: &Purgatory, + git_data_path: &Path, +) -> ProcessResult { + let mut result = ProcessResult::default(); + + // Find state events in purgatory for this identifier + let purgatory_states = purgatory.find_state(identifier); + if purgatory_states.is_empty() { + return result; + } + + debug!( + identifier = %identifier, + purgatory_states_count = purgatory_states.len(), + "Checking purgatory state events for satisfaction" + ); + + // Build ref updates from current refs (treating all as "creations" for matching purposes) + let ref_updates: Vec = current_refs + .iter() + .map(|(ref_name, commit)| crate::purgatory::RefUpdate { + old_oid: "0000000000000000000000000000000000000000".to_string(), + new_oid: commit.clone(), + ref_name: ref_name.clone(), + }) + .collect(); + + // Check which state events can be satisfied + for entry in &purgatory_states { + // Check if this state event can be satisfied with current refs + if !can_satisfy_state(&entry.event, &ref_updates, current_refs) { + debug!( + identifier = %identifier, + event_id = %entry.event.id, + "State event cannot be satisfied with current refs" + ); + continue; + } + + // Parse the state event + let state = match RepositoryState::from_event(entry.event.clone()) { + Ok(s) => s, + Err(e) => { + warn!( + identifier = %identifier, + event_id = %entry.event.id, + error = %e, + "Failed to parse state event from purgatory" + ); + result.errors.push(format!("Failed to parse state event: {}", e)); + continue; + } + }; + + // Fetch repository data for authorization check + let db_repo_data = match fetch_repository_data(database, identifier).await { + Ok(data) => data, + Err(e) => { + warn!( + identifier = %identifier, + event_id = %entry.event.id, + error = %e, + "Failed to fetch repository data for state event" + ); + result.errors.push(format!("Failed to fetch repo data: {}", e)); + continue; + } + }; + + // Check authorization at release time + let repo_owners_authorising_pubkey = + pubkey_authorised_for_repo_owners(&entry.event.pubkey, &db_repo_data); + if repo_owners_authorising_pubkey.is_empty() { + debug!( + identifier = %identifier, + event_id = %entry.event.id, + pubkey = %entry.event.pubkey, + "State event author no longer authorized - skipping" + ); + continue; + } + + // Sync to owner repos and align refs + let sync_result = sync_to_owner_repos(source_repo_path, &state, &db_repo_data, git_data_path); + result.repos_synced += sync_result.repos_synced; + result.refs_created += sync_result.refs_created; + result.refs_updated += sync_result.refs_updated; + result.refs_deleted += sync_result.refs_deleted; + + // Save event to database + match database.save_event(&entry.event).await { + Ok(_) => { + info!( + identifier = %identifier, + event_id = %entry.event.id, + "Saved purgatory state event to database" + ); + + // Notify WebSocket subscribers + if let Some(relay) = local_relay { + if relay.notify_event(entry.event.clone()) { + debug!( + identifier = %identifier, + event_id = %entry.event.id, + "Broadcast state event to WebSocket listeners" + ); + } + } + + // Remove from purgatory + purgatory.remove_state_event(identifier, &entry.event.id); + result.states_released += 1; + + info!( + identifier = %identifier, + event_id = %entry.event.id, + "Released state event from purgatory" + ); + } + Err(e) => { + warn!( + identifier = %identifier, + event_id = %entry.event.id, + error = %e, + "Failed to save state event to database" + ); + result.errors.push(format!("Failed to save state event: {}", e)); + } + } + } + + result +} + +/// Process PR events from purgatory that can now be satisfied. +async fn process_purgatory_pr_events( + identifier: &str, + source_repo_path: &Path, + database: &SharedDatabase, + local_relay: Option<&nostr_relay_builder::LocalRelay>, + purgatory: &Purgatory, + git_data_path: &Path, +) -> ProcessResult { + let mut result = ProcessResult::default(); + + // Find PR events in purgatory for this identifier + let purgatory_prs = purgatory.find_prs_for_identifier(identifier); + if purgatory_prs.is_empty() { + return result; + } + + debug!( + identifier = %identifier, + purgatory_prs_count = purgatory_prs.len(), + "Checking purgatory PR events for satisfaction" + ); + + // Fetch repository data for syncing + let db_repo_data = match fetch_repository_data(database, identifier).await { + Ok(data) => data, + Err(e) => { + warn!( + identifier = %identifier, + error = %e, + "Failed to fetch repository data for PR events" + ); + result.errors.push(format!("Failed to fetch repo data: {}", e)); + return result; + } + }; + + for entry in purgatory_prs { + // Only process entries that have actual events (not placeholders) + let event = match &entry.event { + Some(e) => e, + None => continue, + }; + + // Check if the commit exists in the source repo + if !oid_exists(source_repo_path, &entry.commit) { + debug!( + identifier = %identifier, + event_id = %event.id, + commit = %entry.commit, + "PR commit not available yet" + ); + continue; + } + + // Sync PR ref to owner repos + let pr_refs = vec![(event.id.to_hex(), entry.commit.clone())]; + let pr_events = vec![event.clone()]; + + // Get owner pubkey from source repo path + let owner_pubkey = extract_owner_from_repo_path(source_repo_path, git_data_path) + .unwrap_or_default(); + + let sync_result = sync_pr_refs_to_tagged_owner_repos( + source_repo_path, + &pr_refs, + &pr_events, + &db_repo_data, + git_data_path, + &owner_pubkey, + ); + result.repos_synced += sync_result.repos_synced; + result.refs_created += sync_result.refs_created; + + // Create the ref in the source repo if it doesn't exist + let ref_name = format!("refs/nostr/{}", event.id.to_hex()); + if git::get_ref_commit(source_repo_path, &ref_name).is_none() { + if let Err(e) = git::update_ref(source_repo_path, &ref_name, &entry.commit) { + warn!( + identifier = %identifier, + event_id = %event.id, + error = %e, + "Failed to create PR ref in source repo" + ); + } else { + result.refs_created += 1; + } + } + + // Save event to database + match database.save_event(event).await { + Ok(_) => { + info!( + identifier = %identifier, + event_id = %event.id, + "Saved purgatory PR event to database" + ); + + // Notify WebSocket subscribers + if let Some(relay) = local_relay { + if relay.notify_event(event.clone()) { + debug!( + identifier = %identifier, + event_id = %event.id, + "Broadcast PR event to WebSocket listeners" + ); + } + } + + // Remove from purgatory + let event_id_hex = event.id.to_hex(); + purgatory.remove_pr(&event_id_hex); + result.prs_released += 1; + + info!( + identifier = %identifier, + event_id = %event.id, + "Released PR event from purgatory" + ); + } + Err(e) => { + warn!( + identifier = %identifier, + event_id = %event.id, + error = %e, + "Failed to save PR event to database" + ); + result.errors.push(format!("Failed to save PR event: {}", e)); + } + } + } + + result +} + +/// Extract owner pubkey from a repository path. +/// +/// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. +fn extract_owner_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Option { + let relative = repo_path.strip_prefix(git_data_path).ok()?; + let components: Vec<_> = relative.components().collect(); + if components.len() >= 1 { + components[0].as_os_str().to_str().map(|s| s.to_string()) + } else { + None + } +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_process_result_default() { + let result = ProcessResult::default(); + assert_eq!(result.states_released, 0); + assert_eq!(result.prs_released, 0); + assert_eq!(result.repos_synced, 0); + assert!(!result.released_any()); + } + + #[test] + fn test_process_result_released_any() { + let mut result = ProcessResult::default(); + assert!(!result.released_any()); + + result.states_released = 1; + assert!(result.released_any()); + + result.states_released = 0; + result.prs_released = 1; + assert!(result.released_any()); + } + + #[test] + fn test_process_result_merge() { + let mut result1 = ProcessResult { + states_released: 1, + prs_released: 2, + repos_synced: 3, + refs_created: 4, + refs_updated: 5, + refs_deleted: 6, + errors: vec!["error1".to_string()], + }; + + let result2 = ProcessResult { + states_released: 10, + prs_released: 20, + repos_synced: 30, + refs_created: 40, + refs_updated: 50, + refs_deleted: 60, + errors: vec!["error2".to_string()], + }; + + result1.merge(result2); + + assert_eq!(result1.states_released, 11); + assert_eq!(result1.prs_released, 22); + assert_eq!(result1.repos_synced, 33); + assert_eq!(result1.refs_created, 44); + assert_eq!(result1.refs_updated, 55); + assert_eq!(result1.refs_deleted, 66); + assert_eq!(result1.errors.len(), 2); + } + + #[test] + fn test_extract_identifier_from_repo_path_valid() { + use std::path::PathBuf; + + let git_data_path = PathBuf::from("/data/git"); + let repo_path = PathBuf::from("/data/git/npub1abc123/my-repo.git"); + + let result = extract_identifier_from_repo_path(&repo_path, &git_data_path); + assert_eq!(result, Some("my-repo".to_string())); + } + + #[test] + fn test_extract_identifier_from_repo_path_nested() { + use std::path::PathBuf; + + let git_data_path = PathBuf::from("/var/lib/ngit/git"); + let repo_path = PathBuf::from("/var/lib/ngit/git/npub1xyz/ngit-grasp.git"); + + let result = extract_identifier_from_repo_path(&repo_path, &git_data_path); + assert_eq!(result, Some("ngit-grasp".to_string())); + } + + #[test] + fn test_extract_identifier_from_repo_path_invalid_no_git_suffix() { + use std::path::PathBuf; + + let git_data_path = PathBuf::from("/data/git"); + let repo_path = PathBuf::from("/data/git/npub1abc123/my-repo"); + + let result = extract_identifier_from_repo_path(&repo_path, &git_data_path); + assert_eq!(result, None); + } + + #[test] + fn test_extract_identifier_from_repo_path_invalid_wrong_depth() { + use std::path::PathBuf; + + let git_data_path = PathBuf::from("/data/git"); + let repo_path = PathBuf::from("/data/git/my-repo.git"); // Missing npub level + + let result = extract_identifier_from_repo_path(&repo_path, &git_data_path); + assert_eq!(result, None); + } + + #[test] + fn test_extract_identifier_from_pr_event_valid() { + use nostr_sdk::{EventBuilder, Keys, Kind, Tag, TagKind}; + + let keys = Keys::generate(); + let tags = vec![Tag::custom( + TagKind::Custom("a".into()), + vec!["30617:abc123def456:test-repo".to_string()], + )]; + + let event = EventBuilder::new(Kind::from(1618), "PR content") + .tags(tags) + .sign_with_keys(&keys) + .unwrap(); + + let result = extract_identifier_from_pr_event(&event); + assert_eq!(result, Some("test-repo".to_string())); + } + + #[test] + fn test_extract_identifier_from_pr_event_missing_tag() { + use nostr_sdk::{EventBuilder, Keys, Kind, Tag, TagKind}; + + let keys = Keys::generate(); + let tags = vec![Tag::custom( + TagKind::Custom("c".into()), + vec!["commit123".to_string()], + )]; + + let event = EventBuilder::new(Kind::from(1618), "PR content") + .tags(tags) + .sign_with_keys(&keys) + .unwrap(); + + let result = extract_identifier_from_pr_event(&event); + assert_eq!(result, None); + } + + #[test] + fn test_extract_identifier_from_pr_event_wrong_kind_a_tag() { + use nostr_sdk::{EventBuilder, Keys, Kind, Tag, TagKind}; + + let keys = Keys::generate(); + let tags = vec![Tag::custom( + TagKind::Custom("a".into()), + vec!["30618:abc123:test-repo".to_string()], // 30618 not 30617 + )]; + + let event = EventBuilder::new(Kind::from(1618), "PR content") + .tags(tags) + .sign_with_keys(&keys) + .unwrap(); + + let result = extract_identifier_from_pr_event(&event); + assert_eq!(result, None); + } + #[test] fn test_sync_result_default() { let result = SyncResult::default(); -- cgit v1.2.3