From 3dfec1e449f260295e8c5c505dd1edb82d787c58 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 14:19:27 +0000 Subject: Wire up new purgatory sync loop, remove legacy sync_state_git_data Phase 13 of purgatory-sync-redesign: - Add sync loop startup in main.rs (RealSyncContext + ThrottleManager + start_sync_loop) - Update add_state() and add_pr() to automatically enqueue for background sync - Remove start_state_sync() call from state.rs (now handled by sync loop) - Remove orphaned legacy functions: sync_state_git_data, fetch_missing_oids_from_server, get_most_complete_local_repo, identify_missing_oids, get_date_of_most_recent_commit_on_default_branch - Clean up unused imports in purgatory/mod.rs --- src/main.rs | 20 +- src/nostr/policy/state.rs | 9 +- src/purgatory/mod.rs | 484 ++-------------------------------------------- 3 files changed, 41 insertions(+), 472 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 59edc09..b4a42af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use ngit_grasp::{ git, http, metrics::Metrics, nostr, - purgatory::Purgatory, + purgatory::{sync::RealSyncContext, sync::ThrottleManager, Purgatory}, sync::SyncManager, }; @@ -111,6 +111,24 @@ async fn main() -> Result<()> { }); info!("Purgatory cleanup task started (60s interval)"); + // Start purgatory sync loop for background git data fetching + let sync_ctx = Arc::new(RealSyncContext::new( + purgatory.clone(), + relay_with_db.database.clone(), + PathBuf::from(config.effective_git_data_path()), + Some(config.domain.clone()), + Some(relay_with_db.relay.clone()), + )); + + // Create throttle manager for rate limiting remote git servers + // Default: 5 concurrent requests per domain, 30 requests per minute per domain + let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); + throttle_manager.set_context(sync_ctx.clone()); + + // Start the sync loop + let _sync_loop_handle = purgatory.clone().start_sync_loop(sync_ctx, throttle_manager); + info!("Purgatory sync loop started (1s interval)"); + // Setup shutdown handler for purgatory cleanup let shutdown_purgatory = purgatory.clone(); let git_data_path = config.effective_git_data_path(); diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index a85e351..7d69d7d 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -152,18 +152,11 @@ impl StatePolicy { Ok(WritePolicyResult::Accept) // event should be saved and broadcast } else { // if no git data - add to purgatory + // (add_state automatically enqueues for background sync) self.ctx .purgatory .add_state(event.clone(), state.identifier.clone(), event.pubkey); - // Trigger background git data sync from remote servers - self.ctx.purgatory.start_state_sync( - state.clone(), - self.ctx.database.clone(), - Some(self.ctx.domain.clone()), - self.ctx.get_local_relay(), - ); - tracing::info!( "state event added to purgatory: eventid: {}, identifier: {}", state.event.id, diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 499e534..7045923 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -15,24 +15,16 @@ mod helpers; pub mod sync; mod types; -use anyhow::{bail, Result}; pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; use dashmap::DashMap; use nostr_sdk::prelude::*; use std::collections::HashSet; -use std::path::{Path, PathBuf}; -use std::process::Command; +use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; -use crate::git::authorization::{fetch_repository_data, pubkey_authorised_for_repo_owners, RepositoryData}; -use crate::git::oid_exists; -use crate::git::sync::sync_to_owner_repos; -use crate::nostr::builder::SharedDatabase; -use crate::nostr::events::RepositoryState; - pub use sync::SyncQueueEntry; /// Default expiry duration for purgatory entries (30 minutes) @@ -195,6 +187,9 @@ impl Purgatory { /// The event will expire after the default duration unless matched with git data. /// Multiple state events for the same identifier are allowed (from different authors). /// + /// Automatically enqueues the identifier for background sync with the default delay + /// (3 minutes), giving time for a git push to arrive after the nostr event. + /// /// # Arguments /// * `event` - The state event (kind 30618) to hold /// * `identifier` - The repository identifier from the 'd' tag @@ -209,84 +204,30 @@ impl Purgatory { expires_at: now + DEFAULT_EXPIRY, }; - self.state_events.entry(identifier).or_default().push(entry); - } + self.state_events + .entry(identifier.clone()) + .or_default() + .push(entry); - /// Trigger a background git data sync for a state event. - /// - /// This method spawns a background task to attempt fetching missing git data - /// from remote servers listed in the repository announcements. It's called - /// when a state event arrives but the required git data isn't available locally. - /// - /// After successfully syncing OIDs: - /// 1. Syncs OIDs to other owner repositories that authorize this state - /// 2. Aligns refs with state for each repository - /// 3. Saves the state event to database - /// 4. Notifies WebSocket subscribers - /// 5. Removes the event from purgatory - /// - /// # Arguments - /// * `state` - The parsed repository state event - /// * `database` - Database to query for repository announcements and save events - /// * `our_domain` - Our service domain to exclude from fetch targets - /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional) - pub fn start_state_sync( - &self, - state: RepositoryState, - database: SharedDatabase, - our_domain: Option, - local_relay: Option, - ) { - let git_data_path = self.git_data_path.clone(); - let identifier = state.identifier.clone(); - let event_id = state.event.id; - let purgatory = self.clone(); - - tokio::spawn(async move { - tracing::debug!( - identifier = %identifier, - event_id = %event_id, - "Starting background git data sync for purgatory state event" - ); - - match sync_state_git_data( - state, - &database, - &git_data_path, - our_domain.as_deref(), - local_relay.as_ref(), - &purgatory, - ) - .await - { - Ok(()) => { - tracing::info!( - identifier = %identifier, - event_id = %event_id, - "Successfully synced git data and released state event from purgatory" - ); - } - Err(e) => { - tracing::warn!( - identifier = %identifier, - event_id = %event_id, - error = %e, - "Failed to sync git data for purgatory state event" - ); - } - } - }); + // Enqueue for background sync with default delay + self.enqueue_sync_default(&identifier); } /// Add a PR event to purgatory. /// /// The event will expire after the default duration unless matched with git data. /// + /// Automatically enqueues the referenced repository identifier for background sync + /// with the default delay (3 minutes), giving time for a git push to arrive. + /// /// # Arguments /// * `event` - The PR event (kind 1617/1618) to hold /// * `event_id` - The event ID (hex string) from the 'e' tag /// * `commit` - The commit SHA from the 'c' tag pub fn add_pr(&self, event: Event, event_id: String, commit: String) { + // Extract identifier from the event's `a` tag for sync enqueueing + let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); + let now = Instant::now(); let entry = PrPurgatoryEntry { event: Some(event), @@ -296,6 +237,11 @@ impl Purgatory { }; self.pr_events.insert(event_id, entry); + + // Enqueue the identifier for background sync if we could extract it + if let Some(id) = identifier { + self.enqueue_sync_default(&id); + } } /// Add a PR placeholder (git data arrived before PR event). @@ -604,394 +550,6 @@ impl Purgatory { } } -/// Async function to sync git data for a state event from remote servers. -/// -/// This function: -/// 1. Fetches repository data from the database -/// 2. Identifies which owners authorize the state event author -/// 3. Collects clone URLs from authorized announcements -/// 4. Finds the most complete local repo to fetch into -/// 5. Identifies missing OIDs and fetches them from remote servers -/// 6. After OIDs are synced, copies them to other owner repositories -/// 7. Aligns refs with state for each authorized repository -/// 8. Saves the state event to database -/// 9. Notifies WebSocket subscribers -/// 10. Removes the event from purgatory -async fn sync_state_git_data( - state: RepositoryState, - database: &SharedDatabase, - git_data_path: &Path, - our_domain: Option<&str>, - local_relay: Option<&nostr_relay_builder::LocalRelay>, - purgatory: &Purgatory, -) -> Result<()> { - // Fetch repository data from database - let db_repo_data = fetch_repository_data(database, &state.identifier).await?; - - if db_repo_data.announcements.is_empty() { - bail!( - "No announcements found for identifier: {}", - state.identifier - ); - } - - // Find owners that authorize this pubkey as a maintainer - let repo_owners_authorising_pubkey = - pubkey_authorised_for_repo_owners(&state.event.pubkey, &db_repo_data); - - if repo_owners_authorising_pubkey.is_empty() { - bail!( - "No owners authorize pubkey {} for identifier {}", - state.event.pubkey, - state.identifier - ); - } - - // Collect clone URLs from authorized announcements, excluding our own service - let servers: HashSet = db_repo_data - .announcements - .iter() - .filter(|a| repo_owners_authorising_pubkey.contains(&a.event.pubkey.to_hex())) - .flat_map(|a| a.clone_urls.iter().cloned()) - .filter(|url| { - // Exclude our own domain if specified - if let Some(domain) = our_domain { - !url.contains(domain) - } else { - true - } - }) - .collect(); - - if servers.is_empty() { - bail!( - "No external clone URLs found for identifier: {}", - state.identifier - ); - } - - tracing::debug!( - identifier = %state.identifier, - servers = ?servers, - "Found {} external servers for git data sync", - servers.len() - ); - - // Find the most complete local repo to fetch into - let (source_repo_path, missing_oids) = - get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?; - - // Fetch missing OIDs from remote servers - if !missing_oids.is_empty() { - tracing::info!( - identifier = %state.identifier, - repo_path = %source_repo_path.display(), - missing_oids = ?missing_oids, - "Attempting to fetch {} missing OIDs from remote servers", - missing_oids.len() - ); - - // Try to fetch from each server until we get all missing OIDs - let mut last_error: Option = None; - for server_url in &servers { - match fetch_missing_oids_from_server(&source_repo_path, server_url, &missing_oids).await - { - Ok(fetched) => { - if fetched > 0 { - tracing::info!( - identifier = %state.identifier, - server = %server_url, - fetched = %fetched, - "Successfully fetched git data" - ); - } - - // Check if all OIDs are now available - let still_missing: Vec<_> = missing_oids - .iter() - .filter(|oid| !oid_exists(&source_repo_path, oid)) - .collect(); - - if still_missing.is_empty() { - tracing::info!( - identifier = %state.identifier, - "All missing OIDs fetched successfully" - ); - break; - } - } - Err(e) => { - tracing::debug!( - identifier = %state.identifier, - server = %server_url, - error = %e, - "Failed to fetch from server" - ); - last_error = Some(e.to_string()); - } - } - } - - // Check final state - if still missing OIDs, fail - let still_missing: Vec<_> = missing_oids - .iter() - .filter(|oid| !oid_exists(&source_repo_path, oid)) - .collect(); - - if !still_missing.is_empty() { - bail!( - "Failed to fetch {} OIDs from any server. Last error: {:?}", - still_missing.len(), - last_error - ); - } - } else { - tracing::debug!( - identifier = %state.identifier, - repo_path = %source_repo_path.display(), - "No missing OIDs - git data is already complete" - ); - } - - // Now that we have all OIDs, sync to other owner repositories and align refs - let sync_result = sync_to_owner_repos(&source_repo_path, &state, &db_repo_data, git_data_path); - - tracing::info!( - identifier = %state.identifier, - event_id = %state.event.id, - repos_synced = sync_result.repos_synced, - "Synced git data and aligned {} repositories from purgatory", - sync_result.repos_synced - ); - - // Save state event to database - match database.save_event(&state.event).await { - Ok(_) => { - tracing::info!( - identifier = %state.identifier, - event_id = %state.event.id, - "Saved purgatory state event to database after git sync" - ); - - // Notify WebSocket subscribers - if let Some(relay) = local_relay { - if relay.notify_event(state.event.clone()) { - tracing::info!( - identifier = %state.identifier, - event_id = %state.event.id, - "Broadcast purgatory state event to websocket listeners" - ); - } else { - tracing::warn!( - identifier = %state.identifier, - event_id = %state.event.id, - "Failed to broadcast purgatory state event to websocket listeners" - ); - } - } - - // Remove from purgatory - purgatory.remove_state_event(&state.identifier, &state.event.id); - tracing::info!( - identifier = %state.identifier, - event_id = %state.event.id, - "Removed state event from purgatory after successful sync" - ); - } - Err(e) => { - tracing::warn!( - identifier = %state.identifier, - event_id = %state.event.id, - error = %e, - "Failed to save purgatory state event to database" - ); - // Don't remove from purgatory if save failed - it will retry or expire - bail!("Failed to save state event to database: {}", e); - } - } - - Ok(()) -} - -/// Fetch missing OIDs from a remote git server. -/// -/// Uses `git fetch` to retrieve specific commits from the server. -async fn fetch_missing_oids_from_server( - repo_path: &Path, - server_url: &str, - missing_oids: &[String], -) -> Result { - if missing_oids.is_empty() { - return Ok(0); - } - - // Use tokio::task::spawn_blocking for the git operations since they're blocking - let repo_path = repo_path.to_path_buf(); - let server_url = server_url.to_string(); - let oids = missing_oids.to_vec(); - - tokio::task::spawn_blocking(move || { - // Filter to only OIDs that don't already exist - let missing: Vec<&String> = oids - .iter() - .filter(|oid| !oid_exists(&repo_path, oid)) - .collect(); - - if missing.is_empty() { - return Ok(0); - } - - // git fetch ... - fetch all OIDs in one command - let mut args = vec!["fetch", "--depth=1", &server_url]; - args.extend(missing.iter().map(|s| s.as_str())); - - tracing::debug!( - oids = ?missing, - server = %server_url, - "Fetching OIDs" - ); - - let output = Command::new("git") - .args(&args) - .current_dir(&repo_path) - .output(); - - match output { - Ok(result) if result.status.success() => { - // Count how many OIDs we now have - let fetched_count = missing - .iter() - .filter(|oid| oid_exists(&repo_path, oid)) - .count(); - - tracing::debug!( - fetched_count = fetched_count, - server = %server_url, - "Successfully fetched OIDs" - ); - - Ok(fetched_count) - } - Ok(result) => { - let stderr = String::from_utf8_lossy(&result.stderr); - tracing::debug!( - oids = ?missing, - server = %server_url, - stderr = %stderr, - "git fetch failed for OIDs" - ); - Ok(0) - } - Err(e) => { - tracing::debug!( - oids = ?missing, - server = %server_url, - error = %e, - "git fetch command error" - ); - Ok(0) - } - } - }) - .await? -} - -fn get_most_complete_local_repo( - db_repo_data: &RepositoryData, - state: &RepositoryState, - git_path: &Path, -) -> Result<(PathBuf, Vec)> { - // should we filter for those where pubkey is authorised? - - let repo_onwers_authorising_pubkey = - pubkey_authorised_for_repo_owners(&state.event.pubkey, db_repo_data); - - let mut res: Option<(Timestamp, PathBuf, Vec)> = None; - for announcement in &db_repo_data.announcements { - if !repo_onwers_authorising_pubkey.contains(&announcement.event.pubkey.to_hex()) { - continue; // skip where event author isn't a maintainer - } - let repo_path = git_path.join(announcement.repo_path().clone()); - if let Ok(missing_oids) = identify_missing_oids(state, &repo_path) { - let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path) - .unwrap_or(Timestamp::zero()); - let newest_commmit_date = if let Some((d, _, _)) = &res { - d - } else { - &Timestamp::zero() - }; - if commit_date.gt(newest_commmit_date) { - res = Some((commit_date, repo_path, missing_oids)); - } - } - } - if let Some((_newest_commit_date, repo_path, missing_oids)) = res { - Ok((repo_path, missing_oids)) - } else { - bail!("no repo directories exists yet"); - } -} - -fn identify_missing_oids(state: &RepositoryState, git_repo_path: &Path) -> Result> { - if !git_repo_path.exists() { - bail!("repo directory doesn't exists"); - } - let mut missing_oids = vec![]; - for branch_state in &state.branches { - if !branch_state.commit.starts_with("ref: ") - && !oid_exists(git_repo_path, &branch_state.commit) - { - missing_oids.push(branch_state.commit.clone()); - } - } - for tag_state in &state.tags { - if !tag_state.commit.starts_with("ref: ") && !oid_exists(git_repo_path, &tag_state.commit) { - missing_oids.push(tag_state.commit.clone()); - } - } - Ok(missing_oids) -} - -fn get_date_of_most_recent_commit_on_default_branch(git_repo_path: &Path) -> Result { - if !git_repo_path.exists() { - bail!("repo directory doesn't exists"); - } - - // Get the default branch (HEAD) - let head_output = std::process::Command::new("git") - .args(["symbolic-ref", "HEAD"]) - .current_dir(git_repo_path) - .output()?; - - if !head_output.status.success() { - bail!("Failed to get repository HEAD"); - } - - let head_ref = String::from_utf8_lossy(&head_output.stdout) - .trim() - .to_string(); - - // Get the most recent commit timestamp on the default branch - // Use %ct to get the committer date as Unix timestamp - let log_output = std::process::Command::new("git") - .args(["log", "-1", "--format=%ct", &head_ref]) - .current_dir(git_repo_path) - .output()?; - - if !log_output.status.success() { - bail!("Failed to get commit timestamp for {}", head_ref); - } - - let timestamp_str = String::from_utf8_lossy(&log_output.stdout) - .trim() - .to_string(); - let unix_timestamp: u64 = timestamp_str - .parse() - .map_err(|_| anyhow::anyhow!("Failed to parse timestamp: {}", timestamp_str))?; - - Ok(Timestamp::from(unix_timestamp)) -} - #[cfg(test)] mod tests { use super::*; -- cgit v1.2.3