From 39cfcd950eaf31eb721c25b0e60c751d0f279bb6 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 13:48:56 +0000 Subject: purgatory: more robust process_purgatory_state_events syncing --- src/git/sync.rs | 425 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 362 insertions(+), 63 deletions(-) (limited to 'src/git') diff --git a/src/git/sync.rs b/src/git/sync.rs index cf6e93d..949d8e1 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -37,8 +37,7 @@ use tracing::{debug, info, warn}; use nostr_sdk::Event; use crate::git::authorization::{ - collect_authorized_maintainers, fetch_repository_data, pubkey_authorised_for_repo_owners, - RepositoryData, + collect_authorized_maintainers, fetch_repository_data, RepositoryData, }; use crate::git::{self, oid_exists}; use crate::nostr::builder::SharedDatabase; @@ -877,25 +876,49 @@ async fn process_purgatory_state_events( let mut result = ProcessResult::default(); // Find state events in purgatory for this identifier - let purgatory_states = purgatory.find_state(identifier); + let mut purgatory_states = purgatory.find_state(identifier); if purgatory_states.is_empty() { return result; } + // Sort by created_at (oldest first) so we process events in chronological order. + // This ensures that when multiple state events are in purgatory, older ones + // get processed first, allowing newer ones to correctly supersede them. 
+ purgatory_states.sort_by_key(|entry| entry.event.created_at); + debug!( identifier = %identifier, purgatory_states_count = purgatory_states.len(), - "Checking purgatory state events for available git data" + "Checking purgatory state events for available git data (processing oldest first)" ); - // Check which state events can be applied (have all required OIDs) + // Fetch repository data once for all state events + let mut db_repo_data = match fetch_repository_data(database, identifier).await { + Ok(data) => data, + Err(e) => { + warn!( + identifier = %identifier, + error = %e, + "Failed to fetch repository data for purgatory state events" + ); + result + .errors + .push(format!("Failed to fetch repo data: {}", e)); + return result; + } + }; + + // Collect authorized maintainers per owner (computed once) + let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); + + // Process each state event in chronological order for entry in &purgatory_states { - // Check if we have all the git data needed to apply this state event + // Step 0: Check if we have all the git data needed to apply this state event if !can_apply_state(&entry.event, source_repo_path) { debug!( identifier = %identifier, event_id = %entry.event.id, - "State event cannot be applied - missing git OIDs" + "State event cannot be applied - missing git OIDs in source repo" ); continue; } @@ -910,84 +933,196 @@ async fn process_purgatory_state_events( error = %e, "Failed to parse state event from purgatory" ); - result.errors.push(format!("Failed to parse state event: {}", e)); + result + .errors + .push(format!("Failed to parse state event: {}", e)); continue; } }; - // Fetch repository data for authorization check - let db_repo_data = match fetch_repository_data(database, identifier).await { - Ok(data) => data, - Err(e) => { - warn!( - identifier = %identifier, - event_id = %entry.event.id, - error = %e, - "Failed to fetch repository data for state event" - ); - 
result.errors.push(format!("Failed to fetch repo data: {}", e)); - continue; - } - }; + let state_author = state.event.pubkey.to_hex(); + + // Step 1: Identify owner repos that the state event author is maintainer for + let authorized_owners: Vec<&String> = by_owner + .iter() + .filter(|(_, maintainers)| maintainers.contains(&state_author)) + .map(|(owner, _)| owner) + .collect(); - // Check authorization at release time - let repo_owners_authorising_pubkey = - pubkey_authorised_for_repo_owners(&entry.event.pubkey, &db_repo_data); - if repo_owners_authorising_pubkey.is_empty() { + if authorized_owners.is_empty() { debug!( identifier = %identifier, event_id = %entry.event.id, - pubkey = %entry.event.pubkey, - "State event author no longer authorized - skipping" + pubkey = %state_author, + "State event author not authorized for any owner - skipping" ); continue; } - // Sync to owner repos and align refs - let sync_result = sync_to_owner_repos(source_repo_path, &state, &db_repo_data, git_data_path); - result.repos_synced += sync_result.repos_synced; - result.refs_created += sync_result.refs_created; - result.refs_updated += sync_result.refs_updated; - result.refs_deleted += sync_result.refs_deleted; + // Track if we applied to at least one owner repo + let mut applied_to_any = false; - // Save event to database - match database.save_event(&entry.event).await { - Ok(_) => { - info!( + // Process each owner repo that authorizes this state event author + for owner in &authorized_owners { + let maintainers = by_owner.get(*owner).unwrap(); + + // Step 2: Check if this state event is the latest authorized for this owner + // Only consider database states, not other purgatory states + let is_latest = is_latest_authorized_state( + &state, + maintainers, + &db_repo_data.states, + ); + + if !is_latest { + debug!( identifier = %identifier, event_id = %entry.event.id, - "Saved purgatory state event to database" + owner = %owner, + "Skipping owner - a newer authorized state 
exists" ); + continue; + } - // Notify WebSocket subscribers - if let Some(relay) = local_relay { - if relay.notify_event(entry.event.clone()) { - debug!( - identifier = %identifier, - event_id = %entry.event.id, - "Broadcast state event to WebSocket listeners" - ); - } - } + // Find the announcement for this owner + let announcement = db_repo_data + .announcements + .iter() + .find(|a| a.event.pubkey.to_hex() == **owner); - // Remove from purgatory - purgatory.remove_state_event(identifier, &entry.event.id); - result.states_released += 1; + let Some(announcement) = announcement else { + continue; + }; - info!( + let target_repo_path = git_data_path.join(announcement.repo_path()); + + // Step 3: Check git repo exists for that owner + if !target_repo_path.exists() { + debug!( identifier = %identifier, - event_id = %entry.event.id, - "Released state event from purgatory" + owner = %owner, + repo_path = %target_repo_path.display(), + "Skipping owner - repository doesn't exist" ); + continue; } - Err(e) => { - warn!( - identifier = %identifier, - event_id = %entry.event.id, - error = %e, - "Failed to save state event to database" - ); - result.errors.push(format!("Failed to save state event: {}", e)); + + // Step 4: Copy all required OIDs to that repo (unless it's source_repo_path) + if target_repo_path != source_repo_path { + if let Err(e) = + copy_missing_oids_between_repos(source_repo_path, &target_repo_path, &state) + { + warn!( + identifier = %identifier, + source = %source_repo_path.display(), + target = %target_repo_path.display(), + error = %e, + "Failed to copy OIDs between repos" + ); + result + .errors + .push((target_repo_path.display().to_string(), e).1); + // Continue anyway - we'll try to align what we can + } + } + + // Step 5: Reset the git state in that repo to match the state event + // (excluding refs/nostr/*) + let align_result = align_repository_with_state(&target_repo_path, &state); + result.repos_synced += 1; + result.refs_created += 
align_result.refs_created; + result.refs_updated += align_result.refs_updated; + result.refs_deleted += align_result.refs_deleted; + + info!( + identifier = %identifier, + owner = %owner, + event_id = %entry.event.id, + repo_path = %target_repo_path.display(), + refs_created = align_result.refs_created, + refs_updated = align_result.refs_updated, + refs_deleted = align_result.refs_deleted, + head_set = align_result.head_set, + "Aligned repository with state from purgatory" + ); + + applied_to_any = true; + } + + // We have the git data now, so we should release from purgatory regardless of + // whether we applied to any repo. The question is: should we save to DB or just remove? + // + // - If there's a newer state event from the same author already in the DB, just remove + // (no point saving an older event that will never be used) + // - Otherwise, save it to the DB (even if we didn't apply to any repo, because in the + // future the currently-authorized state event might be deleted and this one should apply) + + // Check if there's a newer state from the same author in the database + let has_newer_from_same_author = db_repo_data.states.iter().any(|s| { + s.event.pubkey == state.event.pubkey + && (s.event.created_at > state.event.created_at + || (s.event.created_at == state.event.created_at + && s.event.id > state.event.id)) + }); + + if has_newer_from_same_author { + // Just remove from purgatory without saving - a newer event from same author exists + purgatory.remove_state_event(identifier, &entry.event.id); + result.states_released += 1; + + debug!( + identifier = %identifier, + event_id = %entry.event.id, + author = %state_author, + "Removed older state event from purgatory - newer event from same author exists in DB" + ); + } else { + // Save to database (even if we didn't apply to any repo) + match database.save_event(&entry.event).await { + Ok(_) => { + info!( + identifier = %identifier, + event_id = %entry.event.id, + applied_to_repos = applied_to_any, + 
"Saved purgatory state event to database" + ); + + // Notify WebSocket subscribers + if let Some(relay) = local_relay { + if relay.notify_event(entry.event.clone()) { + debug!( + identifier = %identifier, + event_id = %entry.event.id, + "Broadcast state event to WebSocket listeners" + ); + } + } + + // Remove from purgatory + purgatory.remove_state_event(identifier, &entry.event.id); + result.states_released += 1; + + // Add the newly saved state to db_repo_data so subsequent iterations + // can correctly determine if they're the latest + db_repo_data.states.push(state.clone()); + + info!( + identifier = %identifier, + event_id = %entry.event.id, + "Released state event from purgatory" + ); + } + Err(e) => { + warn!( + identifier = %identifier, + event_id = %entry.event.id, + error = %e, + "Failed to save state event to database" + ); + result + .errors + .push(format!("Failed to save state event: {}", e)); + } } } } @@ -995,6 +1130,45 @@ async fn process_purgatory_state_events( result } +/// Check if a state event is the latest authorized state for a given maintainer set. +/// +/// Only considers states already in the database, not other purgatory states. 
+/// +/// # Arguments +/// * `state` - The state event to check +/// * `maintainers` - The set of authorized maintainers for the owner +/// * `db_states` - State events from the database +/// +/// # Returns +/// true if this state is the latest (or equal latest) among all authorized states in the DB +fn is_latest_authorized_state( + state: &RepositoryState, + maintainers: &[String], + db_states: &[RepositoryState], +) -> bool { + // Find the latest authorized state from database + let latest_db_state = db_states + .iter() + .filter(|s| maintainers.contains(&s.event.pubkey.to_hex())) + .max_by(|a, b| { + // Compare by created_at, then by event id for tie-breaking + a.event + .created_at + .cmp(&b.event.created_at) + .then_with(|| a.event.id.cmp(&b.event.id)) + }); + + match latest_db_state { + None => true, // No authorized states exist in DB (unauthorized ones are filtered out), so this is the latest + Some(latest) => { + // This state is latest if it's newer, or if equal timestamp with an equal-or-larger event id + state.event.created_at > latest.event.created_at + || (state.event.created_at == latest.event.created_at + && state.event.id >= latest.event.id) + } + } +} + /// Process PR events from purgatory that can now be satisfied. 
async fn process_purgatory_pr_events( identifier: &str, @@ -1146,6 +1320,7 @@ fn extract_owner_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Optio #[cfg(test)] mod tests { use super::*; + use nostr_sdk::Keys; #[test] fn test_process_result_default() { @@ -1427,4 +1602,128 @@ mod tests { assert_eq!(owners.len(), 1); assert!(owners.contains("valid_owner")); } + + // Helper function to create a test state event with specific timestamp + // The `nonce` parameter ensures different events have different IDs even with same timestamp + fn create_test_state_event_with_nonce( + keys: &Keys, + identifier: &str, + created_at: u64, + nonce: &str, + ) -> RepositoryState { + use nostr_sdk::{EventBuilder, Kind, Tag, TagKind, Timestamp}; + + let tags = vec![ + Tag::custom(TagKind::d(), vec![identifier.to_string()]), + Tag::custom( + TagKind::custom("refs/heads/main"), + vec![format!("abc123{}", nonce)], + ), + ]; + + let event = EventBuilder::new(Kind::from(30618), nonce) + .tags(tags) + .custom_created_at(Timestamp::from(created_at)) + .sign_with_keys(keys) + .unwrap(); + + RepositoryState::from_event(event).unwrap() + } + + // Helper function to create a test state event with specific timestamp + fn create_test_state_event( + keys: &Keys, + identifier: &str, + created_at: u64, + ) -> RepositoryState { + create_test_state_event_with_nonce(keys, identifier, created_at, "") + } + + #[test] + fn test_is_latest_authorized_state_no_other_states() { + let keys = Keys::generate(); + let state = create_test_state_event(&keys, "test-repo", 1000); + let maintainers = vec![keys.public_key().to_hex()]; + + // No other states - should be latest + let result = is_latest_authorized_state(&state, &maintainers, &[]); + assert!(result); + } + + #[test] + fn test_is_latest_authorized_state_newer_than_db() { + let keys = Keys::generate(); + let old_state = create_test_state_event(&keys, "test-repo", 1000); + let new_state = create_test_state_event(&keys, "test-repo", 2000); + let 
maintainers = vec![keys.public_key().to_hex()]; + + // new_state is newer than old_state in db + let result = is_latest_authorized_state(&new_state, &maintainers, &[old_state]); + assert!(result); + } + + #[test] + fn test_is_latest_authorized_state_older_than_db() { + let keys = Keys::generate(); + let old_state = create_test_state_event(&keys, "test-repo", 1000); + let new_state = create_test_state_event(&keys, "test-repo", 2000); + let maintainers = vec![keys.public_key().to_hex()]; + + // old_state is older than new_state in db + let result = is_latest_authorized_state(&old_state, &maintainers, &[new_state]); + assert!(!result); + } + + #[test] + fn test_is_latest_authorized_state_ignores_unauthorized_states() { + let keys1 = Keys::generate(); + let keys2 = Keys::generate(); + + let state1 = create_test_state_event(&keys1, "test-repo", 1000); + let state2 = create_test_state_event(&keys2, "test-repo", 2000); + + // Only keys1 is authorized + let maintainers = vec![keys1.public_key().to_hex()]; + + // state1 should be latest because state2 is not authorized + let result = is_latest_authorized_state(&state1, &maintainers, &[state2]); + assert!(result); + } + + #[test] + fn test_is_latest_authorized_state_same_timestamp_uses_event_id() { + let keys = Keys::generate(); + + // Create two states with same timestamp but different content (different event IDs) + let state1 = create_test_state_event_with_nonce(&keys, "test-repo", 1000, "nonce1"); + let state2 = create_test_state_event_with_nonce(&keys, "test-repo", 1000, "nonce2"); + + let maintainers = vec![keys.public_key().to_hex()]; + + // The one with larger event ID should be considered latest + let (latest, older) = if state1.event.id > state2.event.id { + (state1, state2) + } else { + (state2, state1) + }; + + // latest should be considered latest + let result = is_latest_authorized_state(&latest, &maintainers, &[older.clone()]); + assert!(result); + + // older should not be considered latest + let result = 
is_latest_authorized_state(&older, &maintainers, &[latest]); + assert!(!result); + } + + #[test] + fn test_is_latest_authorized_state_same_event_is_latest() { + let keys = Keys::generate(); + let state = create_test_state_event(&keys, "test-repo", 1000); + let maintainers = vec![keys.public_key().to_hex()]; + + // When the state being checked is also in the db_states, it should be considered latest + let result = is_latest_authorized_state(&state, &maintainers, &[state.clone()]); + assert!(result); + } } -- cgit v1.2.3