From f75e1c59aacf5ce668fd327e4e3d827511661c2a Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 8 Jan 2026 00:50:54 +0000 Subject: chore: cargo fmt --- src/git/authorization.rs | 4 +-- src/git/process.rs | 66 +++++++++++++++++++-------------------- src/git/sync.rs | 40 ++++++++++++++++-------- src/main.rs | 4 ++- src/nostr/builder.rs | 9 ++++-- src/nostr/policy/pr_event.rs | 1 - src/nostr/policy/state.rs | 10 +++--- src/purgatory/helpers.rs | 22 ++++++------- src/purgatory/mod.rs | 3 +- src/purgatory/sync/context.rs | 19 +++--------- src/purgatory/sync/functions.rs | 69 +++++++++++++++++++++-------------------- src/purgatory/sync/loop.rs | 5 ++- src/purgatory/sync/throttle.rs | 31 ++++++++---------- src/sync/mod.rs | 4 ++- 14 files changed, 150 insertions(+), 137 deletions(-) (limited to 'src') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index fbddb98..7502a52 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -134,11 +134,11 @@ pub async fn authorize_push( e ))); } - + // Create placeholder for git-data-first scenario // This allows cleanup if the PR event never arrives purgatory.add_pr_placeholder(event_id_hex.to_string(), new_oid.clone()); - + debug!( "Created placeholder for {} - awaiting PR event (will expire in 30min if event doesn't arrive)", event_id_hex diff --git a/src/git/process.rs b/src/git/process.rs index d052c04..215b423 100644 --- a/src/git/process.rs +++ b/src/git/process.rs @@ -6,12 +6,15 @@ //! - When events are released from purgatory (purgatory sync) //! - When git pushes trigger purgatory releases (receive-pack handler) -use std::path::Path; -use nostr_sdk::Event; -use crate::git::authorization::{collect_authorized_maintainers, RepositoryData}; -use crate::git::sync::{align_repository_with_state, sync_pr_refs_to_tagged_owner_repos, copy_missing_oids_between_repos}; use crate::git; +use crate::git::authorization::{collect_authorized_maintainers, RepositoryData}; +use crate::git::sync::{ + align_repository_with_state, copy_missing_oids_between_repos, + sync_pr_refs_to_tagged_owner_repos, +}; use crate::nostr::events::RepositoryState; +use nostr_sdk::Event; +use std::path::Path; /// Result of processing a state event with git data #[derive(Debug, Default, Clone)] @@ -68,19 +71,19 @@ pub fn process_state_with_git_data( git_data_path: &Path, ) -> ProcessStateResult { let mut result = ProcessStateResult::default(); - + let state_author = state.event.pubkey.to_hex(); - + // Collect authorized maintainers per owner let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); - + // Step 1: Identify owner repos that the state event author is maintainer for let authorized_owners: Vec<&String> = by_owner .iter() .filter(|(_, maintainers)| maintainers.contains(&state_author)) .map(|(owner, _)| owner) .collect(); - + if authorized_owners.is_empty() { tracing::debug!( identifier = %state.identifier, @@ -89,18 +92,18 @@ pub fn process_state_with_git_data( ); return result; } - + // Process each owner repo that authorizes this state event author for owner in &authorized_owners { let maintainers = by_owner.get(*owner).unwrap(); - + // Step 2: Check if this state event is the latest authorized for this owner let is_latest = crate::git::sync::is_latest_authorized_state_public( state, maintainers, &db_repo_data.states, ); - + if !is_latest { tracing::debug!( identifier = %state.identifier, @@ -109,7 +112,7 @@ pub fn process_state_with_git_data( ); continue; } - + // Find the announcement for this owner let Some(announcement) = db_repo_data .announcements @@ -118,9 +121,9 @@ pub fn process_state_with_git_data( else { continue; }; - + let target_repo_path = git_data_path.join(announcement.repo_path()); - + // Step 3: Check git repo exists for that owner if !target_repo_path.exists() { tracing::debug!( @@ -131,14 +134,12 @@ pub fn process_state_with_git_data( ); continue; } - + // Step 4: Copy all required OIDs to that repo (unless it's source_repo_path) if target_repo_path != source_repo_path { - if let Err(e) = copy_missing_oids_between_repos( - source_repo_path, - &target_repo_path, - state, - ) { + if let Err(e) = + copy_missing_oids_between_repos(source_repo_path, &target_repo_path, state) + { tracing::warn!( identifier = %state.identifier, source = %source_repo_path.display(), @@ -150,14 +151,14 @@ pub fn process_state_with_git_data( continue; // Skip this owner repo } } - + // Step 5: Reset the git state in that repo to match the state event let align_result = align_repository_with_state(&target_repo_path, state); result.repos_synced += 1; result.refs_created += align_result.refs_created; result.refs_updated += align_result.refs_updated; result.refs_deleted += align_result.refs_deleted; - + tracing::info!( identifier = %state.identifier, owner = %owner, @@ -169,7 +170,7 @@ pub fn process_state_with_git_data( "Aligned repository with state" ); } - + result } @@ -205,13 +206,13 @@ pub fn process_pr_with_git_data( source_owner_pubkey: &str, ) -> ProcessPrResult { let mut result = ProcessPrResult::default(); - + let event_id = event.id.to_hex(); - + // Sync PR ref to owner repos using tagged maintainer logic let pr_refs = vec![(event_id.clone(), commit.to_string())]; let pr_events = vec![event.clone()]; - + let sync_result = sync_pr_refs_to_tagged_owner_repos( source_repo_path, &pr_refs, @@ -222,13 +223,10 @@ pub fn process_pr_with_git_data( ); result.repos_synced += sync_result.repos_synced; result.refs_created += sync_result.refs_created; - result.errors.extend( - sync_result - .errors - .into_iter() - .map(|(_, e)| e), - ); - + result + .errors + .extend(sync_result.errors.into_iter().map(|(_, e)| e)); + // Create the ref in the source repo if it doesn't exist let ref_name = format!("refs/nostr/{}", event_id); if git::get_ref_commit(source_repo_path, &ref_name).is_none() { @@ -250,6 +248,6 @@ pub fn process_pr_with_git_data( ); } } - + result } diff --git a/src/git/sync.rs b/src/git/sync.rs index 5e2d3f2..06013a5 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -837,13 +837,27 @@ pub async fn process_newly_available_git_data( ); // Process state events from purgatory - let state_result = - process_purgatory_state_events(&identifier, source_repo_path, database, local_relay, purgatory, git_data_path).await; + let state_result = process_purgatory_state_events( + &identifier, + source_repo_path, + database, + local_relay, + purgatory, + git_data_path, + ) + .await; result.merge(state_result); // Process PR events from purgatory - let pr_result = - process_purgatory_pr_events(&identifier, source_repo_path, database, local_relay, purgatory, git_data_path).await; + let pr_result = process_purgatory_pr_events( + &identifier, + source_repo_path, + database, + local_relay, + purgatory, + git_data_path, + ) + .await; result.merge(pr_result); if result.released_any() { @@ -1113,7 +1127,9 @@ async fn process_purgatory_pr_events( error = %e, "Failed to fetch repository data for PR events" ); - result.errors.push(format!("Failed to fetch repo data: {}", e)); + result + .errors + .push(format!("Failed to fetch repo data: {}", e)); return result; } }; @@ -1137,8 +1153,8 @@ async fn process_purgatory_pr_events( } // Extract owner pubkey - let owner_pubkey = extract_owner_from_repo_path(source_repo_path, git_data_path) - .unwrap_or_default(); + let owner_pubkey = + extract_owner_from_repo_path(source_repo_path, git_data_path).unwrap_or_default(); // Use unified processing function let process_result = crate::git::process::process_pr_with_git_data( @@ -1192,7 +1208,9 @@ async fn process_purgatory_pr_events( error = %e, "Failed to save PR event to database" ); - result.errors.push(format!("Failed to save PR event: {}", e)); + result + .errors + .push(format!("Failed to save PR event: {}", e)); } } } @@ -1527,11 +1545,7 @@ mod tests { } // Helper function to create a test state event with specific timestamp - fn create_test_state_event( - keys: &Keys, - identifier: &str, - created_at: u64, - ) -> RepositoryState { + fn create_test_state_event(keys: &Keys, identifier: &str, created_at: u64) -> RepositoryState { create_test_state_event_with_nonce(keys, identifier, created_at, "") } diff --git a/src/main.rs b/src/main.rs index 8b870dc..5e9e2d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -146,7 +146,9 @@ async fn main() -> Result<()> { throttle_manager.set_context(sync_ctx.clone()); // Start the sync loop - let _sync_loop_handle = purgatory.clone().start_sync_loop(sync_ctx, throttle_manager); + let _sync_loop_handle = purgatory + .clone() + .start_sync_loop(sync_ctx, throttle_manager); info!("Purgatory sync loop started (1s interval)"); // Setup shutdown handler for purgatory cleanup diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 0e5c18a..81f7fbb 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -162,7 +162,11 @@ impl Nip34WritePolicy { match self.state_policy.validate(event) { StateResult::Accept => { // Process state alignment asynchronously - match self.state_policy.process_state_event(event, is_synced).await { + match self + .state_policy + .process_state_event(event, is_synced) + .await + { Ok(poilicy_result) => poilicy_result, Err(e) => { tracing::warn!("Failed to process state event {}: {}", event_id_str, e); @@ -247,7 +251,8 @@ impl Nip34WritePolicy { ); return WritePolicyResult::Reject { status: false, - message: "invalid: previously expired from purgatory without git data".into(), + message: "invalid: previously expired from purgatory without git data" + .into(), }; } diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index 9942a6a..00e09c3 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -237,5 +237,4 @@ impl PrEventPolicy { Ok(repo_paths) } - } diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 7bbb379..acb76a3 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -9,8 +9,8 @@ use nostr_relay_builder::builder::WritePolicyResult; use nostr_relay_builder::prelude::Event; use super::PolicyContext; -use crate::git::authorization::fetch_repository_data; use crate::git; +use crate::git::authorization::fetch_repository_data; use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; /// Result of state policy evaluation @@ -48,7 +48,11 @@ impl StatePolicy { /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) /// /// Returns the true if git data already availale or false if added to purgatory - pub async fn process_state_event(&self, event: &Event, is_synced: bool) -> Result { + pub async fn process_state_event( + &self, + event: &Event, + is_synced: bool, + ) -> Result { // Parse state to get HEAD and branch info let state = RepositoryState::from_event(event.clone()).context("Failed to parse state event")?; @@ -155,8 +159,6 @@ impl StatePolicy { }) } } - - } fn find_repo_with_git_data( diff --git a/src/purgatory/helpers.rs b/src/purgatory/helpers.rs index 93dc378..193ef99 100644 --- a/src/purgatory/helpers.rs +++ b/src/purgatory/helpers.rs @@ -515,7 +515,7 @@ mod tests { // Create a working repo to generate a commit let work_dir = tempfile::tempdir().unwrap(); - + Command::new("git") .args(["init"]) .current_dir(work_dir.path()) @@ -585,7 +585,7 @@ mod tests { use std::process::Command; let temp_dir = tempfile::tempdir().unwrap(); - + Command::new("git") .args(["init", "--bare"]) .current_dir(temp_dir.path()) @@ -603,10 +603,7 @@ mod tests { let commit_hash = commit_hash.expect("Should have a commit"); // Create a state event referencing that commit - let event = create_test_state_event( - "test-repo", - vec![("refs/heads/main", &commit_hash)], - ); + let event = create_test_state_event("test-repo", vec![("refs/heads/main", &commit_hash)]); // Should return true since the OID exists assert!(can_apply_state(&event, repo_path)); @@ -621,7 +618,10 @@ mod tests { // Create a state event referencing a non-existent commit let event = create_test_state_event( "test-repo", - vec![("refs/heads/main", "0000000000000000000000000000000000000000")], + vec![( + "refs/heads/main", + "0000000000000000000000000000000000000000", + )], ); // Should return false since the OID doesn't exist @@ -655,8 +655,8 @@ mod tests { let event = create_test_state_event( "test-repo", vec![ - ("refs/heads/main", &commit_hash), // exists - ("refs/heads/dev", "0000000000000000000000000000000000000000"), // doesn't exist + ("refs/heads/main", &commit_hash), // exists + ("refs/heads/dev", "0000000000000000000000000000000000000000"), // doesn't exist ], ); @@ -687,8 +687,8 @@ mod tests { let event = create_test_state_event( "test-repo", vec![ - ("refs/heads/main", &commit_hash), // real OID that exists - ("refs/heads/alias", "ref: refs/heads/main"), // symbolic ref + ("refs/heads/main", &commit_hash), // real OID that exists + ("refs/heads/alias", "ref: refs/heads/main"), // symbolic ref ], ); diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index fe0a439..20df19b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -1134,7 +1134,8 @@ fn test_cleanup_expired_events() { // Manually set event1's expiry time to be old if let Some(mut entry) = purgatory.expired_events.get_mut(&event1_id) { - *entry.value_mut() = Instant::now() - Duration::from_secs(8 * 24 * 3600); // 8 days ago + *entry.value_mut() = Instant::now() - Duration::from_secs(8 * 24 * 3600); + // 8 days ago } // Clean up expired events older than 7 days diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 2922f10..9e195c7 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -119,12 +119,8 @@ pub trait SyncContext: Send + Sync { /// /// # Returns /// List of OIDs that were successfully fetched - async fn fetch_oids( - &self, - repo_path: &Path, - url: &str, - oids: &[String], - ) -> Result>; + async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) + -> Result>; /// Process newly available git data. /// @@ -368,10 +364,7 @@ impl SyncContext for RealSyncContext { .cloned() .collect(); - debug!( - fetched_count = fetched.len(), - "Successfully fetched OIDs" - ); + debug!(fetched_count = fetched.len(), "Successfully fetched OIDs"); fetched } @@ -702,11 +695,7 @@ pub mod mock { } // Get OIDs this URL can provide - let provides = self - .url_provides_oids - .get(url) - .cloned() - .unwrap_or_default(); + let provides = self.url_provides_oids.get(url).cloned().unwrap_or_default(); // Find which requested OIDs this URL can provide let fetched: Vec = oids diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index bb7c0b9..370990e 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs @@ -32,15 +32,17 @@ use super::throttle::ThrottleManager; fn extract_domain(url: &str) -> Option { // Simple URL parsing for HTTP(S) URLs // Format: scheme://[user@]host[:port]/path - let url = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://"))?; - + let url = url + .strip_prefix("https://") + .or_else(|| url.strip_prefix("http://"))?; + // Remove user info if present (e.g., "user@host" -> "host") let url = url.split('@').next_back()?; - + // Extract host (before first '/' or ':') let host = url.split('/').next()?; let host = host.split(':').next()?; - + if host.is_empty() { None } else { @@ -112,17 +114,17 @@ pub async fn sync_identifier_next_url( // 4. Collect clone URLs from announcements AND PR events in purgatory let our_domain = ctx.our_domain(); - + // Get clone URLs from repository announcements let announcement_urls: HashSet = repo_data .announcements .iter() .flat_map(|a| a.clone_urls.iter().cloned()) .collect(); - + // Get clone URLs from PR events in purgatory let pr_urls = ctx.collect_pr_clone_urls(identifier); - + // Merge and filter out our domain let all_urls: HashSet = announcement_urls .union(&pr_urls) @@ -151,11 +153,9 @@ pub async fn sync_identifier_next_url( match domain { Some(specific_domain) => { // Only look at URLs from this specific domain - urls_by_domain.get(specific_domain).and_then(|urls| { - urls.iter() - .find(|url| !tried_urls.contains(*url)) - .cloned() - }) + urls_by_domain + .get(specific_domain) + .and_then(|urls| urls.iter().find(|url| !tried_urls.contains(*url)).cloned()) } None => { // Try any non-throttled domain @@ -217,17 +217,17 @@ pub async fn get_throttled_domains_with_untried_urls( }; let our_domain = ctx.our_domain(); - + // Get clone URLs from repository announcements let announcement_urls: HashSet = repo_data .announcements .iter() .flat_map(|a| a.clone_urls.iter().cloned()) .collect(); - + // Get clone URLs from PR events in purgatory let pr_urls = ctx.collect_pr_clone_urls(identifier); - + // Merge and filter out our domain let all_urls: HashSet = announcement_urls .union(&pr_urls) @@ -766,9 +766,13 @@ mod tests { let mut tried_urls = HashSet::new(); tried_urls.insert("https://github.com/foo/bar.git".to_string()); - let throttled = - get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) - .await; + let throttled = get_throttled_domains_with_untried_urls( + &mock, + "test-repo", + &tried_urls, + &throttle_manager, + ) + .await; // Should only include gitlab.com (throttled with untried URLs) // github.com is throttled but URL was tried @@ -885,11 +889,10 @@ mod tests { #[tokio::test] async fn test_collect_pr_clone_urls_returns_configured_urls() { // Test that MockSyncContext returns configured PR clone URLs - let mock = MockSyncContext::new() - .with_pr_clone_urls(&[ - "https://pr-server.com/fork.git", - "https://another-server.com/fork.git", - ]); + let mock = MockSyncContext::new().with_pr_clone_urls(&[ + "https://pr-server.com/fork.git", + "https://another-server.com/fork.git", + ]); let pr_urls = mock.collect_pr_clone_urls("test-repo"); @@ -945,7 +948,7 @@ mod tests { .with_urls(&["https://github.com/owner/repo.git"]) .with_pr_clone_urls(&[ "https://our-relay.com/fork.git", // Should be filtered - "https://external.com/fork.git", // Should be included + "https://external.com/fork.git", // Should be included ]) .with_our_domain("our-relay.com") .with_needed_oids(&["abc123"]) @@ -957,8 +960,7 @@ mod tests { // Collect all available URLs let mut available_urls = Vec::new(); while let Some(url) = - sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager) - .await + sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager).await { available_urls.push(url.clone()); tried_urls.insert(url); @@ -1006,16 +1008,17 @@ mod tests { let tried_urls = HashSet::new(); - let throttled = - get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) - .await; + let throttled = get_throttled_domains_with_untried_urls( + &mock, + "test-repo", + &tried_urls, + &throttle_manager, + ) + .await; // Should include both throttled domains let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect(); - assert!( - domains.contains(&"github.com"), - "Should include github.com" - ); + assert!(domains.contains(&"github.com"), "Should include github.com"); assert!( domains.contains(&"pr-server.com"), "Should include pr-server.com from PR clone URLs" diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs index ebca766..92e0594 100644 --- a/src/purgatory/sync/loop.rs +++ b/src/purgatory/sync/loop.rs @@ -62,7 +62,10 @@ impl Purgatory { ctx: Arc, throttle_manager: Arc, ) -> JoinHandle<()> { - info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL); + info!( + "Starting purgatory sync loop (interval: {:?})", + SYNC_LOOP_INTERVAL + ); tokio::spawn(async move { let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL); diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index e6efe1f..ad6e8ea 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs @@ -316,15 +316,13 @@ impl ThrottleManager { } // Create new throttle - self.throttles - .entry(domain.to_string()) - .or_insert_with(|| { - Mutex::new(DomainThrottle::new( - domain.to_string(), - self.max_concurrent_per_domain, - self.max_per_minute_per_domain, - )) - }); + self.throttles.entry(domain.to_string()).or_insert_with(|| { + Mutex::new(DomainThrottle::new( + domain.to_string(), + self.max_concurrent_per_domain, + self.max_per_minute_per_domain, + )) + }); // Return the entry (we know it exists now) self.throttles.get(domain).unwrap() @@ -438,7 +436,9 @@ impl ThrottleManager { let domain = domain.to_string(); tokio::spawn(async move { - manager.process_queued_identifier(&domain, &identifier).await; + manager + .process_queued_identifier(&domain, &identifier) + .await; }); } } @@ -480,14 +480,9 @@ impl ThrottleManager { }; // Get next URL for this identifier on this specific domain - let url = sync_identifier_next_url( - ctx.as_ref(), - identifier, - Some(domain), - &tried_urls, - self, - ) - .await; + let url = + sync_identifier_next_url(ctx.as_ref(), identifier, Some(domain), &tried_urls, self) + .await; match url { Some(url) => { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 7d60ea4..fa44ab1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1069,7 +1069,9 @@ impl SyncManager { } // PR events (kind 1617/1618) - extract identifier from 'a' tag else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 { - if let Some(identifier) = crate::git::sync::extract_identifier_from_pr_event(&event) { + if let Some(identifier) = + crate::git::sync::extract_identifier_from_pr_event(&event) + { tracing::debug!( event_id = %event.id, identifier = %identifier, -- cgit v1.2.3