diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 23:31:38 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 23:31:38 +0000 |
| commit | c67ebe6f33bfa191f17eb0df24d3ee18092c74e1 (patch) | |
| tree | b86911bbb406f4aa0253b1cf1e0a82aed16c972b /src/nostr/policy | |
| parent | 4dc0ed66a0bd3b4b00804bb13adf93b207bb5fc4 (diff) | |
refactor: unify event processing logic
Eliminates code duplication by extracting core event processing into
reusable functions. All state and PR event processing now uses the same
unified logic from src/git/process.rs.
Changes:
- Add src/git/process.rs with unified processing functions
- process_state_with_git_data() for state events
- process_pr_with_git_data() for PR events
- Pure functions with comprehensive result types
- Refactor policy handlers to use unified processing
- src/nostr/policy/state.rs: Remove ~70 lines of duplicated logic
- src/nostr/policy/pr_event.rs: Remove ~40 lines of duplicated logic
- Refactor purgatory processing to use unified functions
- src/git/sync.rs: Remove ~125 lines of duplicated logic
- Make extract_owner_from_repo_path() public for reuse
Benefits:
- DRY: Single source of truth for event processing
- Testable: Pure functions with clear contracts
- Maintainable: Changes happen in one place
- Consistent: All code paths use same logic
All 217 unit tests + 40 integration tests pass (257/257).
Diffstat (limited to 'src/nostr/policy')
| -rw-r--r-- | src/nostr/policy/pr_event.rs | 132 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 194 |
2 files changed, 82 insertions, 244 deletions
diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index ff3bade..9942a6a 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs | |||
| @@ -27,7 +27,7 @@ impl PrEventPolicy { | |||
| 27 | /// 2. Commit existence in referenced repositories | 27 | /// 2. Commit existence in referenced repositories |
| 28 | /// 3. Deletion of incorrect refs/nostr/<event-id> refs | 28 | /// 3. Deletion of incorrect refs/nostr/<event-id> refs |
| 29 | /// 4. Deletion of incorrect placeholders | 29 | /// 4. Deletion of incorrect placeholders |
| 30 | /// 5. Copying git data to all referenced repositories when found | 30 | /// 5. Processing PR event with unified function |
| 31 | /// | 31 | /// |
| 32 | /// # Returns | 32 | /// # Returns |
| 33 | /// - `Ok(true)` if git data ready (commit exists and is synced to all repos) | 33 | /// - `Ok(true)` if git data ready (commit exists and is synced to all repos) |
| @@ -64,7 +64,6 @@ impl PrEventPolicy { | |||
| 64 | ); | 64 | ); |
| 65 | // Remove placeholder - event processing will continue normally | 65 | // Remove placeholder - event processing will continue normally |
| 66 | self.ctx.purgatory.remove_pr(&event_id); | 66 | self.ctx.purgatory.remove_pr(&event_id); |
| 67 | // Continue to validate and sync refs across all repos | ||
| 68 | } else { | 67 | } else { |
| 69 | // Placeholder has different commit - incoming event supersedes | 68 | // Placeholder has different commit - incoming event supersedes |
| 70 | tracing::info!( | 69 | tracing::info!( |
| @@ -75,8 +74,7 @@ impl PrEventPolicy { | |||
| 75 | ); | 74 | ); |
| 76 | // Remove incorrect placeholder | 75 | // Remove incorrect placeholder |
| 77 | self.ctx.purgatory.remove_pr(&event_id); | 76 | self.ctx.purgatory.remove_pr(&event_id); |
| 78 | // Delete incorrect git data (refs/nostr/<event-id>) from all repos | 77 | // Delete incorrect git data (refs/nostr/<event-id>) will be handled below |
| 79 | // This will be handled below when we validate refs | ||
| 80 | } | 78 | } |
| 81 | } | 79 | } |
| 82 | 80 | ||
| @@ -87,9 +85,8 @@ impl PrEventPolicy { | |||
| 87 | return Ok(false); | 85 | return Ok(false); |
| 88 | } | 86 | } |
| 89 | 87 | ||
| 90 | // delete incorrect refs/nostr/<event-id> | 88 | // Delete incorrect refs/nostr/<event-id> |
| 91 | for repo_path in &repo_paths { | 89 | for repo_path in &repo_paths { |
| 92 | // First, validate/delete any incorrect refs/nostr/<event-id> | ||
| 93 | match git::validate_nostr_ref(repo_path, &event_id, &commit) { | 90 | match git::validate_nostr_ref(repo_path, &event_id, &commit) { |
| 94 | Ok(true) => { | 91 | Ok(true) => { |
| 95 | tracing::info!( | 92 | tracing::info!( |
| @@ -110,10 +107,9 @@ impl PrEventPolicy { | |||
| 110 | } | 107 | } |
| 111 | } | 108 | } |
| 112 | 109 | ||
| 113 | // find location of correct git data (if exists) | 110 | // Find location of correct git data (if exists) |
| 114 | let mut source_repo: Option<std::path::PathBuf> = None; | 111 | let mut source_repo: Option<std::path::PathBuf> = None; |
| 115 | for repo_path in &repo_paths { | 112 | for repo_path in &repo_paths { |
| 116 | // Check if commit exists in this repository | ||
| 117 | if git::commit_exists(repo_path, &commit) { | 113 | if git::commit_exists(repo_path, &commit) { |
| 118 | source_repo = Some(repo_path.clone()); | 114 | source_repo = Some(repo_path.clone()); |
| 119 | tracing::debug!( | 115 | tracing::debug!( |
| @@ -125,59 +121,50 @@ impl PrEventPolicy { | |||
| 125 | } | 121 | } |
| 126 | } | 122 | } |
| 127 | 123 | ||
| 128 | // Copy commit to all other referenced repositories | ||
| 129 | if let Some(source_repo) = source_repo { | 124 | if let Some(source_repo) = source_repo { |
| 130 | for repo_path in &repo_paths { | 125 | // Extract identifier |
| 131 | if repo_path == &source_repo { | 126 | let identifier = crate::git::sync::extract_identifier_from_pr_event(event) |
| 132 | // Skip source repo | 127 | .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; |
| 133 | continue; | 128 | |
| 134 | } | 129 | // Fetch repository data |
| 130 | let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; | ||
| 131 | |||
| 132 | // Extract owner pubkey from source repo path | ||
| 133 | let owner_pubkey = crate::git::sync::extract_owner_from_repo_path( | ||
| 134 | &source_repo, | ||
| 135 | &self.ctx.git_data_path, | ||
| 136 | ) | ||
| 137 | .unwrap_or_default(); | ||
| 138 | |||
| 139 | // Use unified processing function | ||
| 140 | let result = crate::git::process::process_pr_with_git_data( | ||
| 141 | event, | ||
| 142 | &commit, | ||
| 143 | &source_repo, | ||
| 144 | &db_repo_data, | ||
| 145 | &self.ctx.git_data_path, | ||
| 146 | &owner_pubkey, | ||
| 147 | ); | ||
| 135 | 148 | ||
| 136 | // Check if repository exists | 149 | tracing::info!( |
| 137 | if !repo_path.exists() { | 150 | identifier = %identifier, |
| 138 | tracing::debug!( | 151 | event_id = %event_id, |
| 139 | "Repository {} does not exist, skipping sync", | 152 | repos_synced = result.repos_synced, |
| 140 | repo_path.display() | 153 | refs_created = result.refs_created, |
| 141 | ); | 154 | "Processed PR event with git data already available" |
| 142 | continue; | 155 | ); |
| 143 | } | ||
| 144 | 156 | ||
| 145 | // Check if commit already exists | 157 | if !result.errors.is_empty() { |
| 146 | if git::commit_exists(repo_path, &commit) { | 158 | for error in &result.errors { |
| 147 | tracing::debug!( | 159 | tracing::warn!( |
| 148 | "Commit {} already exists in {}, skipping sync", | 160 | identifier = %identifier, |
| 149 | commit, | 161 | event_id = %event_id, |
| 150 | repo_path.display() | 162 | error = %error, |
| 163 | "Error processing PR event" | ||
| 151 | ); | 164 | ); |
| 152 | continue; | ||
| 153 | } | ||
| 154 | |||
| 155 | // Fetch commit from source repo to target repo | ||
| 156 | tracing::info!( | ||
| 157 | "Syncing commit {} from {} to {}", | ||
| 158 | commit, | ||
| 159 | source_repo.display(), | ||
| 160 | repo_path.display() | ||
| 161 | ); | ||
| 162 | |||
| 163 | match self.copy_commit(&source_repo, repo_path, &commit).await { | ||
| 164 | Ok(()) => { | ||
| 165 | tracing::info!( | ||
| 166 | "Successfully synced commit {} to {}", | ||
| 167 | commit, | ||
| 168 | repo_path.display() | ||
| 169 | ); | ||
| 170 | } | ||
| 171 | Err(e) => { | ||
| 172 | tracing::warn!( | ||
| 173 | "Failed to sync commit {} to {}: {}", | ||
| 174 | commit, | ||
| 175 | repo_path.display(), | ||
| 176 | e | ||
| 177 | ); | ||
| 178 | } | ||
| 179 | } | 165 | } |
| 180 | } | 166 | } |
| 167 | |||
| 181 | Ok(true) | 168 | Ok(true) |
| 182 | } else { | 169 | } else { |
| 183 | tracing::debug!( | 170 | tracing::debug!( |
| @@ -250,40 +237,5 @@ impl PrEventPolicy { | |||
| 250 | 237 | ||
| 251 | Ok(repo_paths) | 238 | Ok(repo_paths) |
| 252 | } | 239 | } |
| 253 | /// Copy a commit from source repository to target repository | ||
| 254 | /// | ||
| 255 | /// Uses `git fetch` to copy a specific commit between local repositories. | ||
| 256 | /// | ||
| 257 | /// # Arguments | ||
| 258 | /// * `source_repo` - Path to repository containing the commit | ||
| 259 | /// * `target_repo` - Path to repository to receive the commit | ||
| 260 | /// * `commit` - Commit hash to copy | ||
| 261 | /// | ||
| 262 | /// # Returns | ||
| 263 | /// Ok(()) on success, Err with error message on failure | ||
| 264 | async fn copy_commit( | ||
| 265 | &self, | ||
| 266 | source_repo: &std::path::Path, | ||
| 267 | target_repo: &std::path::Path, | ||
| 268 | commit: &str, | ||
| 269 | ) -> Result<(), String> { | ||
| 270 | use std::process::Command; | ||
| 271 | |||
| 272 | let output = Command::new("git") | ||
| 273 | .args([ | ||
| 274 | "fetch", | ||
| 275 | source_repo.to_str().ok_or("Invalid source path")?, | ||
| 276 | commit, | ||
| 277 | ]) | ||
| 278 | .current_dir(target_repo) | ||
| 279 | .output() | ||
| 280 | .map_err(|e| format!("Failed to execute git fetch: {}", e))?; | ||
| 281 | 240 | ||
| 282 | if !output.status.success() { | ||
| 283 | let stderr = String::from_utf8_lossy(&output.stderr); | ||
| 284 | return Err(format!("git fetch failed: {}", stderr)); | ||
| 285 | } | ||
| 286 | |||
| 287 | Ok(()) | ||
| 288 | } | ||
| 289 | } | 241 | } |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 7d69d7d..68b1e97 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -9,9 +9,8 @@ use nostr_relay_builder::builder::WritePolicyResult; | |||
| 9 | use nostr_relay_builder::prelude::Event; | 9 | use nostr_relay_builder::prelude::Event; |
| 10 | 10 | ||
| 11 | use super::PolicyContext; | 11 | use super::PolicyContext; |
| 12 | use crate::git::authorization::{collect_authorized_maintainers, fetch_repository_data}; | 12 | use crate::git::authorization::fetch_repository_data; |
| 13 | use crate::git::sync::align_repository_with_state; | 13 | use crate::git; |
| 14 | use crate::git::{self}; | ||
| 15 | use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; | 14 | use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; |
| 16 | 15 | ||
| 17 | /// Result of state policy evaluation | 16 | /// Result of state policy evaluation |
| @@ -50,7 +49,7 @@ impl StatePolicy { | |||
| 50 | let state = | 49 | let state = |
| 51 | RepositoryState::from_event(event.clone()).context("Failed to parse state event")?; | 50 | RepositoryState::from_event(event.clone()).context("Failed to parse state event")?; |
| 52 | 51 | ||
| 53 | // duplicate check in purgatory | 52 | // Duplicate check in purgatory |
| 54 | if self | 53 | if self |
| 55 | .ctx | 54 | .ctx |
| 56 | .purgatory | 55 | .purgatory |
| @@ -63,95 +62,65 @@ impl StatePolicy { | |||
| 63 | event.id, | 62 | event.id, |
| 64 | ); | 63 | ); |
| 65 | return Ok(WritePolicyResult::Reject { | 64 | return Ok(WritePolicyResult::Reject { |
| 66 | status: true, // Client sees OK | 65 | status: true, |
| 67 | message: "duplicate: in purgatory".into(), | 66 | message: "duplicate: in purgatory".into(), |
| 68 | }); | 67 | }); |
| 69 | } | 68 | } |
| 70 | // get all repositories and state events from db with identifier | 69 | |
| 70 | // Get all repositories and state events from db with identifier | ||
| 71 | let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; | 71 | let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; |
| 72 | 72 | ||
| 73 | // duplicate check in db | 73 | // Duplicate check in db |
| 74 | if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { | 74 | if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { |
| 75 | tracing::debug!("processed state event duplicate (in db): {}", event.id,); | 75 | tracing::debug!("processed state event duplicate (in db): {}", event.id); |
| 76 | return Ok(WritePolicyResult::Reject { | 76 | return Ok(WritePolicyResult::Reject { |
| 77 | status: true, // Client sees OK | 77 | status: true, |
| 78 | message: "duplicate".into(), | 78 | message: "duplicate".into(), |
| 79 | }); | 79 | }); |
| 80 | } | 80 | } |
| 81 | 81 | ||
| 82 | // check if git data is avialable | 82 | // Check if git data is available |
| 83 | if let Some(repo_with_git_data) = | 83 | if let Some(repo_with_git_data) = |
| 84 | find_repo_with_git_data(&db_repo_data.announcements, &state, &self.ctx.git_data_path) | 84 | find_repo_with_git_data(&db_repo_data.announcements, &state, &self.ctx.git_data_path) |
| 85 | { | 85 | { |
| 86 | tracing::debug!( | 86 | tracing::debug!( |
| 87 | "processing state event git as data already available: {}", | 87 | "processing state event as git data already available: {}", |
| 88 | event.id, | 88 | event.id, |
| 89 | ); | 89 | ); |
| 90 | // find repos for which this state is authorised and align the git refs to this state | ||
| 91 | let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); | ||
| 92 | let mut repo_count = 0; | ||
| 93 | for (owner, maintainers) in by_owner { | ||
| 94 | if maintainers.contains(&event.pubkey.to_string()) { | ||
| 95 | if let Some(previous_state) = db_repo_data | ||
| 96 | .states | ||
| 97 | .iter() | ||
| 98 | .filter(|e| maintainers.contains(&e.event.pubkey.to_string())) | ||
| 99 | .max_by_key(|e| e.event.created_at) | ||
| 100 | { | ||
| 101 | // TODO in event of a tie the event with the biggest event id wins | ||
| 102 | if state.event.created_at > previous_state.event.created_at { | ||
| 103 | if let Some(annoucement) = db_repo_data | ||
| 104 | .announcements | ||
| 105 | .iter() | ||
| 106 | .find(|a| a.event.pubkey.to_string().eq(&owner)) | ||
| 107 | { | ||
| 108 | let repo_path = | ||
| 109 | self.ctx.git_data_path.join(annoucement.repo_path().clone()); | ||
| 110 | 90 | ||
| 111 | if !repo_path.exists() { | 91 | // Use unified processing function |
| 112 | // eg if annoucement doesnt list repo (but stored as its in maintainer set) | 92 | let result = crate::git::process::process_state_with_git_data( |
| 113 | continue; | 93 | &state, |
| 114 | } | 94 | &repo_with_git_data, |
| 115 | // If repo_path != repo_with_git_data, copy missing oids first | 95 | &db_repo_data, |
| 116 | if repo_path != repo_with_git_data { | 96 | &self.ctx.git_data_path, |
| 117 | if let Err(e) = self.copy_missing_oids( | 97 | ); |
| 118 | &repo_with_git_data, | 98 | |
| 119 | &repo_path, | 99 | tracing::info!( |
| 120 | &state, | 100 | identifier = %state.identifier, |
| 121 | ) { | 101 | event_id = %event.id, |
| 122 | tracing::warn!( | 102 | repos_synced = result.repos_synced, |
| 123 | "Failed to copy oids from {} to {}: {}", | 103 | refs_created = result.refs_created, |
| 124 | repo_with_git_data.display(), | 104 | refs_updated = result.refs_updated, |
| 125 | repo_path.display(), | 105 | refs_deleted = result.refs_deleted, |
| 126 | e | 106 | "Processed state event with git data already available" |
| 127 | ); | 107 | ); |
| 128 | } | ||
| 129 | } | ||
| 130 | 108 | ||
| 131 | let result = align_repository_with_state(&repo_path, &state); | 109 | if !result.errors.is_empty() { |
| 132 | repo_count += 1; | 110 | for error in &result.errors { |
| 133 | tracing::info!( | 111 | tracing::warn!( |
| 134 | "Aligned {} with state: created={}, updated={}, deleted={}, head_set={}", | 112 | identifier = %state.identifier, |
| 135 | repo_path.display(), | 113 | event_id = %event.id, |
| 136 | result.refs_created, | 114 | error = %error, |
| 137 | result.refs_updated, | 115 | "Error processing state event" |
| 138 | result.refs_deleted, | 116 | ); |
| 139 | result.head_set | ||
| 140 | ); | ||
| 141 | } | ||
| 142 | } | ||
| 143 | } | ||
| 144 | } | 117 | } |
| 145 | } | 118 | } |
| 146 | 119 | ||
| 147 | tracing::info!( | 120 | // Event will be saved and broadcast by relay builder |
| 148 | "immediately accepting state event. Was latest authorised state and git data updated for {repo_count} repositories: eventid: {}", | 121 | Ok(WritePolicyResult::Accept) |
| 149 | state.event.id, | ||
| 150 | ); | ||
| 151 | // immediately accept the event, bypassing purgatory | ||
| 152 | Ok(WritePolicyResult::Accept) // event should be saved and broadcast | ||
| 153 | } else { | 122 | } else { |
| 154 | // if no git data - add to purgatory | 123 | // If no git data - add to purgatory |
| 155 | // (add_state automatically enqueues for background sync) | 124 | // (add_state automatically enqueues for background sync) |
| 156 | self.ctx | 125 | self.ctx |
| 157 | .purgatory | 126 | .purgatory |
| @@ -163,96 +132,13 @@ impl StatePolicy { | |||
| 163 | state.identifier, | 132 | state.identifier, |
| 164 | ); | 133 | ); |
| 165 | Ok(WritePolicyResult::Reject { | 134 | Ok(WritePolicyResult::Reject { |
| 166 | status: true, // Client sees OK | 135 | status: true, |
| 167 | message: "purgatory: won't be served until git data arrives".into(), | 136 | message: "purgatory: won't be served until git data arrives".into(), |
| 168 | }) | 137 | }) |
| 169 | } | 138 | } |
| 170 | } | 139 | } |
| 171 | 140 | ||
| 172 | /// Copy missing OIDs from a source repository to a target repository | ||
| 173 | /// | ||
| 174 | /// Identifies commits referenced in the state that are missing from the target | ||
| 175 | /// repository and copies them from the source repository using git fetch. | ||
| 176 | /// | ||
| 177 | /// # Arguments | ||
| 178 | /// * `source_repo` - Path to repository containing the commits | ||
| 179 | /// * `target_repo` - Path to repository to receive the commits | ||
| 180 | /// * `state` - Repository state containing commit references | ||
| 181 | /// | ||
| 182 | /// # Returns | ||
| 183 | /// Ok(()) on success, Err with error message on failure | ||
| 184 | fn copy_missing_oids( | ||
| 185 | &self, | ||
| 186 | source_repo: &Path, | ||
| 187 | target_repo: &Path, | ||
| 188 | state: &RepositoryState, | ||
| 189 | ) -> Result<(), String> { | ||
| 190 | use std::process::Command; | ||
| 191 | |||
| 192 | // Collect all commits referenced in the state | ||
| 193 | let mut commits_to_check = Vec::new(); | ||
| 194 | |||
| 195 | for branch in &state.branches { | ||
| 196 | if !branch.commit.starts_with("ref: ") { | ||
| 197 | commits_to_check.push(&branch.commit); | ||
| 198 | } | ||
| 199 | } | ||
| 200 | |||
| 201 | for tag in &state.tags { | ||
| 202 | if !tag.commit.starts_with("ref: ") { | ||
| 203 | commits_to_check.push(&tag.commit); | ||
| 204 | } | ||
| 205 | } | ||
| 206 | |||
| 207 | // Identify missing commits | ||
| 208 | let mut missing_commits = Vec::new(); | ||
| 209 | for commit in commits_to_check { | ||
| 210 | if !git::oid_exists(target_repo, commit) { | ||
| 211 | missing_commits.push(commit); | ||
| 212 | } | ||
| 213 | } | ||
| 214 | |||
| 215 | if missing_commits.is_empty() { | ||
| 216 | tracing::debug!( | ||
| 217 | "No missing commits to copy from {} to {}", | ||
| 218 | source_repo.display(), | ||
| 219 | target_repo.display() | ||
| 220 | ); | ||
| 221 | return Ok(()); | ||
| 222 | } | ||
| 223 | |||
| 224 | tracing::info!( | ||
| 225 | "Copying {} missing commits from {} to {}", | ||
| 226 | missing_commits.len(), | ||
| 227 | source_repo.display(), | ||
| 228 | target_repo.display() | ||
| 229 | ); | ||
| 230 | |||
| 231 | // Fetch each missing commit from source to target | ||
| 232 | for commit in &missing_commits { | ||
| 233 | let output = Command::new("git") | ||
| 234 | .args([ | ||
| 235 | "fetch", | ||
| 236 | source_repo.to_str().ok_or("Invalid source path")?, | ||
| 237 | commit, | ||
| 238 | ]) | ||
| 239 | .current_dir(target_repo) | ||
| 240 | .output() | ||
| 241 | .map_err(|e| format!("Failed to execute git fetch: {}", e))?; | ||
| 242 | |||
| 243 | if !output.status.success() { | ||
| 244 | let stderr = String::from_utf8_lossy(&output.stderr); | ||
| 245 | return Err(format!( | ||
| 246 | "git fetch failed for commit {}: {}", | ||
| 247 | commit, stderr | ||
| 248 | )); | ||
| 249 | } | ||
| 250 | |||
| 251 | tracing::debug!("Copied commit {} to {}", commit, target_repo.display()); | ||
| 252 | } | ||
| 253 | 141 | ||
| 254 | Ok(()) | ||
| 255 | } | ||
| 256 | } | 142 | } |
| 257 | 143 | ||
| 258 | fn find_repo_with_git_data( | 144 | fn find_repo_with_git_data( |