From c67ebe6f33bfa191f17eb0df24d3ee18092c74e1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 23:31:38 +0000 Subject: refactor: unify event processing logic Eliminates code duplication by extracting core event processing into reusable functions. All state and PR event processing now uses the same unified logic from src/git/process.rs. Changes: - Add src/git/process.rs with unified processing functions - process_state_with_git_data() for state events - process_pr_with_git_data() for PR events - Pure functions with comprehensive result types - Refactor policy handlers to use unified processing - src/nostr/policy/state.rs: Remove ~70 lines of duplicated logic - src/nostr/policy/pr_event.rs: Remove ~40 lines of duplicated logic - Refactor purgatory processing to use unified functions - src/git/sync.rs: Remove ~125 lines of duplicated logic - Make extract_owner_from_repo_path() public for reuse Benefits: - DRY: Single source of truth for event processing - Testable: Pure functions with clear contracts - Maintainable: Changes happen in one place - Consistent: All code paths use same logic All 217 unit tests + 40 integration tests pass (257/257). --- src/nostr/policy/pr_event.rs | 132 ++++++++++------------------- src/nostr/policy/state.rs | 194 +++++++++---------------------------------- 2 files changed, 82 insertions(+), 244 deletions(-) (limited to 'src/nostr') diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index ff3bade..9942a6a 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -27,7 +27,7 @@ impl PrEventPolicy { /// 2. Commit existence in referenced repositories /// 3. Deletion of incorrect refs/nostr/ refs /// 4. Deletion of incorrect placeholders - /// 5. Copying git data to all referenced repositories when found + /// 5. Processing PR event with unified function /// /// # Returns /// - `Ok(true)` if git data ready (commit exists and is synced to all repos) @@ -64,7 +64,6 @@ impl PrEventPolicy { ); // Remove placeholder - event processing will continue normally self.ctx.purgatory.remove_pr(&event_id); - // Continue to validate and sync refs across all repos } else { // Placeholder has different commit - incoming event supersedes tracing::info!( @@ -75,8 +74,7 @@ impl PrEventPolicy { ); // Remove incorrect placeholder self.ctx.purgatory.remove_pr(&event_id); - // Delete incorrect git data (refs/nostr/) from all repos - // This will be handled below when we validate refs + // Delete incorrect git data (refs/nostr/) will be handled below } } @@ -87,9 +85,8 @@ impl PrEventPolicy { return Ok(false); } - // delete incorrect refs/nostr/ + // Delete incorrect refs/nostr/ for repo_path in &repo_paths { - // First, validate/delete any incorrect refs/nostr/ match git::validate_nostr_ref(repo_path, &event_id, &commit) { Ok(true) => { tracing::info!( @@ -110,10 +107,9 @@ impl PrEventPolicy { } } - // find location of correct git data (if exists) + // Find location of correct git data (if exists) let mut source_repo: Option = None; for repo_path in &repo_paths { - // Check if commit exists in this repository if git::commit_exists(repo_path, &commit) { source_repo = Some(repo_path.clone()); tracing::debug!( @@ -125,59 +121,50 @@ impl PrEventPolicy { } } - // Copy commit to all other referenced repositories if let Some(source_repo) = source_repo { - for repo_path in &repo_paths { - if repo_path == &source_repo { - // Skip source repo - continue; - } + // Extract identifier + let identifier = crate::git::sync::extract_identifier_from_pr_event(event) + .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; + + // Fetch repository data + let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; + + // Extract owner pubkey from source repo path + let owner_pubkey = crate::git::sync::extract_owner_from_repo_path( + &source_repo, + &self.ctx.git_data_path, + ) + .unwrap_or_default(); + + // Use unified processing function + let result = crate::git::process::process_pr_with_git_data( + event, + &commit, + &source_repo, + &db_repo_data, + &self.ctx.git_data_path, + &owner_pubkey, + ); - // Check if repository exists - if !repo_path.exists() { - tracing::debug!( - "Repository {} does not exist, skipping sync", - repo_path.display() - ); - continue; - } + tracing::info!( + identifier = %identifier, + event_id = %event_id, + repos_synced = result.repos_synced, + refs_created = result.refs_created, + "Processed PR event with git data already available" + ); - // Check if commit already exists - if git::commit_exists(repo_path, &commit) { - tracing::debug!( - "Commit {} already exists in {}, skipping sync", - commit, - repo_path.display() + if !result.errors.is_empty() { + for error in &result.errors { + tracing::warn!( + identifier = %identifier, + event_id = %event_id, + error = %error, + "Error processing PR event" ); - continue; - } - - // Fetch commit from source repo to target repo - tracing::info!( - "Syncing commit {} from {} to {}", - commit, - source_repo.display(), - repo_path.display() - ); - - match self.copy_commit(&source_repo, repo_path, &commit).await { - Ok(()) => { - tracing::info!( - "Successfully synced commit {} to {}", - commit, - repo_path.display() - ); - } - Err(e) => { - tracing::warn!( - "Failed to sync commit {} to {}: {}", - commit, - repo_path.display(), - e - ); - } } } + Ok(true) } else { tracing::debug!( @@ -250,40 +237,5 @@ impl PrEventPolicy { Ok(repo_paths) } - /// Copy a commit from source repository to target repository - /// - /// Uses `git fetch` to copy a specific commit between local repositories. - /// - /// # Arguments - /// * `source_repo` - Path to repository containing the commit - /// * `target_repo` - Path to repository to receive the commit - /// * `commit` - Commit hash to copy - /// - /// # Returns - /// Ok(()) on success, Err with error message on failure - async fn copy_commit( - &self, - source_repo: &std::path::Path, - target_repo: &std::path::Path, - commit: &str, - ) -> Result<(), String> { - use std::process::Command; - - let output = Command::new("git") - .args([ - "fetch", - source_repo.to_str().ok_or("Invalid source path")?, - commit, - ]) - .current_dir(target_repo) - .output() - .map_err(|e| format!("Failed to execute git fetch: {}", e))?; - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(format!("git fetch failed: {}", stderr)); - } - - Ok(()) - } } diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 7d69d7d..68b1e97 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -9,9 +9,8 @@ use nostr_relay_builder::builder::WritePolicyResult; use nostr_relay_builder::prelude::Event; use super::PolicyContext; -use crate::git::authorization::{collect_authorized_maintainers, fetch_repository_data}; -use crate::git::sync::align_repository_with_state; -use crate::git::{self}; +use crate::git::authorization::fetch_repository_data; +use crate::git; use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; /// Result of state policy evaluation @@ -50,7 +49,7 @@ impl StatePolicy { let state = RepositoryState::from_event(event.clone()).context("Failed to parse state event")?; - // duplicate check in purgatory + // Duplicate check in purgatory if self .ctx .purgatory @@ -63,95 +62,65 @@ impl StatePolicy { event.id, ); return Ok(WritePolicyResult::Reject { - status: true, // Client sees OK + status: true, message: "duplicate: in purgatory".into(), }); } - // get all repositories and state events from db with identifier + + // Get all repositories and state events from db with identifier let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; - // duplicate check in db + // Duplicate check in db if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { - tracing::debug!("processed state event duplicate (in db): {}", event.id,); + tracing::debug!("processed state event duplicate (in db): {}", event.id); return Ok(WritePolicyResult::Reject { - status: true, // Client sees OK + status: true, message: "duplicate".into(), }); } - // check if git data is avialable + // Check if git data is available if let Some(repo_with_git_data) = find_repo_with_git_data(&db_repo_data.announcements, &state, &self.ctx.git_data_path) { tracing::debug!( - "processing state event git as data already available: {}", + "processing state event as git data already available: {}", event.id, ); - // find repos for which this state is authorised and align the git refs to this state - let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); - let mut repo_count = 0; - for (owner, maintainers) in by_owner { - if maintainers.contains(&event.pubkey.to_string()) { - if let Some(previous_state) = db_repo_data - .states - .iter() - .filter(|e| maintainers.contains(&e.event.pubkey.to_string())) - .max_by_key(|e| e.event.created_at) - { - // TODO in event of a tie the event with the biggest event id wins - if state.event.created_at > previous_state.event.created_at { - if let Some(annoucement) = db_repo_data - .announcements - .iter() - .find(|a| a.event.pubkey.to_string().eq(&owner)) - { - let repo_path = - self.ctx.git_data_path.join(annoucement.repo_path().clone()); - if !repo_path.exists() { - // eg if annoucement doesnt list repo (but stored as its in maintainer set) - continue; - } - // If repo_path != repo_with_git_data, copy missing oids first - if repo_path != repo_with_git_data { - if let Err(e) = self.copy_missing_oids( - &repo_with_git_data, - &repo_path, - &state, - ) { - tracing::warn!( - "Failed to copy oids from {} to {}: {}", - repo_with_git_data.display(), - repo_path.display(), - e - ); - } - } + // Use unified processing function + let result = crate::git::process::process_state_with_git_data( + &state, + &repo_with_git_data, + &db_repo_data, + &self.ctx.git_data_path, + ); + + tracing::info!( + identifier = %state.identifier, + event_id = %event.id, + repos_synced = result.repos_synced, + refs_created = result.refs_created, + refs_updated = result.refs_updated, + refs_deleted = result.refs_deleted, + "Processed state event with git data already available" + ); - let result = align_repository_with_state(&repo_path, &state); - repo_count += 1; - tracing::info!( - "Aligned {} with state: created={}, updated={}, deleted={}, head_set={}", - repo_path.display(), - result.refs_created, - result.refs_updated, - result.refs_deleted, - result.head_set - ); - } - } - } + if !result.errors.is_empty() { + for error in &result.errors { + tracing::warn!( + identifier = %state.identifier, + event_id = %event.id, + error = %error, + "Error processing state event" + ); } } - tracing::info!( - "immediately accepting state event. Was latest authorised state and git data updated for {repo_count} repositories: eventid: {}", - state.event.id, - ); - // immediately accept the event, bypassing purgatory - Ok(WritePolicyResult::Accept) // event should be saved and broadcast + // Event will be saved and broadcast by relay builder + Ok(WritePolicyResult::Accept) } else { - // if no git data - add to purgatory + // If no git data - add to purgatory // (add_state automatically enqueues for background sync) self.ctx .purgatory @@ -163,96 +132,13 @@ impl StatePolicy { state.identifier, ); Ok(WritePolicyResult::Reject { - status: true, // Client sees OK + status: true, message: "purgatory: won't be served until git data arrives".into(), }) } } - /// Copy missing OIDs from a source repository to a target repository - /// - /// Identifies commits referenced in the state that are missing from the target - /// repository and copies them from the source repository using git fetch. - /// - /// # Arguments - /// * `source_repo` - Path to repository containing the commits - /// * `target_repo` - Path to repository to receive the commits - /// * `state` - Repository state containing commit references - /// - /// # Returns - /// Ok(()) on success, Err with error message on failure - fn copy_missing_oids( - &self, - source_repo: &Path, - target_repo: &Path, - state: &RepositoryState, - ) -> Result<(), String> { - use std::process::Command; - - // Collect all commits referenced in the state - let mut commits_to_check = Vec::new(); - - for branch in &state.branches { - if !branch.commit.starts_with("ref: ") { - commits_to_check.push(&branch.commit); - } - } - - for tag in &state.tags { - if !tag.commit.starts_with("ref: ") { - commits_to_check.push(&tag.commit); - } - } - - // Identify missing commits - let mut missing_commits = Vec::new(); - for commit in commits_to_check { - if !git::oid_exists(target_repo, commit) { - missing_commits.push(commit); - } - } - - if missing_commits.is_empty() { - tracing::debug!( - "No missing commits to copy from {} to {}", - source_repo.display(), - target_repo.display() - ); - return Ok(()); - } - - tracing::info!( - "Copying {} missing commits from {} to {}", - missing_commits.len(), - source_repo.display(), - target_repo.display() - ); - - // Fetch each missing commit from source to target - for commit in &missing_commits { - let output = Command::new("git") - .args([ - "fetch", - source_repo.to_str().ok_or("Invalid source path")?, - commit, - ]) - .current_dir(target_repo) - .output() - .map_err(|e| format!("Failed to execute git fetch: {}", e))?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(format!( - "git fetch failed for commit {}: {}", - commit, stderr - )); - } - - tracing::debug!("Copied commit {} to {}", commit, target_repo.display()); - } - Ok(()) - } } fn find_repo_with_git_data( -- cgit v1.2.3