diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 14:19:27 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 14:46:17 +0000 |
| commit | 3dfec1e449f260295e8c5c505dd1edb82d787c58 (patch) | |
| tree | f093ebfd02cbc16ca2c3c36f98601a78cf0876fd | |
| parent | 74979c1de32f69a39e0e290f56435ef687c2b6f6 (diff) | |
Wire up new purgatory sync loop, remove legacy sync_state_git_data
Phase 13 of purgatory-sync-redesign:
- Add sync loop startup in main.rs (RealSyncContext + ThrottleManager + start_sync_loop)
- Update add_state() and add_pr() to automatically enqueue for background sync
- Remove start_state_sync() from purgatory/mod.rs and its call in state.rs (now handled by sync loop)
- Remove orphaned legacy functions: sync_state_git_data, fetch_missing_oids_from_server,
get_most_complete_local_repo, identify_missing_oids, get_date_of_most_recent_commit_on_default_branch
- Clean up unused imports in purgatory/mod.rs
| -rw-r--r-- | src/main.rs | 20 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 9 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 484 |
3 files changed, 41 insertions, 472 deletions
diff --git a/src/main.rs b/src/main.rs index 59edc09..b4a42af 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -11,7 +11,7 @@ use ngit_grasp::{ | |||
| 11 | git, http, | 11 | git, http, |
| 12 | metrics::Metrics, | 12 | metrics::Metrics, |
| 13 | nostr, | 13 | nostr, |
| 14 | purgatory::Purgatory, | 14 | purgatory::{sync::RealSyncContext, sync::ThrottleManager, Purgatory}, |
| 15 | sync::SyncManager, | 15 | sync::SyncManager, |
| 16 | }; | 16 | }; |
| 17 | 17 | ||
| @@ -111,6 +111,24 @@ async fn main() -> Result<()> { | |||
| 111 | }); | 111 | }); |
| 112 | info!("Purgatory cleanup task started (60s interval)"); | 112 | info!("Purgatory cleanup task started (60s interval)"); |
| 113 | 113 | ||
| 114 | // Start purgatory sync loop for background git data fetching | ||
| 115 | let sync_ctx = Arc::new(RealSyncContext::new( | ||
| 116 | purgatory.clone(), | ||
| 117 | relay_with_db.database.clone(), | ||
| 118 | PathBuf::from(config.effective_git_data_path()), | ||
| 119 | Some(config.domain.clone()), | ||
| 120 | Some(relay_with_db.relay.clone()), | ||
| 121 | )); | ||
| 122 | |||
| 123 | // Create throttle manager for rate limiting remote git servers | ||
| 124 | // Default: 5 concurrent requests per domain, 30 requests per minute per domain | ||
| 125 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | ||
| 126 | throttle_manager.set_context(sync_ctx.clone()); | ||
| 127 | |||
| 128 | // Start the sync loop | ||
| 129 | let _sync_loop_handle = purgatory.clone().start_sync_loop(sync_ctx, throttle_manager); | ||
| 130 | info!("Purgatory sync loop started (1s interval)"); | ||
| 131 | |||
| 114 | // Setup shutdown handler for purgatory cleanup | 132 | // Setup shutdown handler for purgatory cleanup |
| 115 | let shutdown_purgatory = purgatory.clone(); | 133 | let shutdown_purgatory = purgatory.clone(); |
| 116 | let git_data_path = config.effective_git_data_path(); | 134 | let git_data_path = config.effective_git_data_path(); |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index a85e351..7d69d7d 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -152,18 +152,11 @@ impl StatePolicy { | |||
| 152 | Ok(WritePolicyResult::Accept) // event should be saved and broadcast | 152 | Ok(WritePolicyResult::Accept) // event should be saved and broadcast |
| 153 | } else { | 153 | } else { |
| 154 | // if no git data - add to purgatory | 154 | // if no git data - add to purgatory |
| 155 | // (add_state automatically enqueues for background sync) | ||
| 155 | self.ctx | 156 | self.ctx |
| 156 | .purgatory | 157 | .purgatory |
| 157 | .add_state(event.clone(), state.identifier.clone(), event.pubkey); | 158 | .add_state(event.clone(), state.identifier.clone(), event.pubkey); |
| 158 | 159 | ||
| 159 | // Trigger background git data sync from remote servers | ||
| 160 | self.ctx.purgatory.start_state_sync( | ||
| 161 | state.clone(), | ||
| 162 | self.ctx.database.clone(), | ||
| 163 | Some(self.ctx.domain.clone()), | ||
| 164 | self.ctx.get_local_relay(), | ||
| 165 | ); | ||
| 166 | |||
| 167 | tracing::info!( | 160 | tracing::info!( |
| 168 | "state event added to purgatory: eventid: {}, identifier: {}", | 161 | "state event added to purgatory: eventid: {}, identifier: {}", |
| 169 | state.event.id, | 162 | state.event.id, |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 499e534..7045923 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -15,24 +15,16 @@ mod helpers; | |||
| 15 | pub mod sync; | 15 | pub mod sync; |
| 16 | mod types; | 16 | mod types; |
| 17 | 17 | ||
| 18 | use anyhow::{bail, Result}; | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 18 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 19 | pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 20 | ||
| 22 | use dashmap::DashMap; | 21 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 22 | use nostr_sdk::prelude::*; |
| 24 | use std::collections::HashSet; | 23 | use std::collections::HashSet; |
| 25 | use std::path::{Path, PathBuf}; | 24 | use std::path::PathBuf; |
| 26 | use std::process::Command; | ||
| 27 | use std::sync::Arc; | 25 | use std::sync::Arc; |
| 28 | use std::time::{Duration, Instant}; | 26 | use std::time::{Duration, Instant}; |
| 29 | 27 | ||
| 30 | use crate::git::authorization::{fetch_repository_data, pubkey_authorised_for_repo_owners, RepositoryData}; | ||
| 31 | use crate::git::oid_exists; | ||
| 32 | use crate::git::sync::sync_to_owner_repos; | ||
| 33 | use crate::nostr::builder::SharedDatabase; | ||
| 34 | use crate::nostr::events::RepositoryState; | ||
| 35 | |||
| 36 | pub use sync::SyncQueueEntry; | 28 | pub use sync::SyncQueueEntry; |
| 37 | 29 | ||
| 38 | /// Default expiry duration for purgatory entries (30 minutes) | 30 | /// Default expiry duration for purgatory entries (30 minutes) |
| @@ -195,6 +187,9 @@ impl Purgatory { | |||
| 195 | /// The event will expire after the default duration unless matched with git data. | 187 | /// The event will expire after the default duration unless matched with git data. |
| 196 | /// Multiple state events for the same identifier are allowed (from different authors). | 188 | /// Multiple state events for the same identifier are allowed (from different authors). |
| 197 | /// | 189 | /// |
| 190 | /// Automatically enqueues the identifier for background sync with the default delay | ||
| 191 | /// (3 minutes), giving time for a git push to arrive after the nostr event. | ||
| 192 | /// | ||
| 198 | /// # Arguments | 193 | /// # Arguments |
| 199 | /// * `event` - The state event (kind 30618) to hold | 194 | /// * `event` - The state event (kind 30618) to hold |
| 200 | /// * `identifier` - The repository identifier from the 'd' tag | 195 | /// * `identifier` - The repository identifier from the 'd' tag |
| @@ -209,84 +204,30 @@ impl Purgatory { | |||
| 209 | expires_at: now + DEFAULT_EXPIRY, | 204 | expires_at: now + DEFAULT_EXPIRY, |
| 210 | }; | 205 | }; |
| 211 | 206 | ||
| 212 | self.state_events.entry(identifier).or_default().push(entry); | 207 | self.state_events |
| 213 | } | 208 | .entry(identifier.clone()) |
| 209 | .or_default() | ||
| 210 | .push(entry); | ||
| 214 | 211 | ||
| 215 | /// Trigger a background git data sync for a state event. | 212 | // Enqueue for background sync with default delay |
| 216 | /// | 213 | self.enqueue_sync_default(&identifier); |
| 217 | /// This method spawns a background task to attempt fetching missing git data | ||
| 218 | /// from remote servers listed in the repository announcements. It's called | ||
| 219 | /// when a state event arrives but the required git data isn't available locally. | ||
| 220 | /// | ||
| 221 | /// After successfully syncing OIDs: | ||
| 222 | /// 1. Syncs OIDs to other owner repositories that authorize this state | ||
| 223 | /// 2. Aligns refs with state for each repository | ||
| 224 | /// 3. Saves the state event to database | ||
| 225 | /// 4. Notifies WebSocket subscribers | ||
| 226 | /// 5. Removes the event from purgatory | ||
| 227 | /// | ||
| 228 | /// # Arguments | ||
| 229 | /// * `state` - The parsed repository state event | ||
| 230 | /// * `database` - Database to query for repository announcements and save events | ||
| 231 | /// * `our_domain` - Our service domain to exclude from fetch targets | ||
| 232 | /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) | ||
| 233 | pub fn start_state_sync( | ||
| 234 | &self, | ||
| 235 | state: RepositoryState, | ||
| 236 | database: SharedDatabase, | ||
| 237 | our_domain: Option<String>, | ||
| 238 | local_relay: Option<nostr_relay_builder::LocalRelay>, | ||
| 239 | ) { | ||
| 240 | let git_data_path = self.git_data_path.clone(); | ||
| 241 | let identifier = state.identifier.clone(); | ||
| 242 | let event_id = state.event.id; | ||
| 243 | let purgatory = self.clone(); | ||
| 244 | |||
| 245 | tokio::spawn(async move { | ||
| 246 | tracing::debug!( | ||
| 247 | identifier = %identifier, | ||
| 248 | event_id = %event_id, | ||
| 249 | "Starting background git data sync for purgatory state event" | ||
| 250 | ); | ||
| 251 | |||
| 252 | match sync_state_git_data( | ||
| 253 | state, | ||
| 254 | &database, | ||
| 255 | &git_data_path, | ||
| 256 | our_domain.as_deref(), | ||
| 257 | local_relay.as_ref(), | ||
| 258 | &purgatory, | ||
| 259 | ) | ||
| 260 | .await | ||
| 261 | { | ||
| 262 | Ok(()) => { | ||
| 263 | tracing::info!( | ||
| 264 | identifier = %identifier, | ||
| 265 | event_id = %event_id, | ||
| 266 | "Successfully synced git data and released state event from purgatory" | ||
| 267 | ); | ||
| 268 | } | ||
| 269 | Err(e) => { | ||
| 270 | tracing::warn!( | ||
| 271 | identifier = %identifier, | ||
| 272 | event_id = %event_id, | ||
| 273 | error = %e, | ||
| 274 | "Failed to sync git data for purgatory state event" | ||
| 275 | ); | ||
| 276 | } | ||
| 277 | } | ||
| 278 | }); | ||
| 279 | } | 214 | } |
| 280 | 215 | ||
| 281 | /// Add a PR event to purgatory. | 216 | /// Add a PR event to purgatory. |
| 282 | /// | 217 | /// |
| 283 | /// The event will expire after the default duration unless matched with git data. | 218 | /// The event will expire after the default duration unless matched with git data. |
| 284 | /// | 219 | /// |
| 220 | /// Automatically enqueues the referenced repository identifier for background sync | ||
| 221 | /// with the default delay (3 minutes), giving time for a git push to arrive. | ||
| 222 | /// | ||
| 285 | /// # Arguments | 223 | /// # Arguments |
| 286 | /// * `event` - The PR event (kind 1617/1618) to hold | 224 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 287 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 225 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 288 | /// * `commit` - The commit SHA from the 'c' tag | 226 | /// * `commit` - The commit SHA from the 'c' tag |
| 289 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 227 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { |
| 228 | // Extract identifier from the event's `a` tag for sync enqueueing | ||
| 229 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | ||
| 230 | |||
| 290 | let now = Instant::now(); | 231 | let now = Instant::now(); |
| 291 | let entry = PrPurgatoryEntry { | 232 | let entry = PrPurgatoryEntry { |
| 292 | event: Some(event), | 233 | event: Some(event), |
| @@ -296,6 +237,11 @@ impl Purgatory { | |||
| 296 | }; | 237 | }; |
| 297 | 238 | ||
| 298 | self.pr_events.insert(event_id, entry); | 239 | self.pr_events.insert(event_id, entry); |
| 240 | |||
| 241 | // Enqueue the identifier for background sync if we could extract it | ||
| 242 | if let Some(id) = identifier { | ||
| 243 | self.enqueue_sync_default(&id); | ||
| 244 | } | ||
| 299 | } | 245 | } |
| 300 | 246 | ||
| 301 | /// Add a PR placeholder (git data arrived before PR event). | 247 | /// Add a PR placeholder (git data arrived before PR event). |
| @@ -604,394 +550,6 @@ impl Purgatory { | |||
| 604 | } | 550 | } |
| 605 | } | 551 | } |
| 606 | 552 | ||
| 607 | /// Async function to sync git data for a state event from remote servers. | ||
| 608 | /// | ||
| 609 | /// This function: | ||
| 610 | /// 1. Fetches repository data from the database | ||
| 611 | /// 2. Identifies which owners authorize the state event author | ||
| 612 | /// 3. Collects clone URLs from authorized announcements | ||
| 613 | /// 4. Finds the most complete local repo to fetch into | ||
| 614 | /// 5. Identifies missing OIDs and fetches them from remote servers | ||
| 615 | /// 6. After OIDs are synced, copies them to other owner repositories | ||
| 616 | /// 7. Aligns refs with state for each authorized repository | ||
| 617 | /// 8. Saves the state event to database | ||
| 618 | /// 9. Notifies WebSocket subscribers | ||
| 619 | /// 10. Removes the event from purgatory | ||
| 620 | async fn sync_state_git_data( | ||
| 621 | state: RepositoryState, | ||
| 622 | database: &SharedDatabase, | ||
| 623 | git_data_path: &Path, | ||
| 624 | our_domain: Option<&str>, | ||
| 625 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 626 | purgatory: &Purgatory, | ||
| 627 | ) -> Result<()> { | ||
| 628 | // Fetch repository data from database | ||
| 629 | let db_repo_data = fetch_repository_data(database, &state.identifier).await?; | ||
| 630 | |||
| 631 | if db_repo_data.announcements.is_empty() { | ||
| 632 | bail!( | ||
| 633 | "No announcements found for identifier: {}", | ||
| 634 | state.identifier | ||
| 635 | ); | ||
| 636 | } | ||
| 637 | |||
| 638 | // Find owners that authorize this pubkey as a maintainer | ||
| 639 | let repo_owners_authorising_pubkey = | ||
| 640 | pubkey_authorised_for_repo_owners(&state.event.pubkey, &db_repo_data); | ||
| 641 | |||
| 642 | if repo_owners_authorising_pubkey.is_empty() { | ||
| 643 | bail!( | ||
| 644 | "No owners authorize pubkey {} for identifier {}", | ||
| 645 | state.event.pubkey, | ||
| 646 | state.identifier | ||
| 647 | ); | ||
| 648 | } | ||
| 649 | |||
| 650 | // Collect clone URLs from authorized announcements, excluding our own service | ||
| 651 | let servers: HashSet<String> = db_repo_data | ||
| 652 | .announcements | ||
| 653 | .iter() | ||
| 654 | .filter(|a| repo_owners_authorising_pubkey.contains(&a.event.pubkey.to_hex())) | ||
| 655 | .flat_map(|a| a.clone_urls.iter().cloned()) | ||
| 656 | .filter(|url| { | ||
| 657 | // Exclude our own domain if specified | ||
| 658 | if let Some(domain) = our_domain { | ||
| 659 | !url.contains(domain) | ||
| 660 | } else { | ||
| 661 | true | ||
| 662 | } | ||
| 663 | }) | ||
| 664 | .collect(); | ||
| 665 | |||
| 666 | if servers.is_empty() { | ||
| 667 | bail!( | ||
| 668 | "No external clone URLs found for identifier: {}", | ||
| 669 | state.identifier | ||
| 670 | ); | ||
| 671 | } | ||
| 672 | |||
| 673 | tracing::debug!( | ||
| 674 | identifier = %state.identifier, | ||
| 675 | servers = ?servers, | ||
| 676 | "Found {} external servers for git data sync", | ||
| 677 | servers.len() | ||
| 678 | ); | ||
| 679 | |||
| 680 | // Find the most complete local repo to fetch into | ||
| 681 | let (source_repo_path, missing_oids) = | ||
| 682 | get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?; | ||
| 683 | |||
| 684 | // Fetch missing OIDs from remote servers | ||
| 685 | if !missing_oids.is_empty() { | ||
| 686 | tracing::info!( | ||
| 687 | identifier = %state.identifier, | ||
| 688 | repo_path = %source_repo_path.display(), | ||
| 689 | missing_oids = ?missing_oids, | ||
| 690 | "Attempting to fetch {} missing OIDs from remote servers", | ||
| 691 | missing_oids.len() | ||
| 692 | ); | ||
| 693 | |||
| 694 | // Try to fetch from each server until we get all missing OIDs | ||
| 695 | let mut last_error: Option<String> = None; | ||
| 696 | for server_url in &servers { | ||
| 697 | match fetch_missing_oids_from_server(&source_repo_path, server_url, &missing_oids).await | ||
| 698 | { | ||
| 699 | Ok(fetched) => { | ||
| 700 | if fetched > 0 { | ||
| 701 | tracing::info!( | ||
| 702 | identifier = %state.identifier, | ||
| 703 | server = %server_url, | ||
| 704 | fetched = %fetched, | ||
| 705 | "Successfully fetched git data" | ||
| 706 | ); | ||
| 707 | } | ||
| 708 | |||
| 709 | // Check if all OIDs are now available | ||
| 710 | let still_missing: Vec<_> = missing_oids | ||
| 711 | .iter() | ||
| 712 | .filter(|oid| !oid_exists(&source_repo_path, oid)) | ||
| 713 | .collect(); | ||
| 714 | |||
| 715 | if still_missing.is_empty() { | ||
| 716 | tracing::info!( | ||
| 717 | identifier = %state.identifier, | ||
| 718 | "All missing OIDs fetched successfully" | ||
| 719 | ); | ||
| 720 | break; | ||
| 721 | } | ||
| 722 | } | ||
| 723 | Err(e) => { | ||
| 724 | tracing::debug!( | ||
| 725 | identifier = %state.identifier, | ||
| 726 | server = %server_url, | ||
| 727 | error = %e, | ||
| 728 | "Failed to fetch from server" | ||
| 729 | ); | ||
| 730 | last_error = Some(e.to_string()); | ||
| 731 | } | ||
| 732 | } | ||
| 733 | } | ||
| 734 | |||
| 735 | // Check final state - if still missing OIDs, fail | ||
| 736 | let still_missing: Vec<_> = missing_oids | ||
| 737 | .iter() | ||
| 738 | .filter(|oid| !oid_exists(&source_repo_path, oid)) | ||
| 739 | .collect(); | ||
| 740 | |||
| 741 | if !still_missing.is_empty() { | ||
| 742 | bail!( | ||
| 743 | "Failed to fetch {} OIDs from any server. Last error: {:?}", | ||
| 744 | still_missing.len(), | ||
| 745 | last_error | ||
| 746 | ); | ||
| 747 | } | ||
| 748 | } else { | ||
| 749 | tracing::debug!( | ||
| 750 | identifier = %state.identifier, | ||
| 751 | repo_path = %source_repo_path.display(), | ||
| 752 | "No missing OIDs - git data is already complete" | ||
| 753 | ); | ||
| 754 | } | ||
| 755 | |||
| 756 | // Now that we have all OIDs, sync to other owner repositories and align refs | ||
| 757 | let sync_result = sync_to_owner_repos(&source_repo_path, &state, &db_repo_data, git_data_path); | ||
| 758 | |||
| 759 | tracing::info!( | ||
| 760 | identifier = %state.identifier, | ||
| 761 | event_id = %state.event.id, | ||
| 762 | repos_synced = sync_result.repos_synced, | ||
| 763 | "Synced git data and aligned {} repositories from purgatory", | ||
| 764 | sync_result.repos_synced | ||
| 765 | ); | ||
| 766 | |||
| 767 | // Save state event to database | ||
| 768 | match database.save_event(&state.event).await { | ||
| 769 | Ok(_) => { | ||
| 770 | tracing::info!( | ||
| 771 | identifier = %state.identifier, | ||
| 772 | event_id = %state.event.id, | ||
| 773 | "Saved purgatory state event to database after git sync" | ||
| 774 | ); | ||
| 775 | |||
| 776 | // Notify WebSocket subscribers | ||
| 777 | if let Some(relay) = local_relay { | ||
| 778 | if relay.notify_event(state.event.clone()) { | ||
| 779 | tracing::info!( | ||
| 780 | identifier = %state.identifier, | ||
| 781 | event_id = %state.event.id, | ||
| 782 | "Broadcast purgatory state event to websocket listeners" | ||
| 783 | ); | ||
| 784 | } else { | ||
| 785 | tracing::warn!( | ||
| 786 | identifier = %state.identifier, | ||
| 787 | event_id = %state.event.id, | ||
| 788 | "Failed to broadcast purgatory state event to websocket listeners" | ||
| 789 | ); | ||
| 790 | } | ||
| 791 | } | ||
| 792 | |||
| 793 | // Remove from purgatory | ||
| 794 | purgatory.remove_state_event(&state.identifier, &state.event.id); | ||
| 795 | tracing::info!( | ||
| 796 | identifier = %state.identifier, | ||
| 797 | event_id = %state.event.id, | ||
| 798 | "Removed state event from purgatory after successful sync" | ||
| 799 | ); | ||
| 800 | } | ||
| 801 | Err(e) => { | ||
| 802 | tracing::warn!( | ||
| 803 | identifier = %state.identifier, | ||
| 804 | event_id = %state.event.id, | ||
| 805 | error = %e, | ||
| 806 | "Failed to save purgatory state event to database" | ||
| 807 | ); | ||
| 808 | // Don't remove from purgatory if save failed - it will retry or expire | ||
| 809 | bail!("Failed to save state event to database: {}", e); | ||
| 810 | } | ||
| 811 | } | ||
| 812 | |||
| 813 | Ok(()) | ||
| 814 | } | ||
| 815 | |||
| 816 | /// Fetch missing OIDs from a remote git server. | ||
| 817 | /// | ||
| 818 | /// Uses `git fetch` to retrieve specific commits from the server. | ||
| 819 | async fn fetch_missing_oids_from_server( | ||
| 820 | repo_path: &Path, | ||
| 821 | server_url: &str, | ||
| 822 | missing_oids: &[String], | ||
| 823 | ) -> Result<usize> { | ||
| 824 | if missing_oids.is_empty() { | ||
| 825 | return Ok(0); | ||
| 826 | } | ||
| 827 | |||
| 828 | // Use tokio::task::spawn_blocking for the git operations since they're blocking | ||
| 829 | let repo_path = repo_path.to_path_buf(); | ||
| 830 | let server_url = server_url.to_string(); | ||
| 831 | let oids = missing_oids.to_vec(); | ||
| 832 | |||
| 833 | tokio::task::spawn_blocking(move || { | ||
| 834 | // Filter to only OIDs that don't already exist | ||
| 835 | let missing: Vec<&String> = oids | ||
| 836 | .iter() | ||
| 837 | .filter(|oid| !oid_exists(&repo_path, oid)) | ||
| 838 | .collect(); | ||
| 839 | |||
| 840 | if missing.is_empty() { | ||
| 841 | return Ok(0); | ||
| 842 | } | ||
| 843 | |||
| 844 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs in one command | ||
| 845 | let mut args = vec!["fetch", "--depth=1", &server_url]; | ||
| 846 | args.extend(missing.iter().map(|s| s.as_str())); | ||
| 847 | |||
| 848 | tracing::debug!( | ||
| 849 | oids = ?missing, | ||
| 850 | server = %server_url, | ||
| 851 | "Fetching OIDs" | ||
| 852 | ); | ||
| 853 | |||
| 854 | let output = Command::new("git") | ||
| 855 | .args(&args) | ||
| 856 | .current_dir(&repo_path) | ||
| 857 | .output(); | ||
| 858 | |||
| 859 | match output { | ||
| 860 | Ok(result) if result.status.success() => { | ||
| 861 | // Count how many OIDs we now have | ||
| 862 | let fetched_count = missing | ||
| 863 | .iter() | ||
| 864 | .filter(|oid| oid_exists(&repo_path, oid)) | ||
| 865 | .count(); | ||
| 866 | |||
| 867 | tracing::debug!( | ||
| 868 | fetched_count = fetched_count, | ||
| 869 | server = %server_url, | ||
| 870 | "Successfully fetched OIDs" | ||
| 871 | ); | ||
| 872 | |||
| 873 | Ok(fetched_count) | ||
| 874 | } | ||
| 875 | Ok(result) => { | ||
| 876 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 877 | tracing::debug!( | ||
| 878 | oids = ?missing, | ||
| 879 | server = %server_url, | ||
| 880 | stderr = %stderr, | ||
| 881 | "git fetch failed for OIDs" | ||
| 882 | ); | ||
| 883 | Ok(0) | ||
| 884 | } | ||
| 885 | Err(e) => { | ||
| 886 | tracing::debug!( | ||
| 887 | oids = ?missing, | ||
| 888 | server = %server_url, | ||
| 889 | error = %e, | ||
| 890 | "git fetch command error" | ||
| 891 | ); | ||
| 892 | Ok(0) | ||
| 893 | } | ||
| 894 | } | ||
| 895 | }) | ||
| 896 | .await? | ||
| 897 | } | ||
| 898 | |||
| 899 | fn get_most_complete_local_repo( | ||
| 900 | db_repo_data: &RepositoryData, | ||
| 901 | state: &RepositoryState, | ||
| 902 | git_path: &Path, | ||
| 903 | ) -> Result<(PathBuf, Vec<String>)> { | ||
| 904 | // should we filter for those where pubkey is authorised? | ||
| 905 | |||
| 906 | let repo_onwers_authorising_pubkey = | ||
| 907 | pubkey_authorised_for_repo_owners(&state.event.pubkey, db_repo_data); | ||
| 908 | |||
| 909 | let mut res: Option<(Timestamp, PathBuf, Vec<String>)> = None; | ||
| 910 | for announcement in &db_repo_data.announcements { | ||
| 911 | if !repo_onwers_authorising_pubkey.contains(&announcement.event.pubkey.to_hex()) { | ||
| 912 | continue; // skip where event author isn't a maintainer | ||
| 913 | } | ||
| 914 | let repo_path = git_path.join(announcement.repo_path().clone()); | ||
| 915 | if let Ok(missing_oids) = identify_missing_oids(state, &repo_path) { | ||
| 916 | let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path) | ||
| 917 | .unwrap_or(Timestamp::zero()); | ||
| 918 | let newest_commmit_date = if let Some((d, _, _)) = &res { | ||
| 919 | d | ||
| 920 | } else { | ||
| 921 | &Timestamp::zero() | ||
| 922 | }; | ||
| 923 | if commit_date.gt(newest_commmit_date) { | ||
| 924 | res = Some((commit_date, repo_path, missing_oids)); | ||
| 925 | } | ||
| 926 | } | ||
| 927 | } | ||
| 928 | if let Some((_newest_commit_date, repo_path, missing_oids)) = res { | ||
| 929 | Ok((repo_path, missing_oids)) | ||
| 930 | } else { | ||
| 931 | bail!("no repo directories exists yet"); | ||
| 932 | } | ||
| 933 | } | ||
| 934 | |||
| 935 | fn identify_missing_oids(state: &RepositoryState, git_repo_path: &Path) -> Result<Vec<String>> { | ||
| 936 | if !git_repo_path.exists() { | ||
| 937 | bail!("repo directory doesn't exists"); | ||
| 938 | } | ||
| 939 | let mut missing_oids = vec![]; | ||
| 940 | for branch_state in &state.branches { | ||
| 941 | if !branch_state.commit.starts_with("ref: ") | ||
| 942 | && !oid_exists(git_repo_path, &branch_state.commit) | ||
| 943 | { | ||
| 944 | missing_oids.push(branch_state.commit.clone()); | ||
| 945 | } | ||
| 946 | } | ||
| 947 | for tag_state in &state.tags { | ||
| 948 | if !tag_state.commit.starts_with("ref: ") && !oid_exists(git_repo_path, &tag_state.commit) { | ||
| 949 | missing_oids.push(tag_state.commit.clone()); | ||
| 950 | } | ||
| 951 | } | ||
| 952 | Ok(missing_oids) | ||
| 953 | } | ||
| 954 | |||
| 955 | fn get_date_of_most_recent_commit_on_default_branch(git_repo_path: &Path) -> Result<Timestamp> { | ||
| 956 | if !git_repo_path.exists() { | ||
| 957 | bail!("repo directory doesn't exists"); | ||
| 958 | } | ||
| 959 | |||
| 960 | // Get the default branch (HEAD) | ||
| 961 | let head_output = std::process::Command::new("git") | ||
| 962 | .args(["symbolic-ref", "HEAD"]) | ||
| 963 | .current_dir(git_repo_path) | ||
| 964 | .output()?; | ||
| 965 | |||
| 966 | if !head_output.status.success() { | ||
| 967 | bail!("Failed to get repository HEAD"); | ||
| 968 | } | ||
| 969 | |||
| 970 | let head_ref = String::from_utf8_lossy(&head_output.stdout) | ||
| 971 | .trim() | ||
| 972 | .to_string(); | ||
| 973 | |||
| 974 | // Get the most recent commit timestamp on the default branch | ||
| 975 | // Use %ct to get the committer date as Unix timestamp | ||
| 976 | let log_output = std::process::Command::new("git") | ||
| 977 | .args(["log", "-1", "--format=%ct", &head_ref]) | ||
| 978 | .current_dir(git_repo_path) | ||
| 979 | .output()?; | ||
| 980 | |||
| 981 | if !log_output.status.success() { | ||
| 982 | bail!("Failed to get commit timestamp for {}", head_ref); | ||
| 983 | } | ||
| 984 | |||
| 985 | let timestamp_str = String::from_utf8_lossy(&log_output.stdout) | ||
| 986 | .trim() | ||
| 987 | .to_string(); | ||
| 988 | let unix_timestamp: u64 = timestamp_str | ||
| 989 | .parse() | ||
| 990 | .map_err(|_| anyhow::anyhow!("Failed to parse timestamp: {}", timestamp_str))?; | ||
| 991 | |||
| 992 | Ok(Timestamp::from(unix_timestamp)) | ||
| 993 | } | ||
| 994 | |||
| 995 | #[cfg(test)] | 553 | #[cfg(test)] |
| 996 | mod tests { | 554 | mod tests { |
| 997 | use super::*; | 555 | use super::*; |