diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-05 14:31:47 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-05 14:31:47 +0000 |
| commit | f8235b7977c673524c12a229eddb7ace6b0c2c0d (patch) | |
| tree | 1725861f36085adc7b7c3f033e67cde910fe4c75 | |
| parent | 8cb45487d7125a14d15c15dfc14b0804eb22ffd6 (diff) | |
purgatory: git data sync applies state and saves event
| -rw-r--r-- | src/main.rs | 6 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 8 | ||||
| -rw-r--r-- | src/nostr/policy/mod.rs | 19 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 1 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 539 |
5 files changed, 516 insertions, 57 deletions
diff --git a/src/main.rs b/src/main.rs index fbe3e34..59edc09 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -62,6 +62,12 @@ async fn main() -> Result<()> { | |||
| 62 | config.domain | 62 | config.domain |
| 63 | ); | 63 | ); |
| 64 | 64 | ||
| 65 | // Set the local relay on the write policy for purgatory notifications | ||
| 66 | // This must be done after relay creation since the relay depends on the policy | ||
| 67 | relay_with_db | ||
| 68 | .write_policy | ||
| 69 | .set_local_relay(relay_with_db.relay.clone()); | ||
| 70 | |||
| 65 | // Start SyncManager for proactive sync (Phase 2: multi-relay support, Phase 3: health tracking) | 71 | // Start SyncManager for proactive sync (Phase 2: multi-relay support, Phase 3: health tracking) |
| 66 | // Even without bootstrap relay, SyncManager discovers relays from stored announcements | 72 | // Even without bootstrap relay, SyncManager discovers relays from stored announcements |
| 67 | // Pass the already-registered sync metrics from Metrics to avoid duplicate registration | 73 | // Pass the already-registered sync metrics from Metrics to avoid duplicate registration |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index da81e64..db2f59b 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -74,6 +74,14 @@ impl Nip34WritePolicy { | |||
| 74 | &self.ctx.purgatory | 74 | &self.ctx.purgatory |
| 75 | } | 75 | } |
| 76 | 76 | ||
| 77 | /// Set the local relay for purgatory notifications. | ||
| 78 | /// | ||
| 79 | /// This must be called after the relay is created since the relay depends | ||
| 80 | /// on this policy, but purgatory sync needs the relay to notify subscribers. | ||
| 81 | pub fn set_local_relay(&self, relay: nostr_relay_builder::LocalRelay) { | ||
| 82 | self.ctx.set_local_relay(relay); | ||
| 83 | } | ||
| 84 | |||
| 77 | /// Handle repository announcement event | 85 | /// Handle repository announcement event |
| 78 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { | 86 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { |
| 79 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | 87 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); |
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 2a446fe..c3c5829 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs | |||
| @@ -17,6 +17,7 @@ pub use state::{AlignmentResult, StatePolicy, StateResult}; | |||
| 17 | 17 | ||
| 18 | use super::SharedDatabase; | 18 | use super::SharedDatabase; |
| 19 | use crate::purgatory::Purgatory; | 19 | use crate::purgatory::Purgatory; |
| 20 | use nostr_relay_builder::LocalRelay; | ||
| 20 | use std::sync::Arc; | 21 | use std::sync::Arc; |
| 21 | 22 | ||
| 22 | /// Shared context for all sub-policies | 23 | /// Shared context for all sub-policies |
| @@ -26,6 +27,8 @@ pub struct PolicyContext { | |||
| 26 | pub database: SharedDatabase, | 27 | pub database: SharedDatabase, |
| 27 | pub git_data_path: std::path::PathBuf, | 28 | pub git_data_path: std::path::PathBuf, |
| 28 | pub purgatory: Arc<Purgatory>, | 29 | pub purgatory: Arc<Purgatory>, |
| 30 | /// Local relay for notifying WebSocket subscribers (set after relay creation) | ||
| 31 | pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, | ||
| 29 | } | 32 | } |
| 30 | 33 | ||
| 31 | impl PolicyContext { | 34 | impl PolicyContext { |
| @@ -40,6 +43,22 @@ impl PolicyContext { | |||
| 40 | database, | 43 | database, |
| 41 | git_data_path: git_data_path.into(), | 44 | git_data_path: git_data_path.into(), |
| 42 | purgatory, | 45 | purgatory, |
| 46 | local_relay: Arc::new(std::sync::RwLock::new(None)), | ||
| 43 | } | 47 | } |
| 44 | } | 48 | } |
| 49 | |||
| 50 | /// Set the local relay after it's been created. | ||
| 51 | /// | ||
| 52 | /// This is called after the relay is built since the relay depends on the policy | ||
| 53 | /// but the policy needs the relay for purgatory notifications. | ||
| 54 | pub fn set_local_relay(&self, relay: LocalRelay) { | ||
| 55 | let mut guard = self.local_relay.write().unwrap(); | ||
| 56 | *guard = Some(relay); | ||
| 57 | } | ||
| 58 | |||
| 59 | /// Get a clone of the local relay if it's been set. | ||
| 60 | pub fn get_local_relay(&self) -> Option<LocalRelay> { | ||
| 61 | let guard = self.local_relay.read().unwrap(); | ||
| 62 | guard.clone() | ||
| 63 | } | ||
| 45 | } | 64 | } |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 9ca3ee6..7521ef1 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -179,6 +179,7 @@ impl StatePolicy { | |||
| 179 | state.clone(), | 179 | state.clone(), |
| 180 | self.ctx.database.clone(), | 180 | self.ctx.database.clone(), |
| 181 | Some(self.ctx.domain.clone()), | 181 | Some(self.ctx.domain.clone()), |
| 182 | self.ctx.get_local_relay(), | ||
| 182 | ); | 183 | ); |
| 183 | 184 | ||
| 184 | tracing::info!( | 185 | tracing::info!( |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 8634aff..f15d6bd 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -27,7 +27,8 @@ use std::sync::Arc; | |||
| 27 | use std::time::{Duration, Instant}; | 27 | use std::time::{Duration, Instant}; |
| 28 | 28 | ||
| 29 | use crate::git::authorization::{ | 29 | use crate::git::authorization::{ |
| 30 | fetch_repository_data, pubkey_authorised_for_repo_owners, RepositoryData, | 30 | collect_authorized_maintainers, fetch_repository_data, pubkey_authorised_for_repo_owners, |
| 31 | RepositoryData, | ||
| 31 | }; | 32 | }; |
| 32 | use crate::git::oid_exists; | 33 | use crate::git::oid_exists; |
| 33 | use crate::nostr::builder::SharedDatabase; | 34 | use crate::nostr::builder::SharedDatabase; |
| @@ -92,19 +93,29 @@ impl Purgatory { | |||
| 92 | /// from remote servers listed in the repository announcements. It's called | 93 | /// from remote servers listed in the repository announcements. It's called |
| 93 | /// when a state event arrives but the required git data isn't available locally. | 94 | /// when a state event arrives but the required git data isn't available locally. |
| 94 | /// | 95 | /// |
| 96 | /// After successfully syncing OIDs: | ||
| 97 | /// 1. Syncs OIDs to other owner repositories that authorize this state | ||
| 98 | /// 2. Aligns refs with state for each repository | ||
| 99 | /// 3. Saves the state event to database | ||
| 100 | /// 4. Notifies WebSocket subscribers | ||
| 101 | /// 5. Removes the event from purgatory | ||
| 102 | /// | ||
| 95 | /// # Arguments | 103 | /// # Arguments |
| 96 | /// * `state` - The parsed repository state event | 104 | /// * `state` - The parsed repository state event |
| 97 | /// * `database` - Database to query for repository announcements | 105 | /// * `database` - Database to query for repository announcements and save events |
| 98 | /// * `our_domain` - Our service domain to exclude from fetch targets | 106 | /// * `our_domain` - Our service domain to exclude from fetch targets |
| 107 | /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) | ||
| 99 | pub fn start_state_sync( | 108 | pub fn start_state_sync( |
| 100 | &self, | 109 | &self, |
| 101 | state: RepositoryState, | 110 | state: RepositoryState, |
| 102 | database: SharedDatabase, | 111 | database: SharedDatabase, |
| 103 | our_domain: Option<String>, | 112 | our_domain: Option<String>, |
| 113 | local_relay: Option<nostr_relay_builder::LocalRelay>, | ||
| 104 | ) { | 114 | ) { |
| 105 | let git_data_path = self.git_data_path.clone(); | 115 | let git_data_path = self.git_data_path.clone(); |
| 106 | let identifier = state.identifier.clone(); | 116 | let identifier = state.identifier.clone(); |
| 107 | let event_id = state.event.id; | 117 | let event_id = state.event.id; |
| 118 | let purgatory = self.clone(); | ||
| 108 | 119 | ||
| 109 | tokio::spawn(async move { | 120 | tokio::spawn(async move { |
| 110 | tracing::debug!( | 121 | tracing::debug!( |
| @@ -113,15 +124,31 @@ impl Purgatory { | |||
| 113 | "Starting background git data sync for purgatory state event" | 124 | "Starting background git data sync for purgatory state event" |
| 114 | ); | 125 | ); |
| 115 | 126 | ||
| 116 | if let Err(e) = | 127 | match sync_state_git_data( |
| 117 | sync_state_git_data(state, &database, &git_data_path, our_domain.as_deref()).await | 128 | state, |
| 129 | &database, | ||
| 130 | &git_data_path, | ||
| 131 | our_domain.as_deref(), | ||
| 132 | local_relay.as_ref(), | ||
| 133 | &purgatory, | ||
| 134 | ) | ||
| 135 | .await | ||
| 118 | { | 136 | { |
| 119 | tracing::warn!( | 137 | Ok(()) => { |
| 120 | identifier = %identifier, | 138 | tracing::info!( |
| 121 | event_id = %event_id, | 139 | identifier = %identifier, |
| 122 | error = %e, | 140 | event_id = %event_id, |
| 123 | "Failed to sync git data for purgatory state event" | 141 | "Successfully synced git data and released state event from purgatory" |
| 124 | ); | 142 | ); |
| 143 | } | ||
| 144 | Err(e) => { | ||
| 145 | tracing::warn!( | ||
| 146 | identifier = %identifier, | ||
| 147 | event_id = %event_id, | ||
| 148 | error = %e, | ||
| 149 | "Failed to sync git data for purgatory state event" | ||
| 150 | ); | ||
| 151 | } | ||
| 125 | } | 152 | } |
| 126 | }); | 153 | }); |
| 127 | } | 154 | } |
| @@ -427,11 +454,18 @@ impl Purgatory { | |||
| 427 | /// 3. Collects clone URLs from authorized announcements | 454 | /// 3. Collects clone URLs from authorized announcements |
| 428 | /// 4. Finds the most complete local repo to fetch into | 455 | /// 4. Finds the most complete local repo to fetch into |
| 429 | /// 5. Identifies missing OIDs and fetches them from remote servers | 456 | /// 5. Identifies missing OIDs and fetches them from remote servers |
| 457 | /// 6. After OIDs are synced, copies them to other owner repositories | ||
| 458 | /// 7. Aligns refs with state for each authorized repository | ||
| 459 | /// 8. Saves the state event to database | ||
| 460 | /// 9. Notifies WebSocket subscribers | ||
| 461 | /// 10. Removes the event from purgatory | ||
| 430 | async fn sync_state_git_data( | 462 | async fn sync_state_git_data( |
| 431 | state: RepositoryState, | 463 | state: RepositoryState, |
| 432 | database: &SharedDatabase, | 464 | database: &SharedDatabase, |
| 433 | git_data_path: &Path, | 465 | git_data_path: &Path, |
| 434 | our_domain: Option<&str>, | 466 | our_domain: Option<&str>, |
| 467 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 468 | purgatory: &Purgatory, | ||
| 435 | ) -> Result<()> { | 469 | ) -> Result<()> { |
| 436 | // Fetch repository data from database | 470 | // Fetch repository data from database |
| 437 | let db_repo_data = fetch_repository_data(database, &state.identifier).await?; | 471 | let db_repo_data = fetch_repository_data(database, &state.identifier).await?; |
| @@ -486,81 +520,469 @@ async fn sync_state_git_data( | |||
| 486 | ); | 520 | ); |
| 487 | 521 | ||
| 488 | // Find the most complete local repo to fetch into | 522 | // Find the most complete local repo to fetch into |
| 489 | let (repo_path, missing_oids) = | 523 | let (source_repo_path, missing_oids) = |
| 490 | get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?; | 524 | get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?; |
| 491 | 525 | ||
| 492 | if missing_oids.is_empty() { | 526 | // Fetch missing OIDs from remote servers |
| 527 | if !missing_oids.is_empty() { | ||
| 528 | tracing::info!( | ||
| 529 | identifier = %state.identifier, | ||
| 530 | repo_path = %source_repo_path.display(), | ||
| 531 | missing_oids = ?missing_oids, | ||
| 532 | "Attempting to fetch {} missing OIDs from remote servers", | ||
| 533 | missing_oids.len() | ||
| 534 | ); | ||
| 535 | |||
| 536 | // Try to fetch from each server until we get all missing OIDs | ||
| 537 | let mut last_error: Option<String> = None; | ||
| 538 | for server_url in &servers { | ||
| 539 | match fetch_missing_oids_from_server(&source_repo_path, server_url, &missing_oids).await | ||
| 540 | { | ||
| 541 | Ok(fetched) => { | ||
| 542 | if fetched > 0 { | ||
| 543 | tracing::info!( | ||
| 544 | identifier = %state.identifier, | ||
| 545 | server = %server_url, | ||
| 546 | fetched = %fetched, | ||
| 547 | "Successfully fetched git data" | ||
| 548 | ); | ||
| 549 | } | ||
| 550 | |||
| 551 | // Check if all OIDs are now available | ||
| 552 | let still_missing: Vec<_> = missing_oids | ||
| 553 | .iter() | ||
| 554 | .filter(|oid| !oid_exists(&source_repo_path, oid)) | ||
| 555 | .collect(); | ||
| 556 | |||
| 557 | if still_missing.is_empty() { | ||
| 558 | tracing::info!( | ||
| 559 | identifier = %state.identifier, | ||
| 560 | "All missing OIDs fetched successfully" | ||
| 561 | ); | ||
| 562 | break; | ||
| 563 | } | ||
| 564 | } | ||
| 565 | Err(e) => { | ||
| 566 | tracing::debug!( | ||
| 567 | identifier = %state.identifier, | ||
| 568 | server = %server_url, | ||
| 569 | error = %e, | ||
| 570 | "Failed to fetch from server" | ||
| 571 | ); | ||
| 572 | last_error = Some(e.to_string()); | ||
| 573 | } | ||
| 574 | } | ||
| 575 | } | ||
| 576 | |||
| 577 | // Check final state - if still missing OIDs, fail | ||
| 578 | let still_missing: Vec<_> = missing_oids | ||
| 579 | .iter() | ||
| 580 | .filter(|oid| !oid_exists(&source_repo_path, oid)) | ||
| 581 | .collect(); | ||
| 582 | |||
| 583 | if !still_missing.is_empty() { | ||
| 584 | bail!( | ||
| 585 | "Failed to fetch {} OIDs from any server. Last error: {:?}", | ||
| 586 | still_missing.len(), | ||
| 587 | last_error | ||
| 588 | ); | ||
| 589 | } | ||
| 590 | } else { | ||
| 493 | tracing::debug!( | 591 | tracing::debug!( |
| 494 | identifier = %state.identifier, | 592 | identifier = %state.identifier, |
| 495 | repo_path = %repo_path.display(), | 593 | repo_path = %source_repo_path.display(), |
| 496 | "No missing OIDs - git data is already complete" | 594 | "No missing OIDs - git data is already complete" |
| 497 | ); | 595 | ); |
| 498 | return Ok(()); | 596 | } |
| 597 | |||
| 598 | // Now that we have all OIDs, sync to other owner repositories and align refs | ||
| 599 | let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); | ||
| 600 | let mut repo_count = 0; | ||
| 601 | |||
| 602 | for (owner, maintainers) in &by_owner { | ||
| 603 | // Check if this state's author is authorized for this owner | ||
| 604 | if !maintainers.contains(&state.event.pubkey.to_hex()) { | ||
| 605 | continue; | ||
| 606 | } | ||
| 607 | |||
| 608 | // Find the previous latest state for this owner's maintainer set | ||
| 609 | let previous_state = db_repo_data | ||
| 610 | .states | ||
| 611 | .iter() | ||
| 612 | .filter(|s| maintainers.contains(&s.event.pubkey.to_hex())) | ||
| 613 | .max_by_key(|s| s.event.created_at); | ||
| 614 | |||
| 615 | // Only update if this state is newer than any existing state | ||
| 616 | // TODO: in event of a tie, the event with the biggest event id wins | ||
| 617 | if let Some(prev) = previous_state { | ||
| 618 | if state.event.created_at <= prev.event.created_at { | ||
| 619 | tracing::debug!( | ||
| 620 | identifier = %state.identifier, | ||
| 621 | owner = %owner, | ||
| 622 | "Skipping owner - existing state is newer or equal" | ||
| 623 | ); | ||
| 624 | continue; | ||
| 625 | } | ||
| 626 | } | ||
| 627 | |||
| 628 | // Find the announcement for this owner | ||
| 629 | let announcement = db_repo_data | ||
| 630 | .announcements | ||
| 631 | .iter() | ||
| 632 | .find(|a| a.event.pubkey.to_hex() == *owner); | ||
| 633 | |||
| 634 | let Some(announcement) = announcement else { | ||
| 635 | continue; | ||
| 636 | }; | ||
| 637 | |||
| 638 | let target_repo_path = git_data_path.join(announcement.repo_path()); | ||
| 639 | |||
| 640 | if !target_repo_path.exists() { | ||
| 641 | // Repository doesn't exist (e.g., announcement doesn't list this service) | ||
| 642 | tracing::debug!( | ||
| 643 | identifier = %state.identifier, | ||
| 644 | owner = %owner, | ||
| 645 | repo_path = %target_repo_path.display(), | ||
| 646 | "Skipping owner - repository doesn't exist" | ||
| 647 | ); | ||
| 648 | continue; | ||
| 649 | } | ||
| 650 | |||
| 651 | // Copy missing OIDs from source repo to target repo if different | ||
| 652 | if target_repo_path != source_repo_path { | ||
| 653 | if let Err(e) = | ||
| 654 | copy_missing_oids_between_repos(&source_repo_path, &target_repo_path, &state) | ||
| 655 | { | ||
| 656 | tracing::warn!( | ||
| 657 | identifier = %state.identifier, | ||
| 658 | source = %source_repo_path.display(), | ||
| 659 | target = %target_repo_path.display(), | ||
| 660 | error = %e, | ||
| 661 | "Failed to copy OIDs between repos" | ||
| 662 | ); | ||
| 663 | // Continue anyway - we'll try to align what we can | ||
| 664 | } | ||
| 665 | } | ||
| 666 | |||
| 667 | // Align refs with state | ||
| 668 | let result = align_repository_with_state(&target_repo_path, &state); | ||
| 669 | repo_count += 1; | ||
| 670 | |||
| 671 | tracing::info!( | ||
| 672 | identifier = %state.identifier, | ||
| 673 | owner = %owner, | ||
| 674 | repo_path = %target_repo_path.display(), | ||
| 675 | refs_created = result.refs_created, | ||
| 676 | refs_updated = result.refs_updated, | ||
| 677 | refs_deleted = result.refs_deleted, | ||
| 678 | head_set = result.head_set, | ||
| 679 | "Aligned repository with state from purgatory sync" | ||
| 680 | ); | ||
| 499 | } | 681 | } |
| 500 | 682 | ||
| 501 | tracing::info!( | 683 | tracing::info!( |
| 502 | identifier = %state.identifier, | 684 | identifier = %state.identifier, |
| 503 | repo_path = %repo_path.display(), | 685 | event_id = %state.event.id, |
| 504 | missing_oids = ?missing_oids, | 686 | repo_count = repo_count, |
| 505 | "Attempting to fetch {} missing OIDs from remote servers", | 687 | "Synced git data and aligned {} repositories", |
| 506 | missing_oids.len() | 688 | repo_count |
| 507 | ); | 689 | ); |
| 508 | 690 | ||
| 509 | // Try to fetch from each server until we get all missing OIDs | 691 | // Save state event to database |
| 510 | let mut last_error: Option<String> = None; | 692 | match database.save_event(&state.event).await { |
| 511 | for server_url in &servers { | 693 | Ok(_) => { |
| 512 | match fetch_missing_oids_from_server(&repo_path, server_url, &missing_oids).await { | 694 | tracing::info!( |
| 513 | Ok(fetched) => { | 695 | identifier = %state.identifier, |
| 514 | if fetched > 0 { | 696 | event_id = %state.event.id, |
| 697 | "Saved purgatory state event to database after git sync" | ||
| 698 | ); | ||
| 699 | |||
| 700 | // Notify WebSocket subscribers | ||
| 701 | if let Some(relay) = local_relay { | ||
| 702 | if relay.notify_event(state.event.clone()) { | ||
| 515 | tracing::info!( | 703 | tracing::info!( |
| 516 | identifier = %state.identifier, | 704 | identifier = %state.identifier, |
| 517 | server = %server_url, | 705 | event_id = %state.event.id, |
| 518 | fetched = %fetched, | 706 | "Broadcast purgatory state event to websocket listeners" |
| 519 | "Successfully fetched git data" | 707 | ); |
| 708 | } else { | ||
| 709 | tracing::warn!( | ||
| 710 | identifier = %state.identifier, | ||
| 711 | event_id = %state.event.id, | ||
| 712 | "Failed to broadcast purgatory state event to websocket listeners" | ||
| 520 | ); | 713 | ); |
| 521 | } | 714 | } |
| 715 | } | ||
| 522 | 716 | ||
| 523 | // Check if all OIDs are now available | 717 | // Remove from purgatory |
| 524 | let still_missing: Vec<_> = missing_oids | 718 | purgatory.remove_state_event(&state.identifier, &state.event.id); |
| 525 | .iter() | 719 | tracing::info!( |
| 526 | .filter(|oid| !oid_exists(&repo_path, oid)) | 720 | identifier = %state.identifier, |
| 527 | .collect(); | 721 | event_id = %state.event.id, |
| 722 | "Removed state event from purgatory after successful sync" | ||
| 723 | ); | ||
| 724 | } | ||
| 725 | Err(e) => { | ||
| 726 | tracing::warn!( | ||
| 727 | identifier = %state.identifier, | ||
| 728 | event_id = %state.event.id, | ||
| 729 | error = %e, | ||
| 730 | "Failed to save purgatory state event to database" | ||
| 731 | ); | ||
| 732 | // Don't remove from purgatory if save failed - it will retry or expire | ||
| 733 | bail!("Failed to save state event to database: {}", e); | ||
| 734 | } | ||
| 735 | } | ||
| 736 | |||
| 737 | Ok(()) | ||
| 738 | } | ||
| 739 | |||
| 740 | /// Copy missing OIDs from a source repository to a target repository. | ||
| 741 | /// | ||
| 742 | /// Identifies commits referenced in the state that are missing from the target | ||
| 743 | /// repository and copies them from the source repository using git fetch. | ||
| 744 | fn copy_missing_oids_between_repos( | ||
| 745 | source_repo: &Path, | ||
| 746 | target_repo: &Path, | ||
| 747 | state: &RepositoryState, | ||
| 748 | ) -> Result<(), String> { | ||
| 749 | // Collect all commits referenced in the state | ||
| 750 | let mut commits_to_check = Vec::new(); | ||
| 528 | 751 | ||
| 529 | if still_missing.is_empty() { | 752 | for branch in &state.branches { |
| 753 | if !branch.commit.starts_with("ref: ") { | ||
| 754 | commits_to_check.push(&branch.commit); | ||
| 755 | } | ||
| 756 | } | ||
| 757 | |||
| 758 | for tag in &state.tags { | ||
| 759 | if !tag.commit.starts_with("ref: ") { | ||
| 760 | commits_to_check.push(&tag.commit); | ||
| 761 | } | ||
| 762 | } | ||
| 763 | |||
| 764 | // Identify missing commits | ||
| 765 | let mut missing_commits = Vec::new(); | ||
| 766 | for commit in commits_to_check { | ||
| 767 | if !oid_exists(target_repo, commit) { | ||
| 768 | missing_commits.push(commit); | ||
| 769 | } | ||
| 770 | } | ||
| 771 | |||
| 772 | if missing_commits.is_empty() { | ||
| 773 | tracing::debug!( | ||
| 774 | "No missing commits to copy from {} to {}", | ||
| 775 | source_repo.display(), | ||
| 776 | target_repo.display() | ||
| 777 | ); | ||
| 778 | return Ok(()); | ||
| 779 | } | ||
| 780 | |||
| 781 | tracing::info!( | ||
| 782 | "Copying {} missing commits from {} to {}", | ||
| 783 | missing_commits.len(), | ||
| 784 | source_repo.display(), | ||
| 785 | target_repo.display() | ||
| 786 | ); | ||
| 787 | |||
| 788 | // Fetch each missing commit from source to target | ||
| 789 | for commit in &missing_commits { | ||
| 790 | let output = Command::new("git") | ||
| 791 | .args([ | ||
| 792 | "fetch", | ||
| 793 | source_repo.to_str().ok_or("Invalid source path")?, | ||
| 794 | commit, | ||
| 795 | ]) | ||
| 796 | .current_dir(target_repo) | ||
| 797 | .output() | ||
| 798 | .map_err(|e| format!("Failed to execute git fetch: {}", e))?; | ||
| 799 | |||
| 800 | if !output.status.success() { | ||
| 801 | let stderr = String::from_utf8_lossy(&output.stderr); | ||
| 802 | return Err(format!( | ||
| 803 | "git fetch failed for commit {}: {}", | ||
| 804 | commit, stderr | ||
| 805 | )); | ||
| 806 | } | ||
| 807 | |||
| 808 | tracing::debug!("Copied commit {} to {}", commit, target_repo.display()); | ||
| 809 | } | ||
| 810 | |||
| 811 | Ok(()) | ||
| 812 | } | ||
| 813 | |||
| 814 | /// Result of aligning a repository with authorized state | ||
| 815 | #[derive(Debug, Default)] | ||
| 816 | struct SyncAlignmentResult { | ||
| 817 | /// Number of refs created | ||
| 818 | refs_created: usize, | ||
| 819 | /// Number of refs updated | ||
| 820 | refs_updated: usize, | ||
| 821 | /// Number of refs deleted | ||
| 822 | refs_deleted: usize, | ||
| 823 | /// Whether HEAD was set | ||
| 824 | head_set: bool, | ||
| 825 | } | ||
| 826 | |||
| 827 | /// Align a repository's refs with the authorized state. | ||
| 828 | /// | ||
| 829 | /// This function: | ||
| 830 | /// 1. Deletes refs that are in the repo but not in the state (for refs/heads/ and refs/tags/) | ||
| 831 | /// 2. Updates refs that exist in state if we have the commit | ||
| 832 | /// 3. Sets HEAD if the HEAD branch's commit is available | ||
| 833 | fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) -> SyncAlignmentResult { | ||
| 834 | use crate::git; | ||
| 835 | |||
| 836 | let mut result = SyncAlignmentResult::default(); | ||
| 837 | |||
| 838 | // Check if repository exists | ||
| 839 | if !repo_path.exists() { | ||
| 840 | tracing::debug!( | ||
| 841 | "Repository not found at {}, cannot align with state", | ||
| 842 | repo_path.display() | ||
| 843 | ); | ||
| 844 | return result; | ||
| 845 | } | ||
| 846 | |||
| 847 | // Get current refs from the repository | ||
| 848 | let current_refs = match git::list_refs(repo_path) { | ||
| 849 | Ok(refs) => refs, | ||
| 850 | Err(e) => { | ||
| 851 | tracing::warn!("Failed to list refs in {}: {}", repo_path.display(), e); | ||
| 852 | return result; | ||
| 853 | } | ||
| 854 | }; | ||
| 855 | |||
| 856 | // Build expected refs from state | ||
| 857 | let mut expected_refs: std::collections::HashMap<String, String> = | ||
| 858 | std::collections::HashMap::new(); | ||
| 859 | |||
| 860 | for branch in &state.branches { | ||
| 861 | let ref_name = format!("refs/heads/{}", branch.name); | ||
| 862 | expected_refs.insert(ref_name, branch.commit.clone()); | ||
| 863 | } | ||
| 864 | |||
| 865 | for tag in &state.tags { | ||
| 866 | let ref_name = format!("refs/tags/{}", tag.name); | ||
| 867 | expected_refs.insert(ref_name, tag.commit.clone()); | ||
| 868 | } | ||
| 869 | |||
| 870 | // Delete refs that exist in repo but not in state (only for refs/heads/ and refs/tags/) | ||
| 871 | for (ref_name, _current_commit) in ¤t_refs { | ||
| 872 | if (ref_name.starts_with("refs/heads/") || ref_name.starts_with("refs/tags/")) | ||
| 873 | && !expected_refs.contains_key(ref_name) | ||
| 874 | { | ||
| 875 | match git::delete_ref(repo_path, ref_name) { | ||
| 876 | Ok(()) => { | ||
| 530 | tracing::info!( | 877 | tracing::info!( |
| 531 | identifier = %state.identifier, | 878 | "Deleted {} from {} (not in state)", |
| 532 | "All missing OIDs fetched successfully" | 879 | ref_name, |
| 880 | repo_path.display() | ||
| 881 | ); | ||
| 882 | result.refs_deleted += 1; | ||
| 883 | } | ||
| 884 | Err(e) => { | ||
| 885 | tracing::warn!( | ||
| 886 | "Failed to delete {} from {}: {}", | ||
| 887 | ref_name, | ||
| 888 | repo_path.display(), | ||
| 889 | e | ||
| 890 | ); | ||
| 891 | } | ||
| 892 | } | ||
| 893 | } | ||
| 894 | } | ||
| 895 | |||
| 896 | // Update refs that exist in state (if we have the commit) | ||
| 897 | for (ref_name, expected_commit) in &expected_refs { | ||
| 898 | // Skip symbolic refs | ||
| 899 | if expected_commit.starts_with("ref: ") { | ||
| 900 | continue; | ||
| 901 | } | ||
| 902 | |||
| 903 | // Check if we have the commit | ||
| 904 | if !git::oid_exists(repo_path, expected_commit) { | ||
| 905 | tracing::debug!( | ||
| 906 | "Commit {} not available for {} in {}", | ||
| 907 | expected_commit, | ||
| 908 | ref_name, | ||
| 909 | repo_path.display() | ||
| 910 | ); | ||
| 911 | continue; | ||
| 912 | } | ||
| 913 | |||
| 914 | // Check current value | ||
| 915 | let current_commit = current_refs | ||
| 916 | .iter() | ||
| 917 | .find(|(r, _)| r == ref_name) | ||
| 918 | .map(|(_, c)| c.as_str()); | ||
| 919 | |||
| 920 | if current_commit == Some(expected_commit.as_str()) { | ||
| 921 | // Already correct | ||
| 922 | continue; | ||
| 923 | } | ||
| 924 | |||
| 925 | // Update or create the ref | ||
| 926 | match git::update_ref(repo_path, ref_name, expected_commit) { | ||
| 927 | Ok(()) => { | ||
| 928 | if current_commit.is_some() { | ||
| 929 | tracing::info!( | ||
| 930 | "Updated {} to {} in {}", | ||
| 931 | ref_name, | ||
| 932 | expected_commit, | ||
| 933 | repo_path.display() | ||
| 934 | ); | ||
| 935 | result.refs_updated += 1; | ||
| 936 | } else { | ||
| 937 | tracing::info!( | ||
| 938 | "Created {} at {} in {}", | ||
| 939 | ref_name, | ||
| 940 | expected_commit, | ||
| 941 | repo_path.display() | ||
| 533 | ); | 942 | ); |
| 534 | return Ok(()); | 943 | result.refs_created += 1; |
| 535 | } | 944 | } |
| 536 | } | 945 | } |
| 537 | Err(e) => { | 946 | Err(e) => { |
| 538 | tracing::debug!( | 947 | tracing::warn!( |
| 539 | identifier = %state.identifier, | 948 | "Failed to update {} in {}: {}", |
| 540 | server = %server_url, | 949 | ref_name, |
| 541 | error = %e, | 950 | repo_path.display(), |
| 542 | "Failed to fetch from server" | 951 | e |
| 543 | ); | 952 | ); |
| 544 | last_error = Some(e.to_string()); | ||
| 545 | } | 953 | } |
| 546 | } | 954 | } |
| 547 | } | 955 | } |
| 548 | 956 | ||
| 549 | // Check final state | 957 | // Set HEAD if specified in state |
| 550 | let still_missing: Vec<_> = missing_oids | 958 | if let Some(head_ref) = &state.head { |
| 551 | .iter() | 959 | if let Some(branch_name) = state.get_head_branch() { |
| 552 | .filter(|oid| !oid_exists(&repo_path, oid)) | 960 | if let Some(head_commit) = state.get_branch_commit(branch_name) { |
| 553 | .collect(); | 961 | match git::try_set_head_if_available(repo_path, head_ref, head_commit) { |
| 554 | 962 | Ok(true) => { | |
| 555 | if still_missing.is_empty() { | 963 | tracing::info!( |
| 556 | Ok(()) | 964 | "Set HEAD to {} in {} (from purgatory sync)", |
| 557 | } else { | 965 | head_ref, |
| 558 | bail!( | 966 | repo_path.display() |
| 559 | "Failed to fetch {} OIDs from any server. Last error: {:?}", | 967 | ); |
| 560 | still_missing.len(), | 968 | result.head_set = true; |
| 561 | last_error | 969 | } |
| 562 | ) | 970 | Ok(false) => { |
| 971 | tracing::debug!( | ||
| 972 | "HEAD commit {} not available yet in {}", | ||
| 973 | head_commit, | ||
| 974 | repo_path.display() | ||
| 975 | ); | ||
| 976 | } | ||
| 977 | Err(e) => { | ||
| 978 | tracing::warn!("Failed to set HEAD in {}: {}", repo_path.display(), e); | ||
| 979 | } | ||
| 980 | } | ||
| 981 | } | ||
| 982 | } | ||
| 563 | } | 983 | } |
| 984 | |||
| 985 | result | ||
| 564 | } | 986 | } |
| 565 | 987 | ||
| 566 | /// Fetch missing OIDs from a remote git server. | 988 | /// Fetch missing OIDs from a remote git server. |
| @@ -582,7 +1004,10 @@ async fn fetch_missing_oids_from_server( | |||
| 582 | 1004 | ||
| 583 | tokio::task::spawn_blocking(move || { | 1005 | tokio::task::spawn_blocking(move || { |
| 584 | // Filter to only OIDs that don't already exist | 1006 | // Filter to only OIDs that don't already exist |
| 585 | let missing: Vec<&String> = oids.iter().filter(|oid| !oid_exists(&repo_path, oid)).collect(); | 1007 | let missing: Vec<&String> = oids |
| 1008 | .iter() | ||
| 1009 | .filter(|oid| !oid_exists(&repo_path, oid)) | ||
| 1010 | .collect(); | ||
| 586 | 1011 | ||
| 587 | if missing.is_empty() { | 1012 | if missing.is_empty() { |
| 588 | return Ok(0); | 1013 | return Ok(0); |