From c67ebe6f33bfa191f17eb0df24d3ee18092c74e1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 23:31:38 +0000 Subject: refactor: unify event processing logic Eliminates code duplication by extracting core event processing into reusable functions. All state and PR event processing now uses the same unified logic from src/git/process.rs. Changes: - Add src/git/process.rs with unified processing functions - process_state_with_git_data() for state events - process_pr_with_git_data() for PR events - Pure functions with comprehensive result types - Refactor policy handlers to use unified processing - src/nostr/policy/state.rs: Remove ~70 lines of duplicated logic - src/nostr/policy/pr_event.rs: Remove ~40 lines of duplicated logic - Refactor purgatory processing to use unified functions - src/git/sync.rs: Remove ~125 lines of duplicated logic - Make extract_owner_from_repo_path() public for reuse Benefits: - DRY: Single source of truth for event processing - Testable: Pure functions with clear contracts - Maintainable: Changes happen in one place - Consistent: All code paths use same logic All 217 unit tests + 40 integration tests pass (257/257). --- src/git/mod.rs | 1 + src/git/process.rs | 255 +++++++++++++++++++++++++++++++++++++++++++ src/git/sync.rs | 190 ++++++++------------------------ src/nostr/policy/pr_event.rs | 132 +++++++--------------- src/nostr/policy/state.rs | 194 +++++++------------------------- 5 files changed, 381 insertions(+), 391 deletions(-) create mode 100644 src/git/process.rs (limited to 'src') diff --git a/src/git/mod.rs b/src/git/mod.rs index fb17c53..205e3bc 100644 --- a/src/git/mod.rs +++ b/src/git/mod.rs @@ -19,6 +19,7 @@ pub mod authorization; pub mod handlers; +pub mod process; pub mod protocol; pub mod subprocess; pub mod sync; diff --git a/src/git/process.rs b/src/git/process.rs new file mode 100644 index 0000000..d052c04 --- /dev/null +++ b/src/git/process.rs @@ -0,0 +1,255 @@ +//! 
Event Processing - Unified logic for processing state and PR events with git data +//! +//! This module provides the core processing logic used when events have git data available. +//! These functions are used in multiple scenarios: +//! - When events arrive with git data already available (policy handlers) +//! - When events are released from purgatory (purgatory sync) +//! - When git pushes trigger purgatory releases (receive-pack handler) + +use std::path::Path; +use nostr_sdk::Event; +use crate::git::authorization::{collect_authorized_maintainers, RepositoryData}; +use crate::git::sync::{align_repository_with_state, sync_pr_refs_to_tagged_owner_repos, copy_missing_oids_between_repos}; +use crate::git; +use crate::nostr::events::RepositoryState; + +/// Result of processing a state event with git data +#[derive(Debug, Default, Clone)] +pub struct ProcessStateResult { + /// Number of repositories synced (OIDs copied + refs aligned) + pub repos_synced: usize, + /// Number of refs created across all repos + pub refs_created: usize, + /// Number of refs updated across all repos + pub refs_updated: usize, + /// Number of refs deleted across all repos + pub refs_deleted: usize, + /// Errors encountered (non-fatal) + pub errors: Vec, +} + +/// Result of processing a PR event with git data +#[derive(Debug, Default, Clone)] +pub struct ProcessPrResult { + /// Number of repositories synced + pub repos_synced: usize, + /// Number of refs created across all repos + pub refs_created: usize, + /// Errors encountered (non-fatal) + pub errors: Vec, +} + +/// Process a single state event that has git data available. +/// +/// This is the core processing logic used when: +/// - A state event arrives with git data already available +/// - A state event is released from purgatory +/// +/// Does NOT save to database or notify subscribers - caller handles that. +/// +/// # Processing Steps +/// 1. Identify owner repos where state author is an authorized maintainer +/// 2. 
For each owner repo, check if this state is the latest authorized +/// 3. Copy missing OIDs from source repo to target repo +/// 4. Align refs (branches, tags, HEAD) with the state +/// +/// # Arguments +/// * `state` - The state event to process +/// * `source_repo_path` - Path to repo that has the git data +/// * `db_repo_data` - Repository data from database (announcements + states) +/// * `git_data_path` - Base path for git repositories +/// +/// # Returns +/// ProcessStateResult with statistics +pub fn process_state_with_git_data( + state: &RepositoryState, + source_repo_path: &Path, + db_repo_data: &RepositoryData, + git_data_path: &Path, +) -> ProcessStateResult { + let mut result = ProcessStateResult::default(); + + let state_author = state.event.pubkey.to_hex(); + + // Collect authorized maintainers per owner + let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); + + // Step 1: Identify owner repos that the state event author is maintainer for + let authorized_owners: Vec<&String> = by_owner + .iter() + .filter(|(_, maintainers)| maintainers.contains(&state_author)) + .map(|(owner, _)| owner) + .collect(); + + if authorized_owners.is_empty() { + tracing::debug!( + identifier = %state.identifier, + author = %state_author, + "State event author not authorized for any owner" + ); + return result; + } + + // Process each owner repo that authorizes this state event author + for owner in &authorized_owners { + let maintainers = by_owner.get(*owner).unwrap(); + + // Step 2: Check if this state event is the latest authorized for this owner + let is_latest = crate::git::sync::is_latest_authorized_state_public( + state, + maintainers, + &db_repo_data.states, + ); + + if !is_latest { + tracing::debug!( + identifier = %state.identifier, + owner = %owner, + "Skipping owner - newer authorized state exists" + ); + continue; + } + + // Find the announcement for this owner + let Some(announcement) = db_repo_data + .announcements + .iter() + .find(|a| 
a.event.pubkey.to_hex() == **owner) + else { + continue; + }; + + let target_repo_path = git_data_path.join(announcement.repo_path()); + + // Step 3: Check git repo exists for that owner + if !target_repo_path.exists() { + tracing::debug!( + identifier = %state.identifier, + owner = %owner, + repo_path = %target_repo_path.display(), + "Skipping owner - repository doesn't exist" + ); + continue; + } + + // Step 4: Copy all required OIDs to that repo (unless it's source_repo_path) + if target_repo_path != source_repo_path { + if let Err(e) = copy_missing_oids_between_repos( + source_repo_path, + &target_repo_path, + state, + ) { + tracing::warn!( + identifier = %state.identifier, + source = %source_repo_path.display(), + target = %target_repo_path.display(), + error = %e, + "Failed to copy OIDs between repos" + ); + result.errors.push(e); + continue; // Skip this owner repo + } + } + + // Step 5: Reset the git state in that repo to match the state event + let align_result = align_repository_with_state(&target_repo_path, state); + result.repos_synced += 1; + result.refs_created += align_result.refs_created; + result.refs_updated += align_result.refs_updated; + result.refs_deleted += align_result.refs_deleted; + + tracing::info!( + identifier = %state.identifier, + owner = %owner, + repo_path = %target_repo_path.display(), + refs_created = align_result.refs_created, + refs_updated = align_result.refs_updated, + refs_deleted = align_result.refs_deleted, + head_set = align_result.head_set, + "Aligned repository with state" + ); + } + + result +} + +/// Process a single PR event that has git data available. +/// +/// This is the core processing logic used when: +/// - A PR event arrives with git data already available +/// - A PR event is released from purgatory +/// +/// Does NOT save to database or notify subscribers - caller handles that. +/// +/// # Processing Steps +/// 1. Sync PR commit to owner repos using tagged maintainer logic +/// 2. 
Create refs/nostr/ ref in source repo (if missing) +/// 3. Create refs/nostr/ refs in all synced repos +/// +/// # Arguments +/// * `event` - The PR event to process +/// * `commit` - The commit hash from the PR event +/// * `source_repo_path` - Path to repo that has the commit +/// * `db_repo_data` - Repository data from database (announcements + states) +/// * `git_data_path` - Base path for git repositories +/// * `source_owner_pubkey` - Owner pubkey of source repo (to skip) +/// +/// # Returns +/// ProcessPrResult with statistics +pub fn process_pr_with_git_data( + event: &Event, + commit: &str, + source_repo_path: &Path, + db_repo_data: &RepositoryData, + git_data_path: &Path, + source_owner_pubkey: &str, +) -> ProcessPrResult { + let mut result = ProcessPrResult::default(); + + let event_id = event.id.to_hex(); + + // Sync PR ref to owner repos using tagged maintainer logic + let pr_refs = vec![(event_id.clone(), commit.to_string())]; + let pr_events = vec![event.clone()]; + + let sync_result = sync_pr_refs_to_tagged_owner_repos( + source_repo_path, + &pr_refs, + &pr_events, + db_repo_data, + git_data_path, + source_owner_pubkey, + ); + result.repos_synced += sync_result.repos_synced; + result.refs_created += sync_result.refs_created; + result.errors.extend( + sync_result + .errors + .into_iter() + .map(|(_, e)| e), + ); + + // Create the ref in the source repo if it doesn't exist + let ref_name = format!("refs/nostr/{}", event_id); + if git::get_ref_commit(source_repo_path, &ref_name).is_none() { + if let Err(e) = git::update_ref(source_repo_path, &ref_name, commit) { + tracing::warn!( + event_id = %event_id, + repo = %source_repo_path.display(), + error = %e, + "Failed to create PR ref in source repo" + ); + result.errors.push(e); + } else { + result.refs_created += 1; + tracing::info!( + event_id = %event_id, + commit = %commit, + repo = %source_repo_path.display(), + "Created PR ref in source repo" + ); + } + } + + result +} diff --git a/src/git/sync.rs 
b/src/git/sync.rs index 2f43e6e..5e2d3f2 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -908,12 +908,9 @@ async fn process_purgatory_state_events( } }; - // Collect authorized maintainers per owner (computed once) - let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); - // Process each state event in chronological order for entry in &purgatory_states { - // Step 0: Check if we have all the git data needed to apply this state event + // Check if we have all the git data needed to apply this state event if !can_apply_state(&entry.event, source_repo_path) { debug!( identifier = %identifier, @@ -940,122 +937,19 @@ async fn process_purgatory_state_events( } }; - let state_author = state.event.pubkey.to_hex(); - - // Step 1: Identify owner repos that the state event author is maintainer for - let authorized_owners: Vec<&String> = by_owner - .iter() - .filter(|(_, maintainers)| maintainers.contains(&state_author)) - .map(|(owner, _)| owner) - .collect(); - - if authorized_owners.is_empty() { - debug!( - identifier = %identifier, - event_id = %entry.event.id, - pubkey = %state_author, - "State event author not authorized for any owner - skipping" - ); - continue; - } - - // Track if we applied to at least one owner repo - let mut applied_to_any = false; - - // Process each owner repo that authorizes this state event author - for owner in &authorized_owners { - let maintainers = by_owner.get(*owner).unwrap(); - - // Step 2: Check if this state event is the latest authorized for this owner - // Only consider database states, not other purgatory states - let is_latest = is_latest_authorized_state( - &state, - maintainers, - &db_repo_data.states, - ); - - if !is_latest { - debug!( - identifier = %identifier, - event_id = %entry.event.id, - owner = %owner, - "Skipping owner - a newer authorized state exists" - ); - continue; - } - - // Find the announcement for this owner - let announcement = db_repo_data - .announcements - .iter() - .find(|a| 
a.event.pubkey.to_hex() == **owner); - - let Some(announcement) = announcement else { - continue; - }; - - let target_repo_path = git_data_path.join(announcement.repo_path()); - - // Step 3: Check git repo exists for that owner - if !target_repo_path.exists() { - debug!( - identifier = %identifier, - owner = %owner, - repo_path = %target_repo_path.display(), - "Skipping owner - repository doesn't exist" - ); - continue; - } - - // Step 4: Copy all required OIDs to that repo (unless it's source_repo_path) - if target_repo_path != source_repo_path { - if let Err(e) = - copy_missing_oids_between_repos(source_repo_path, &target_repo_path, &state) - { - warn!( - identifier = %identifier, - source = %source_repo_path.display(), - target = %target_repo_path.display(), - error = %e, - "Failed to copy OIDs between repos" - ); - result - .errors - .push((target_repo_path.display().to_string(), e).1); - // Continue anyway - we'll try to align what we can - } - } - - // Step 5: Reset the git state in that repo to match the state event - // (excluding refs/nostr/*) - let align_result = align_repository_with_state(&target_repo_path, &state); - result.repos_synced += 1; - result.refs_created += align_result.refs_created; - result.refs_updated += align_result.refs_updated; - result.refs_deleted += align_result.refs_deleted; - - info!( - identifier = %identifier, - owner = %owner, - event_id = %entry.event.id, - repo_path = %target_repo_path.display(), - refs_created = align_result.refs_created, - refs_updated = align_result.refs_updated, - refs_deleted = align_result.refs_deleted, - head_set = align_result.head_set, - "Aligned repository with state from purgatory" - ); - - applied_to_any = true; - } + // Use unified processing function + let process_result = crate::git::process::process_state_with_git_data( + &state, + source_repo_path, + &db_repo_data, + git_data_path, + ); - // We have the git data now, so we should release from purgatory regardless of - // whether we applied to 
any repo. The question is: should we save to DB or just remove? - // - // - If there's a newer state event from the same author already in the DB, just remove - // (no point saving an older event that will never be used) - // - Otherwise, save it to the DB (even if we didn't apply to any repo, because in the - // future the currently-authorized state event might be deleted and this one should apply) + result.repos_synced += process_result.repos_synced; + result.refs_created += process_result.refs_created; + result.refs_updated += process_result.refs_updated; + result.refs_deleted += process_result.refs_deleted; + result.errors.extend(process_result.errors); // Check if there's a newer state from the same author in the database let has_newer_from_same_author = db_repo_data.states.iter().any(|s| { @@ -1073,17 +967,16 @@ async fn process_purgatory_state_events( debug!( identifier = %identifier, event_id = %entry.event.id, - author = %state_author, "Removed older state event from purgatory - newer event from same author exists in DB" ); } else { - // Save to database (even if we didn't apply to any repo) + // Save to database match database.save_event(&entry.event).await { Ok(_) => { info!( identifier = %identifier, event_id = %entry.event.id, - applied_to_repos = applied_to_any, + repos_synced = process_result.repos_synced, "Saved purgatory state event to database" ); @@ -1169,6 +1062,25 @@ fn is_latest_authorized_state( } } +/// Check if a state event is the latest authorized state for a given maintainer set. +/// +/// Only considers states already in the database, not other purgatory states. 
+/// +/// # Arguments +/// * `state` - The state event to check +/// * `maintainers` - The set of authorized maintainers for the owner +/// * `db_states` - State events from the database +/// +/// # Returns +/// true if this state is the latest (or equal latest) among all authorized states in the DB +pub fn is_latest_authorized_state_public( + state: &RepositoryState, + maintainers: &[String], + db_states: &[RepositoryState], +) -> bool { + is_latest_authorized_state(state, maintainers, db_states) +} + /// Process PR events from purgatory that can now be satisfied. async fn process_purgatory_pr_events( identifier: &str, @@ -1224,39 +1136,23 @@ async fn process_purgatory_pr_events( continue; } - // Sync PR ref to owner repos - let pr_refs = vec![(event.id.to_hex(), entry.commit.clone())]; - let pr_events = vec![event.clone()]; - - // Get owner pubkey from source repo path + // Extract owner pubkey let owner_pubkey = extract_owner_from_repo_path(source_repo_path, git_data_path) .unwrap_or_default(); - let sync_result = sync_pr_refs_to_tagged_owner_repos( + // Use unified processing function + let process_result = crate::git::process::process_pr_with_git_data( + event, + &entry.commit, source_repo_path, - &pr_refs, - &pr_events, &db_repo_data, git_data_path, &owner_pubkey, ); - result.repos_synced += sync_result.repos_synced; - result.refs_created += sync_result.refs_created; - // Create the ref in the source repo if it doesn't exist - let ref_name = format!("refs/nostr/{}", event.id.to_hex()); - if git::get_ref_commit(source_repo_path, &ref_name).is_none() { - if let Err(e) = git::update_ref(source_repo_path, &ref_name, &entry.commit) { - warn!( - identifier = %identifier, - event_id = %event.id, - error = %e, - "Failed to create PR ref in source repo" - ); - } else { - result.refs_created += 1; - } - } + result.repos_synced += process_result.repos_synced; + result.refs_created += process_result.refs_created; + result.errors.extend(process_result.errors); // Save 
event to database match database.save_event(event).await { @@ -1307,7 +1203,7 @@ async fn process_purgatory_pr_events( /// Extract owner pubkey from a repository path. /// /// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. -fn extract_owner_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Option<String> { +pub fn extract_owner_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Option<String> { let relative = repo_path.strip_prefix(git_data_path).ok()?; let components: Vec<_> = relative.components().collect(); if !components.is_empty() { diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index ff3bade..9942a6a 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -27,7 +27,7 @@ impl PrEventPolicy { /// 2. Commit existence in referenced repositories /// 3. Deletion of incorrect refs/nostr/ refs /// 4. Deletion of incorrect placeholders - /// 5. Copying git data to all referenced repositories when found + /// 5. 
Processing PR event with unified function /// /// # Returns /// - `Ok(true)` if git data ready (commit exists and is synced to all repos) @@ -64,7 +64,6 @@ impl PrEventPolicy { ); // Remove placeholder - event processing will continue normally self.ctx.purgatory.remove_pr(&event_id); - // Continue to validate and sync refs across all repos } else { // Placeholder has different commit - incoming event supersedes tracing::info!( @@ -75,8 +74,7 @@ impl PrEventPolicy { ); // Remove incorrect placeholder self.ctx.purgatory.remove_pr(&event_id); - // Delete incorrect git data (refs/nostr/) from all repos - // This will be handled below when we validate refs + // Deletion of incorrect git data (refs/nostr/) will be handled below } } @@ -87,9 +85,8 @@ impl PrEventPolicy { return Ok(false); } - // delete incorrect refs/nostr/ + // Delete incorrect refs/nostr/ for repo_path in &repo_paths { - // First, validate/delete any incorrect refs/nostr/ match git::validate_nostr_ref(repo_path, &event_id, &commit) { Ok(true) => { tracing::info!( @@ -110,10 +107,9 @@ impl PrEventPolicy { } } - // find location of correct git data (if exists) + // Find location of correct git data (if exists) let mut source_repo: Option<PathBuf> = None; for repo_path in &repo_paths { - // Check if commit exists in this repository if git::commit_exists(repo_path, &commit) { source_repo = Some(repo_path.clone()); tracing::debug!( @@ -125,59 +121,50 @@ impl PrEventPolicy { } } - // Copy commit to all other referenced repositories if let Some(source_repo) = source_repo { - for repo_path in &repo_paths { - if repo_path == &source_repo { - // Skip source repo - continue; - } + // Extract identifier + let identifier = crate::git::sync::extract_identifier_from_pr_event(event) + .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; + + // Fetch repository data + let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; + + // Extract owner pubkey from source repo path + let owner_pubkey = 
crate::git::sync::extract_owner_from_repo_path( + &source_repo, + &self.ctx.git_data_path, + ) + .unwrap_or_default(); + + // Use unified processing function + let result = crate::git::process::process_pr_with_git_data( + event, + &commit, + &source_repo, + &db_repo_data, + &self.ctx.git_data_path, + &owner_pubkey, + ); - // Check if repository exists - if !repo_path.exists() { - tracing::debug!( - "Repository {} does not exist, skipping sync", - repo_path.display() - ); - continue; - } + tracing::info!( + identifier = %identifier, + event_id = %event_id, + repos_synced = result.repos_synced, + refs_created = result.refs_created, + "Processed PR event with git data already available" + ); - // Check if commit already exists - if git::commit_exists(repo_path, &commit) { - tracing::debug!( - "Commit {} already exists in {}, skipping sync", - commit, - repo_path.display() + if !result.errors.is_empty() { + for error in &result.errors { + tracing::warn!( + identifier = %identifier, + event_id = %event_id, + error = %error, + "Error processing PR event" ); - continue; - } - - // Fetch commit from source repo to target repo - tracing::info!( - "Syncing commit {} from {} to {}", - commit, - source_repo.display(), - repo_path.display() - ); - - match self.copy_commit(&source_repo, repo_path, &commit).await { - Ok(()) => { - tracing::info!( - "Successfully synced commit {} to {}", - commit, - repo_path.display() - ); - } - Err(e) => { - tracing::warn!( - "Failed to sync commit {} to {}: {}", - commit, - repo_path.display(), - e - ); - } } } + Ok(true) } else { tracing::debug!( @@ -250,40 +237,5 @@ impl PrEventPolicy { Ok(repo_paths) } - /// Copy a commit from source repository to target repository - /// - /// Uses `git fetch` to copy a specific commit between local repositories. 
- /// - /// # Arguments - /// * `source_repo` - Path to repository containing the commit - /// * `target_repo` - Path to repository to receive the commit - /// * `commit` - Commit hash to copy - /// - /// # Returns - /// Ok(()) on success, Err with error message on failure - async fn copy_commit( - &self, - source_repo: &std::path::Path, - target_repo: &std::path::Path, - commit: &str, - ) -> Result<(), String> { - use std::process::Command; - - let output = Command::new("git") - .args([ - "fetch", - source_repo.to_str().ok_or("Invalid source path")?, - commit, - ]) - .current_dir(target_repo) - .output() - .map_err(|e| format!("Failed to execute git fetch: {}", e))?; - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(format!("git fetch failed: {}", stderr)); - } - - Ok(()) - } } diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 7d69d7d..68b1e97 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -9,9 +9,8 @@ use nostr_relay_builder::builder::WritePolicyResult; use nostr_relay_builder::prelude::Event; use super::PolicyContext; -use crate::git::authorization::{collect_authorized_maintainers, fetch_repository_data}; -use crate::git::sync::align_repository_with_state; -use crate::git::{self}; +use crate::git::authorization::fetch_repository_data; +use crate::git; use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; /// Result of state policy evaluation @@ -50,7 +49,7 @@ impl StatePolicy { let state = RepositoryState::from_event(event.clone()).context("Failed to parse state event")?; - // duplicate check in purgatory + // Duplicate check in purgatory if self .ctx .purgatory @@ -63,95 +62,65 @@ impl StatePolicy { event.id, ); return Ok(WritePolicyResult::Reject { - status: true, // Client sees OK + status: true, message: "duplicate: in purgatory".into(), }); } - // get all repositories and state events from db with identifier + + // Get all 
repositories and state events from db with identifier let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; - // duplicate check in db + // Duplicate check in db if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { - tracing::debug!("processed state event duplicate (in db): {}", event.id,); + tracing::debug!("processed state event duplicate (in db): {}", event.id); return Ok(WritePolicyResult::Reject { - status: true, // Client sees OK + status: true, message: "duplicate".into(), }); } - // check if git data is avialable + // Check if git data is available if let Some(repo_with_git_data) = find_repo_with_git_data(&db_repo_data.announcements, &state, &self.ctx.git_data_path) { tracing::debug!( - "processing state event git as data already available: {}", + "processing state event as git data already available: {}", event.id, ); - // find repos for which this state is authorised and align the git refs to this state - let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); - let mut repo_count = 0; - for (owner, maintainers) in by_owner { - if maintainers.contains(&event.pubkey.to_string()) { - if let Some(previous_state) = db_repo_data - .states - .iter() - .filter(|e| maintainers.contains(&e.event.pubkey.to_string())) - .max_by_key(|e| e.event.created_at) - { - // TODO in event of a tie the event with the biggest event id wins - if state.event.created_at > previous_state.event.created_at { - if let Some(annoucement) = db_repo_data - .announcements - .iter() - .find(|a| a.event.pubkey.to_string().eq(&owner)) - { - let repo_path = - self.ctx.git_data_path.join(annoucement.repo_path().clone()); - if !repo_path.exists() { - // eg if annoucement doesnt list repo (but stored as its in maintainer set) - continue; - } - // If repo_path != repo_with_git_data, copy missing oids first - if repo_path != repo_with_git_data { - if let Err(e) = self.copy_missing_oids( - &repo_with_git_data, - &repo_path, - &state, 
- ) { - tracing::warn!( - "Failed to copy oids from {} to {}: {}", - repo_with_git_data.display(), - repo_path.display(), - e - ); - } - } + // Use unified processing function + let result = crate::git::process::process_state_with_git_data( + &state, + &repo_with_git_data, + &db_repo_data, + &self.ctx.git_data_path, + ); + + tracing::info!( + identifier = %state.identifier, + event_id = %event.id, + repos_synced = result.repos_synced, + refs_created = result.refs_created, + refs_updated = result.refs_updated, + refs_deleted = result.refs_deleted, + "Processed state event with git data already available" + ); - let result = align_repository_with_state(&repo_path, &state); - repo_count += 1; - tracing::info!( - "Aligned {} with state: created={}, updated={}, deleted={}, head_set={}", - repo_path.display(), - result.refs_created, - result.refs_updated, - result.refs_deleted, - result.head_set - ); - } - } - } + if !result.errors.is_empty() { + for error in &result.errors { + tracing::warn!( + identifier = %state.identifier, + event_id = %event.id, + error = %error, + "Error processing state event" + ); } } - tracing::info!( - "immediately accepting state event. 
Was latest authorised state and git data updated for {repo_count} repositories: eventid: {}", - state.event.id, - ); - // immediately accept the event, bypassing purgatory - Ok(WritePolicyResult::Accept) // event should be saved and broadcast + // Event will be saved and broadcast by relay builder + Ok(WritePolicyResult::Accept) } else { - // if no git data - add to purgatory + // If no git data - add to purgatory // (add_state automatically enqueues for background sync) self.ctx .purgatory @@ -163,96 +132,13 @@ impl StatePolicy { state.identifier, ); Ok(WritePolicyResult::Reject { - status: true, // Client sees OK + status: true, message: "purgatory: won't be served until git data arrives".into(), }) } } - /// Copy missing OIDs from a source repository to a target repository - /// - /// Identifies commits referenced in the state that are missing from the target - /// repository and copies them from the source repository using git fetch. - /// - /// # Arguments - /// * `source_repo` - Path to repository containing the commits - /// * `target_repo` - Path to repository to receive the commits - /// * `state` - Repository state containing commit references - /// - /// # Returns - /// Ok(()) on success, Err with error message on failure - fn copy_missing_oids( - &self, - source_repo: &Path, - target_repo: &Path, - state: &RepositoryState, - ) -> Result<(), String> { - use std::process::Command; - - // Collect all commits referenced in the state - let mut commits_to_check = Vec::new(); - - for branch in &state.branches { - if !branch.commit.starts_with("ref: ") { - commits_to_check.push(&branch.commit); - } - } - - for tag in &state.tags { - if !tag.commit.starts_with("ref: ") { - commits_to_check.push(&tag.commit); - } - } - - // Identify missing commits - let mut missing_commits = Vec::new(); - for commit in commits_to_check { - if !git::oid_exists(target_repo, commit) { - missing_commits.push(commit); - } - } - - if missing_commits.is_empty() { - tracing::debug!( - 
"No missing commits to copy from {} to {}", - source_repo.display(), - target_repo.display() - ); - return Ok(()); - } - - tracing::info!( - "Copying {} missing commits from {} to {}", - missing_commits.len(), - source_repo.display(), - target_repo.display() - ); - - // Fetch each missing commit from source to target - for commit in &missing_commits { - let output = Command::new("git") - .args([ - "fetch", - source_repo.to_str().ok_or("Invalid source path")?, - commit, - ]) - .current_dir(target_repo) - .output() - .map_err(|e| format!("Failed to execute git fetch: {}", e))?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(format!( - "git fetch failed for commit {}: {}", - commit, stderr - )); - } - - tracing::debug!("Copied commit {} to {}", commit, target_repo.display()); - } - Ok(()) - } } fn find_repo_with_git_data( -- cgit v1.2.3