From 9d86cf15f0275ffeee4519bd054e3b61dc8992ac Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 26 Feb 2026 15:42:09 +0000 Subject: chore: apply cargo fmt and fix clippy warnings Fix pre-existing clippy lints: - &PathBuf -> &Path in audit_cleanup.rs - too_many_arguments on process_newly_available_git_data, process_purgatory_announcements, and HttpService::new - clone_on_copy for PublicKey (Copy type) in purgatory cleanup loop --- grasp-audit/src/bin/grasp-audit.rs | 13 +- grasp-audit/src/fixtures.rs | 16 ++- grasp-audit/src/probe.rs | 190 ++++++++++++++++++----------- grasp-audit/src/specs/grasp01/purgatory.rs | 4 +- src/audit_cleanup.rs | 24 ++-- src/git/handlers.rs | 66 ++++------ src/git/sync.rs | 2 + src/http/mod.rs | 1 + src/nostr/builder.rs | 1 - src/nostr/policy/announcement.rs | 18 ++- src/nostr/policy/deletion.rs | 45 ++++--- src/nostr/policy/pr_event.rs | 10 +- src/nostr/policy/state.rs | 6 +- src/purgatory/mod.rs | 51 ++++++-- src/sync/mod.rs | 3 +- tests/common/sync_helpers.rs | 7 +- tests/purgatory_persistence.rs | 10 +- tests/purgatory_sync.rs | 15 +-- tests/sync/discovery.rs | 6 +- tests/sync/maintainer_reprocessing.rs | 25 ++-- tests/sync/metrics.rs | 19 ++- 21 files changed, 326 insertions(+), 206 deletions(-) diff --git a/grasp-audit/src/bin/grasp-audit.rs b/grasp-audit/src/bin/grasp-audit.rs index 305e5eb..ab835e7 100644 --- a/grasp-audit/src/bin/grasp-audit.rs +++ b/grasp-audit/src/bin/grasp-audit.rs @@ -132,7 +132,11 @@ async fn main() -> Result<()> { println!("\n[Run {}]", run); } let report = grasp_audit::probe::run_probe( - &relay, keys.clone(), read_only, timeout, overall_secs, + &relay, + keys.clone(), + read_only, + timeout, + overall_secs, ) .await; if json { @@ -144,10 +148,9 @@ async fn main() -> Result<()> { tokio::time::sleep(Duration::from_secs(interval)).await; } } else { - let report = grasp_audit::probe::run_probe( - &relay, keys, read_only, timeout, overall_secs, - ) - .await; + let report = + grasp_audit::probe::run_probe(&relay, keys, read_only, timeout, overall_secs) + .await; if json { report.print_json(); } else { diff --git a/grasp-audit/src/fixtures.rs b/grasp-audit/src/fixtures.rs index 4678790..d09c36b 100644 --- a/grasp-audit/src/fixtures.rs +++ b/grasp-audit/src/fixtures.rs @@ -967,7 +967,9 @@ impl<'a> TestContext<'a> { FixtureKind::PREvent2Served => self.build_pr_event_2_served().await, FixtureKind::PurgatoryValidRepoSent => self.build_purgatory_valid_repo_sent().await, - FixtureKind::PurgatoryOwnerStateDataPushed => self.build_purgatory_owner_state_data_pushed().await, + FixtureKind::PurgatoryOwnerStateDataPushed => { + self.build_purgatory_owner_state_data_pushed().await + } FixtureKind::OwnerStateDataPushed => self.build_owner_state_data_pushed().await, @@ -1147,7 +1149,10 @@ impl<'a> TestContext<'a> { Ok(h) => h, Err(e) => { cleanup(&clone_path); - return Err(anyhow::anyhow!("Failed to create deterministic commit: {}", e)); + return Err(anyhow::anyhow!( + "Failed to create deterministic commit: {}", + e + )); } }; @@ -1186,7 +1191,12 @@ impl<'a> TestContext<'a> { DETERMINISTIC_COMMIT_HASH )); } - Err(e) => return Err(anyhow::anyhow!("PurgatoryOwnerStateDataPushed push error: {}", e)), + Err(e) => { + return Err(anyhow::anyhow!( + "PurgatoryOwnerStateDataPushed push error: {}", + e + )) + } } // ============================================================ diff --git a/grasp-audit/src/probe.rs b/grasp-audit/src/probe.rs index ecbbcc9..8dad1d4 100644 --- a/grasp-audit/src/probe.rs +++ b/grasp-audit/src/probe.rs @@ -167,10 +167,7 @@ fn now_iso8601() -> String { let mo = if mp < 10 { mp + 3 } else { mp - 9 }; // month [1, 12] let yr = if mo <= 2 { y + 1 } else { y }; - format!( - "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", - yr, mo, d, h, m, s - ) + format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", yr, mo, d, h, m, s) } // ============================================================ @@ -246,8 +243,7 @@ pub async fn run_probe( error: Some(error_msg), }); // Skip all subsequent checks - let already: std::collections::HashSet<&str> = - $checks.iter().map(|c| c.name).collect(); + let already: std::collections::HashSet<&str> = $checks.iter().map(|c| c.name).collect(); for name in ALL_CHECK_NAMES { if !already.contains(name) { $checks.push(skipped(name, "overall timeout")); @@ -303,8 +299,8 @@ pub async fn run_probe( let clone_url = format!("{}/{}/{}.git", http_base, npub, repo_id); // Create temp dir for local repo - let local_repo_path = std::env::temp_dir() - .join(format!("grasp-probe-{}", uuid::Uuid::new_v4())); + let local_repo_path = + std::env::temp_dir().join(format!("grasp-probe-{}", uuid::Uuid::new_v4())); // Initialise local repo (offline) let init_result = init_local_repo(&local_repo_path, &clone_url); @@ -368,7 +364,14 @@ pub async fn run_probe( // Step 1: connect_websocket // ============================================================ if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "connect_websocket"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "connect_websocket" + ); } let step1_start = Instant::now(); let client_result = tokio::time::timeout( @@ -426,12 +429,21 @@ pub async fn run_probe( // ============================================================ { if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "nip11_fetch"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "nip11_fetch" + ); } let step2_start = Instant::now(); let http_client = reqwest::Client::new(); let nip11_result = tokio::time::timeout( - deadline.saturating_duration_since(Instant::now()).min(Duration::from_secs(timeout_secs)), + deadline + .saturating_duration_since(Instant::now()) + .min(Duration::from_secs(timeout_secs)), http_client .get(&http_base) .header("Accept", "application/nostr+json") @@ -443,24 +455,20 @@ pub async fn run_probe( match nip11_result { Ok(Ok(resp)) if resp.status().is_success() => { - let detail = resp - .json::() - .await - .ok() - .map(|v| { - let name = v.get("name").and_then(|n| n.as_str()).unwrap_or("unknown"); - // software is typically a repo URL; take the last path segment - let software = v - .get("software") - .and_then(|s| s.as_str()) - .map(|s| s.trim_end_matches('/').rsplit('/').next().unwrap_or(s)) - .unwrap_or("unknown"); - let version = v - .get("version") - .and_then(|ver| ver.as_str()) - .unwrap_or("unknown"); - format!("{} ({} v{})", name, software, version) - }); + let detail = resp.json::().await.ok().map(|v| { + let name = v.get("name").and_then(|n| n.as_str()).unwrap_or("unknown"); + // software is typically a repo URL; take the last path segment + let software = v + .get("software") + .and_then(|s| s.as_str()) + .map(|s| s.trim_end_matches('/').rsplit('/').next().unwrap_or(s)) + .unwrap_or("unknown"); + let version = v + .get("version") + .and_then(|ver| ver.as_str()) + .unwrap_or("unknown"); + format!("{} ({} v{})", name, software, version) + }); checks.push(ProbeCheck { name: "nip11_fetch", passed: true, @@ -509,7 +517,14 @@ pub async fn run_probe( let mut write_succeeded = false; if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "publish_events"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "publish_events" + ); } if read_only { @@ -558,10 +573,7 @@ pub async fn run_probe( error: Some(e.to_string()), }); // Skip steps 4 and 5; step 6 will use fallback - checks.push(skipped( - "git_repo_initialised", - "publish_events failed", - )); + checks.push(skipped("git_repo_initialised", "publish_events failed")); checks.push(skipped("git_push", "publish_events failed")); } } @@ -571,7 +583,14 @@ pub async fn run_probe( // ============================================================ if write_succeeded { if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "git_repo_initialised"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "git_repo_initialised" + ); } let step4_start = Instant::now(); let poll_url = format!("{}/info/refs?service=git-upload-pack", clone_url); @@ -624,7 +643,14 @@ pub async fn run_probe( // ============================================================ if write_succeeded { if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "git_push"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "git_push" + ); } let step5_start = Instant::now(); let push_result = try_push(&local_repo_path); @@ -690,8 +716,14 @@ pub async fn run_probe( continue; } let mut parts = content.splitn(2, ' '); - let hash = match parts.next() { Some(h) if h.len() == 40 => h, _ => continue }; - let refname = match parts.next() { Some(r) => r.trim(), None => continue }; + let hash = match parts.next() { + Some(h) if h.len() == 40 => h, + _ => continue, + }; + let refname = match parts.next() { + Some(r) => r.trim(), + None => continue, + }; // Skip refs/nostr/* — only branches (refs/heads/*) and tags (refs/tags/*) if refname.starts_with("refs/nostr/") { continue; @@ -705,14 +737,23 @@ pub async fn run_probe( // ---- Write path ---- // Step 6a: git_fetch_refs — just verify the endpoint returns 200 if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "git_fetch_refs"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "git_fetch_refs" + ); } let refs_url = format!("{}/info/refs?service=git-upload-pack", clone_url); let http_client = reqwest::Client::new(); let step6_start = Instant::now(); let refs_result = tokio::time::timeout( - deadline.saturating_duration_since(Instant::now()).min(Duration::from_secs(timeout_secs)), + deadline + .saturating_duration_since(Instant::now()) + .min(Duration::from_secs(timeout_secs)), http_client.get(&refs_url).send(), ) .await; @@ -833,14 +874,23 @@ pub async fn run_probe( // In read-only mode: first check that at least one announcement exists if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "serves_latest_announcement"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "serves_latest_announcement" + ); } let filter = Filter::new().kind(Kind::GitRepoAnnouncement).limit(1); let existing = client .client() .fetch_events( filter, - deadline.saturating_duration_since(Instant::now()).min(Duration::from_secs(5)), + deadline + .saturating_duration_since(Instant::now()) + .min(Duration::from_secs(5)), ) .await .unwrap_or_default(); @@ -909,18 +959,25 @@ pub async fn run_probe( .find(|t| t.kind() == TagKind::custom("clone")) .and_then(|t| t.content()) .map(|s| s.to_string()) - .unwrap_or_else(|| { - format!("{}/{}/{}.git", http_base, ann_npub, ann_id) - }); + .unwrap_or_else(|| format!("{}/{}/{}.git", http_base, ann_npub, ann_id)); if Instant::now() >= deadline { - deadline_return!(relay_url, timestamp, total_start, overall_secs, checks, "git_fetch_refs"); + deadline_return!( + relay_url, + timestamp, + total_start, + overall_secs, + checks, + "git_fetch_refs" + ); } let step6_start = Instant::now(); let refs_url = format!("{}/info/refs?service=git-upload-pack", fetch_url); let http_client = reqwest::Client::new(); let refs_result = tokio::time::timeout( - deadline.saturating_duration_since(Instant::now()).min(Duration::from_secs(timeout_secs)), + deadline + .saturating_duration_since(Instant::now()) + .min(Duration::from_secs(timeout_secs)), http_client.get(&refs_url).send(), ) .await; @@ -935,7 +992,7 @@ pub async fn run_probe( passed: true, skipped: false, duration_ms: step6_ms, - detail: None, + detail: None, error: None, }); Some(body) @@ -946,7 +1003,7 @@ pub async fn run_probe( passed: false, skipped: false, duration_ms: step6_ms, - detail: None, + detail: None, error: Some(format!("HTTP {}", resp.status())), }); None @@ -957,7 +1014,7 @@ pub async fn run_probe( passed: false, skipped: false, duration_ms: step6_ms, - detail: None, + detail: None, error: Some(e.to_string()), }); None @@ -968,7 +1025,7 @@ pub async fn run_probe( passed: false, skipped: false, duration_ms: step6_ms, - detail: None, + detail: None, error: Some("timeout".to_string()), }); None @@ -981,10 +1038,7 @@ pub async fn run_probe( // including recursive maintainer chains), then compare against git refs. match refs_body_fallback { None => { - checks.push(skipped( - "git_refs_match_state", - "git_fetch_refs failed", - )); + checks.push(skipped("git_refs_match_state", "git_fetch_refs failed")); } Some(body) => { let fetched_refs = parse_refs(&body); @@ -992,19 +1046,19 @@ pub async fn run_probe( // Fetch all state events for this repo_id from the relay. // The relay only serves authorized state events (owner + full // recursive maintainer chain already resolved by the relay). - let state_filter = Filter::new() - .kind(Kind::RepoState) - .custom_tag( - nostr_sdk::prelude::SingleLetterTag::lowercase( - nostr_sdk::prelude::Alphabet::D, - ), - ann_id.clone(), - ); + let state_filter = Filter::new().kind(Kind::RepoState).custom_tag( + nostr_sdk::prelude::SingleLetterTag::lowercase( + nostr_sdk::prelude::Alphabet::D, + ), + ann_id.clone(), + ); let state_events = client .client() .fetch_events( state_filter, - deadline.saturating_duration_since(Instant::now()).min(Duration::from_secs(5)), + deadline + .saturating_duration_since(Instant::now()) + .min(Duration::from_secs(5)), ) .await .unwrap_or_default(); @@ -1046,7 +1100,8 @@ pub async fn run_probe( Some(h) => h.to_string(), None => continue, }; - let prev_ts = latest_ts.get(kind_str.as_ref()).copied().unwrap_or(0); + let prev_ts = + latest_ts.get(kind_str.as_ref()).copied().unwrap_or(0); if ts >= prev_ts { expected.insert(kind_str.to_string(), hash); latest_ts.insert(kind_str.to_string(), ts); @@ -1103,10 +1158,7 @@ pub async fn run_probe( detail: None, error: Some("no repositories found on relay".to_string()), }); - checks.push(skipped( - "git_refs_match_state", - "no announcement found", - )); + checks.push(skipped("git_refs_match_state", "no announcement found")); } } } diff --git a/grasp-audit/src/specs/grasp01/purgatory.rs b/grasp-audit/src/specs/grasp01/purgatory.rs index 0686da8..fdc1e32 100644 --- a/grasp-audit/src/specs/grasp01/purgatory.rs +++ b/grasp-audit/src/specs/grasp01/purgatory.rs @@ -54,9 +54,7 @@ impl PurgatoryTests { // Deletion event tests (NIP-09) results.add(Self::test_deletion_by_event_id_removes_purgatory_state_event(client).await); - results.add( - Self::test_deletion_by_coordinate_removes_purgatory_state_event(client).await, - ); + results.add(Self::test_deletion_by_coordinate_removes_purgatory_state_event(client).await); // PR purgatory tests results.add(Self::test_pr_event_accepted_into_purgatory_and_isnt_served(client).await); diff --git a/src/audit_cleanup.rs b/src/audit_cleanup.rs index b976b1f..de78b1b 100644 --- a/src/audit_cleanup.rs +++ b/src/audit_cleanup.rs @@ -15,7 +15,7 @@ //! //! Runs every `AUDIT_CLEANUP_INTERVAL_SECS` seconds. -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::time::Duration; use nostr_sdk::prelude::*; @@ -46,8 +46,12 @@ pub async fn run_audit_cleanup_loop(database: SharedDatabase, git_data_path: Pat } /// Perform a single cleanup pass. -async fn run_audit_cleanup_once(database: &SharedDatabase, git_data_path: &PathBuf) { - let cutoff = Timestamp::from(Timestamp::now().as_secs().saturating_sub(AUDIT_CLEANUP_AGE_SECS)); +async fn run_audit_cleanup_once(database: &SharedDatabase, git_data_path: &Path) { + let cutoff = Timestamp::from( + Timestamp::now() + .as_secs() + .saturating_sub(AUDIT_CLEANUP_AGE_SECS), + ); // --- Step 1: Find repo announcements to delete git repos for --- let repo_filter = Filter::new() @@ -73,10 +77,7 @@ async fn run_audit_cleanup_once(database: &SharedDatabase, git_data_path: &PathB if repo_path.exists() { match std::fs::remove_dir_all(&repo_path) { Ok(()) => { - debug!( - "audit_cleanup: deleted git repo {}", - repo_path.display() - ); + debug!("audit_cleanup: deleted git repo {}", repo_path.display()); repos_deleted += 1; // Remove the parent npub directory if it is now empty @@ -131,9 +132,7 @@ async fn run_audit_cleanup_once(database: &SharedDatabase, git_data_path: &PathB } // --- Step 2: Delete all audit events from the database --- - let all_audit_filter = Filter::new() - .hashtag(AUDIT_TEST_EVENT_TAG) - .until(cutoff); + let all_audit_filter = Filter::new().hashtag(AUDIT_TEST_EVENT_TAG).until(cutoff); match database.delete(all_audit_filter).await { Ok(()) => { @@ -143,7 +142,10 @@ async fn run_audit_cleanup_once(database: &SharedDatabase, git_data_path: &PathB ); } Err(e) => { - error!("audit_cleanup: failed to delete audit events from database: {}", e); + error!( + "audit_cleanup: failed to delete audit events from database: {}", + e + ); } } } diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 5ff3a7f..b615251 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -154,13 +154,10 @@ pub async fn handle_upload_pack( // Write request to git's stdin if let Some(mut stdin) = git.take_stdin() { - stdin - .write_all(&request_body) - .await - .map_err(|e| { - error!("Failed to write to git upload-pack stdin: {}", e); - GitError::IoError(e) - })?; + stdin.write_all(&request_body).await.map_err(|e| { + error!("Failed to write to git upload-pack stdin: {}", e); + GitError::IoError(e) + })?; // Close stdin to signal end of input drop(stdin); } @@ -171,24 +168,18 @@ pub async fn handle_upload_pack( if let Some(stdout) = git.take_stdout() { let mut stdout = stdout; - stdout - .read_to_end(&mut output) - .await - .map_err(|e| { - error!("Failed to read git upload-pack stdout: {}", e); - GitError::IoError(e) - })?; + stdout.read_to_end(&mut output).await.map_err(|e| { + error!("Failed to read git upload-pack stdout: {}", e); + GitError::IoError(e) + })?; } if let Some(stderr) = git.take_stderr() { let mut stderr = stderr; - stderr - .read_to_end(&mut stderr_output) - .await - .map_err(|e| { - error!("Failed to read git upload-pack stderr: {}", e); - GitError::IoError(e) - })?; + stderr.read_to_end(&mut stderr_output).await.map_err(|e| { + error!("Failed to read git upload-pack stderr: {}", e); + GitError::IoError(e) + })?; } // Wait for process @@ -317,13 +308,10 @@ pub async fn handle_receive_pack( // Write request to git's stdin if let Some(mut stdin) = git.take_stdin() { - stdin - .write_all(&request_body) - .await - .map_err(|e| { - error!("Failed to write to git receive-pack stdin: {}", e); - GitError::IoError(e) - })?; + stdin.write_all(&request_body).await.map_err(|e| { + error!("Failed to write to git receive-pack stdin: {}", e); + GitError::IoError(e) + })?; drop(stdin); } @@ -333,24 +321,18 @@ pub async fn handle_receive_pack( if let Some(stdout) = git.take_stdout() { let mut stdout = stdout; - stdout - .read_to_end(&mut output) - .await - .map_err(|e| { - error!("Failed to read git receive-pack stdout: {}", e); - GitError::IoError(e) - })?; + stdout.read_to_end(&mut output).await.map_err(|e| { + error!("Failed to read git receive-pack stdout: {}", e); + GitError::IoError(e) + })?; } if let Some(stderr) = git.take_stderr() { let mut stderr = stderr; - stderr - .read_to_end(&mut stderr_output) - .await - .map_err(|e| { - error!("Failed to read git receive-pack stderr: {}", e); - GitError::IoError(e) - })?; + stderr.read_to_end(&mut stderr_output).await.map_err(|e| { + error!("Failed to read git receive-pack stderr: {}", e); + GitError::IoError(e) + })?; } // Wait for process diff --git a/src/git/sync.rs b/src/git/sync.rs index 9a02ad4..05dcbda 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -814,6 +814,7 @@ pub fn extract_identifier_from_pr_event(event: &Event) -> Option { /// /// # Returns /// A `ProcessResult` describing what was processed +#[allow(clippy::too_many_arguments)] pub async fn process_newly_available_git_data( source_repo_path: &Path, new_oids: &HashSet, @@ -1339,6 +1340,7 @@ async fn process_purgatory_pr_events( /// When `write_policy` and `rejected_events_index` are provided (git push path), /// any maintainer announcements sitting in the hot cache are re-processed immediately /// after the owner announcement is promoted, so they don't wait for the next sync cycle. +#[allow(clippy::too_many_arguments)] async fn process_purgatory_announcements( identifier: &str, source_repo_path: &Path, diff --git a/src/http/mod.rs b/src/http/mod.rs index 76ffef3..c397365 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -105,6 +105,7 @@ struct HttpService { } impl HttpService { + #[allow(clippy::too_many_arguments)] fn new( relay: LocalRelay, config: Config, diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index a0088e1..03132bf 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -18,7 +18,6 @@ use crate::nostr::policy::{ ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; - /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index b366f0b..aba5181 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -70,7 +70,10 @@ impl AnnouncementPolicy { .is_some_and(|entry| event.created_at > entry.event.created_at); if should_evict { - self.remove_purgatory_announcement(&event.pubkey, &announcement.identifier); + self.remove_purgatory_announcement( + &event.pubkey, + &announcement.identifier, + ); } match self @@ -145,10 +148,9 @@ impl AnnouncementPolicy { ); AnnouncementResult::AcceptPurgatory } - Err(e) => AnnouncementResult::Reject(format!( - "Failed to parse announcement: {}", - e - )), + Err(e) => { + AnnouncementResult::Reject(format!("Failed to parse announcement: {}", e)) + } } } // AcceptPurgatory shouldn't come from validate_announcement, but handle it @@ -161,11 +163,7 @@ impl AnnouncementPolicy { /// Called when a replacement announcement arrives for a (pubkey, identifier) pair /// that is currently in purgatory. Updates the purgatory entry and extends the /// expiry so the new announcement has a fresh waiting window. - fn replace_purgatory_announcement( - &self, - event: &Event, - announcement: &RepositoryAnnouncement, - ) { + fn replace_purgatory_announcement(&self, event: &Event, announcement: &RepositoryAnnouncement) { let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); let relays: HashSet = announcement.relays.iter().cloned().collect(); diff --git a/src/nostr/policy/deletion.rs b/src/nostr/policy/deletion.rs index 6457c90..c5a52d4 100644 --- a/src/nostr/policy/deletion.rs +++ b/src/nostr/policy/deletion.rs @@ -155,7 +155,9 @@ impl DeletionPolicy { author = %author.to_hex(), "Deletion request: removing purgatory state event by event ID" ); - self.ctx.purgatory.remove_state_event(&identifier, &entry.event.id); + self.ctx + .purgatory + .remove_state_event(&identifier, &entry.event.id); return; // event IDs are unique } } @@ -223,7 +225,9 @@ impl DeletionPolicy { if entry.author == *author && entry.event.created_at.as_secs() <= deletion_created_at { - self.ctx.purgatory.remove_state_event(identifier, &entry.event.id); + self.ctx + .purgatory + .remove_state_event(identifier, &entry.event.id); removed += 1; } } @@ -306,7 +310,10 @@ mod tests { EventBuilder::new(Kind::GitRepoAnnouncement, "") .tags(vec![ Tag::identifier(identifier), - Tag::custom(TagKind::custom("clone"), vec!["https://example.com/repo.git"]), + Tag::custom( + TagKind::custom("clone"), + vec!["https://example.com/repo.git"], + ), ]) .sign_with_keys(keys) .unwrap() @@ -331,7 +338,9 @@ mod tests { let announcement = make_announcement_event(&keys, identifier); add_to_purgatory(&ctx, &announcement, identifier); - assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier)); + assert!(ctx + .purgatory + .has_purgatory_announcement(&keys.public_key(), identifier)); // Build kind 5 deletion event referencing the announcement by event ID let deletion = EventBuilder::new(Kind::EventDeletion, "") @@ -347,7 +356,8 @@ mod tests { assert!(matches!(result, WritePolicyResult::Accept)); assert!( - !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), + !ctx.purgatory + .has_purgatory_announcement(&keys.public_key(), identifier), "Purgatory entry should have been removed" ); } @@ -361,7 +371,9 @@ mod tests { let announcement = make_announcement_event(&keys, identifier); add_to_purgatory(&ctx, &announcement, identifier); - assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier)); + assert!(ctx + .purgatory + .has_purgatory_announcement(&keys.public_key(), identifier)); // Build kind 5 deletion event referencing the announcement by coordinate let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier); @@ -378,7 +390,8 @@ mod tests { assert!(matches!(result, WritePolicyResult::Accept)); assert!( - !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), + !ctx.purgatory + .has_purgatory_announcement(&keys.public_key(), identifier), "Purgatory entry should have been removed" ); } @@ -407,7 +420,8 @@ mod tests { assert!(matches!(result, WritePolicyResult::Accept)); assert!( - ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier), + ctx.purgatory + .has_purgatory_announcement(&owner_keys.public_key(), identifier), "Purgatory entry should NOT have been removed by wrong author" ); } @@ -438,7 +452,8 @@ mod tests { assert!(matches!(result, WritePolicyResult::Accept)); assert!( - ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier), + ctx.purgatory + .has_purgatory_announcement(&owner_keys.public_key(), identifier), "Purgatory entry should NOT have been removed by wrong author" ); } @@ -450,11 +465,10 @@ mod tests { // No purgatory entry exists — deletion should still be accepted let deletion = EventBuilder::new(Kind::EventDeletion, "") - .tags(vec![ - Tag::custom(TagKind::custom("a"), vec![ - format!("30617:{}:nonexistent", keys.public_key().to_hex()) - ]), - ]) + .tags(vec![Tag::custom( + TagKind::custom("a"), + vec![format!("30617:{}:nonexistent", keys.public_key().to_hex())], + )]) .sign_with_keys(&keys) .unwrap(); @@ -491,7 +505,8 @@ mod tests { assert!(matches!(result, WritePolicyResult::Accept)); assert!( - ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), + ctx.purgatory + .has_purgatory_announcement(&keys.public_key(), identifier), "Purgatory entry should NOT be removed: entry is newer than deletion request" ); } diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index 52747a4..e4a64b8 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -7,7 +7,9 @@ use nostr_relay_builder::prelude::Event; use super::PolicyContext; use crate::git; -use crate::git::authorization::{collect_authorized_maintainers, fetch_repository_data_excluding_purgatory}; +use crate::git::authorization::{ + collect_authorized_maintainers, fetch_repository_data_excluding_purgatory, +}; /// Policy for validating PR and PR Update events #[derive(Clone)] @@ -131,7 +133,8 @@ impl PrEventPolicy { // only be accepted for announcements that have been promoted (validated). // If the announcement is still in purgatory, the PR event should also go // to purgatory and wait for the announcement to be promoted. - let db_repo_data = fetch_repository_data_excluding_purgatory(&self.ctx.database, &identifier).await?; + let db_repo_data = + fetch_repository_data_excluding_purgatory(&self.ctx.database, &identifier).await?; // Extract owner pubkey from source repo path let owner_pubkey = crate::git::sync::extract_owner_from_repo_path( @@ -211,7 +214,8 @@ impl PrEventPolicy { // only be accepted for announcements that have been promoted (validated). // If the announcement is still in purgatory, the PR event should also go // to purgatory and wait for the announcement to be promoted. - let db_repo_data = fetch_repository_data_excluding_purgatory(&self.ctx.database, identifier).await?; + let db_repo_data = + fetch_repository_data_excluding_purgatory(&self.ctx.database, identifier).await?; // 3. Extract list of maintainers from "a 30617::" tags let mut maintainer_pubkeys = std::collections::HashSet::new(); diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index df743ae..80fe84c 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -158,7 +158,11 @@ impl StatePolicy { // authorized it. for owner_hex in &authorized_owners { if let Ok(owner_pk) = nostr_sdk::PublicKey::from_hex(owner_hex) { - if self.ctx.purgatory.has_purgatory_announcement(&owner_pk, &state.identifier) { + if self + .ctx + .purgatory + .has_purgatory_announcement(&owner_pk, &state.identifier) + { self.ctx.purgatory.extend_announcement_expiry( &owner_pk, &state.identifier, diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index bb6ff54..9b370d2 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -16,8 +16,14 @@ pub mod persistence; pub mod sync; mod types; -pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs}; -pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; +pub use helpers::{ + can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, + get_unpushed_refs, +}; +pub use types::{ + AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, + StatePurgatoryEntry, +}; use dashmap::DashMap; use nostr_sdk::prelude::*; @@ -672,9 +678,15 @@ impl Purgatory { /// /// # Returns /// The announcement entry if found, None otherwise - pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + pub fn find_announcement( + &self, + owner: &PublicKey, + identifier: &str, + ) -> Option { let key = (*owner, identifier.to_string()); - self.announcement_purgatory.get(&key).map(|entry| entry.clone()) + self.announcement_purgatory + .get(&key) + .map(|entry| entry.clone()) } /// Get all announcements in purgatory for a given identifier. @@ -687,7 +699,10 @@ impl Purgatory { /// /// # Returns /// Vector of announcement entries for this identifier - pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec { + pub fn get_announcements_by_identifier( + &self, + identifier: &str, + ) -> Vec { self.announcement_purgatory .iter() .filter(|entry| entry.key().1 == identifier) @@ -755,7 +770,12 @@ impl Purgatory { /// * `owner` - The owner pubkey /// * `identifier` - The repository identifier /// * `duration` - Minimum duration to guarantee from now - pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) { + pub fn extend_announcement_expiry( + &self, + owner: &PublicKey, + identifier: &str, + duration: Duration, + ) { let key = (*owner, identifier.to_string()); // Collect revival info before taking a mutable borrow @@ -977,16 +997,24 @@ impl Purgatory { .map(|entry| { let key = entry.key(); let v = entry.value(); - (key.0.clone(), key.1.clone(), v.repo_path.clone(), v.event.id, v.soft_expired) + ( + key.0, + key.1.clone(), + v.repo_path.clone(), + v.event.id, + v.soft_expired, + ) }) .collect(); let mut announcement_removed = 0; - for (owner, identifier, repo_path, event_id, already_soft_expired) in expired_announcements { + for (owner, identifier, repo_path, event_id, already_soft_expired) in expired_announcements + { if already_soft_expired { // Phase 2: fully remove self.mark_expired(event_id); - self.announcement_purgatory.remove(&(owner.clone(), identifier.clone())); + self.announcement_purgatory + .remove(&(owner, identifier.clone())); announcement_removed += 1; tracing::info!( owner = %owner, @@ -1026,7 +1054,10 @@ impl Purgatory { if repo_gone { // Mark soft_expired and extend expiry - if let Some(mut entry) = self.announcement_purgatory.get_mut(&(owner.clone(), identifier.clone())) { + if let Some(mut entry) = self + .announcement_purgatory + .get_mut(&(owner, identifier.clone())) + { entry.soft_expired = true; entry.expires_at = now + SOFT_EXPIRY_EXTENDED; } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index cd62380..36142e3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -2406,7 +2406,8 @@ impl SyncManager { } // Register any new entries in repo_sync_index as StateOnly - let mut new_relay_urls: std::collections::HashSet = std::collections::HashSet::new(); + let mut new_relay_urls: std::collections::HashSet = + std::collections::HashSet::new(); { let mut index = self.repo_sync_index.write().await; for (repo_id, relays) in &announcements { diff --git a/tests/common/sync_helpers.rs b/tests/common/sync_helpers.rs index af51e78..611b1a5 100644 --- a/tests/common/sync_helpers.rs +++ b/tests/common/sync_helpers.rs @@ -1240,8 +1240,11 @@ pub async fn push_unique_git_data_to_relay( git(path, &["config", "commit.gpgsign", "false"]); // Write a unique file so each maintainer gets a distinct commit hash - std::fs::write(path.join("state_test.txt"), "State test content for purgatory sync") - .expect("write state_test.txt"); + std::fs::write( + path.join("state_test.txt"), + "State test content for purgatory sync", + ) + .expect("write state_test.txt"); std::fs::write(path.join(".unique"), unique_seed).expect("write .unique"); git(path, &["add", "."]); git(path, &["commit", "-m", "State test commit"]); diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs index 655b0d9..0c1de28 100644 --- a/tests/purgatory_persistence.rs +++ b/tests/purgatory_persistence.rs @@ -169,7 +169,10 @@ async fn test_full_purgatory_save_restore_cycle() { // Verify all data was restored let (announcement_count2, state_count2, pr_count2) = purgatory2.count(); - assert_eq!(announcement_count2, 1, "Should have 1 announcement after restore"); + assert_eq!( + announcement_count2, 1, + "Should have 1 announcement after restore" + ); assert_eq!(state_count2, 2, "Should have 2 state events after restore"); assert_eq!( pr_count2, 3, @@ -853,7 +856,10 @@ async fn test_announcement_save_restore_cycle() { let purgatory2 = Purgatory::new(&git_data_path); purgatory2.restore_from_disk(&state_path).unwrap(); - assert!(!state_path.exists(), "State file should be deleted after restore"); + assert!( + !state_path.exists(), + "State file should be deleted after restore" + ); let (ann_count2, _, _) = purgatory2.count(); assert_eq!(ann_count2, 1, "Announcement should be restored"); diff --git a/tests/purgatory_sync.rs b/tests/purgatory_sync.rs index eefd6bc..ced39ff 100644 --- a/tests/purgatory_sync.rs +++ b/tests/purgatory_sync.rs @@ -711,15 +711,10 @@ async fn test_concurrent_state_and_pr_sync() { ); // Check refs/nostr/ points to pr_commit - let pr_ref_correct = check_ref_at_commit( - &syncing_domain, - &npub, - identifier, - &pr_ref_name, - &pr_commit, - ) - .await - .expect("Failed to check PR ref"); + let pr_ref_correct = + check_ref_at_commit(&syncing_domain, &npub, identifier, &pr_ref_name, &pr_commit) + .await + .expect("Failed to check PR ref"); assert!( pr_ref_correct, @@ -993,7 +988,7 @@ async fn test_pr_event_clone_tag_sync_with_partial_oid_aggregation_from_multiple let syncing_relay = TestRelay::start_on_port_with_options( syncing_port, Some(source_grasp.url().to_string()), // Bootstrap from source_grasp - true, // Disable negentropy - MockRelay doesn't support NIP-77 + true, // Disable negentropy - MockRelay doesn't support NIP-77 ) .await; diff --git a/tests/sync/discovery.rs b/tests/sync/discovery.rs index d45a290..ee675e6 100644 --- a/tests/sync/discovery.rs +++ b/tests/sync/discovery.rs @@ -183,7 +183,10 @@ async fn test_relay_discovery_via_announcements_with_historic_sync() { let (announcement, _git_dir_a) = setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; let announcement_id = announcement.id; - println!("Announcement {} set up on relay_a with git data (Layer 1)", announcement_id); + println!( + "Announcement {} set up on relay_a with git data (Layer 1)", + announcement_id + ); // Build repo coordinate for Layer 2 reference let repo_coord = repo_coord(&keys, repo_id); @@ -235,4 +238,3 @@ async fn test_relay_discovery_via_announcements_with_historic_sync() { issue_id ); } - diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index ff1eb43..9154ee5 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -70,17 +70,15 @@ async fn test_maintainer_announcement_reprocessed_immediately() { identifier )], ), - Tag::custom( - TagKind::custom("relays"), - vec![relay_a.url().to_string()], - ), + Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), ]) .sign_with_keys(&maintainer_keys) .unwrap(); - send_to_relay(&relay_a, &maintainer_announcement).await.unwrap(); + send_to_relay(&relay_a, &maintainer_announcement) + .await + .unwrap(); let _git_dir_maintainer = - push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]) - .await; + push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]).await; println!("✓ Maintainer announcement + git data pushed to relay_a"); // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. @@ -134,7 +132,9 @@ async fn test_maintainer_announcement_reprocessed_immediately() { // re-processing of the maintainer announcement via our new code path. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)"); + println!( + "✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)" + ); // Step 5: Wait briefly for async processing to complete. tokio::time::sleep(Duration::from_secs(1)).await; @@ -363,9 +363,12 @@ async fn test_multiple_maintainers_all_reprocessed() { .kind(Kind::GitRepoAnnouncement) .author(keys.public_key()) .identifier(identifier); - let found = - wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; - assert!(found, "{} announcement should be in relay_a before starting relay_b", name); + let found = wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; + assert!( + found, + "{} announcement should be in relay_a before starting relay_b", + name + ); } println!("✓ All three maintainer announcements confirmed in relay_a's DB"); diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs index e973bbb..996c8b7 100644 --- a/tests/sync/metrics.rs +++ b/tests/sync/metrics.rs @@ -417,9 +417,12 @@ async fn test_live_sync_event_count() { println!("Announcement set up on source relay with git data"); // Start syncing relay with pre-allocated port - let syncing_relay = - TestRelay::start_on_port_with_options(sync_port, Some(source_relay.url().to_string()), false) - .await; + let syncing_relay = TestRelay::start_on_port_with_options( + sync_port, + Some(source_relay.url().to_string()), + false, + ) + .await; println!("Syncing relay started at {}", syncing_relay.url()); // Wait for sync connection to be fully established with EOSE received @@ -458,8 +461,14 @@ async fn test_live_sync_event_count() { let client = TestClient::new(source_relay.url(), keys.clone()) .await .expect("Failed to connect to source"); - client.send_event(&patch1).await.expect("Failed to send patch 1"); - client.send_event(&patch2).await.expect("Failed to send patch 2"); + client + .send_event(&patch1) + .await + .expect("Failed to send patch 1"); + client + .send_event(&patch2) + .await + .expect("Failed to send patch 2"); client.disconnect().await; println!("Two patches sent to source relay (live mode)"); -- cgit v1.2.3