From 74979c1de32f69a39e0e290f56435ef687c2b6f6 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 14:01:47 +0000 Subject: Add RealSyncContext implementation for production purgatory sync Implement the production SyncContext that connects to real systems: - RealSyncContext struct holding purgatory, database, git_data_path, our_domain, and local_relay references - fetch_repository_data: delegates to git::authorization module - collect_needed_oids: collects commit hashes from state events (branches/tags) and PR events (c-tag) in purgatory - oid_exists: delegates to git::oid_exists function - fetch_oids: uses git fetch --depth=1 to retrieve specific OIDs from remote servers, running in spawn_blocking for async safety - process_newly_available_git_data: delegates to the unified function in git::sync module for consistent post-git-data processing - has_pending_events: delegates to purgatory method - find_target_repo: finds first existing owner repository on disk - our_domain: returns configured domain for clone URL filtering This enables the purgatory sync loop to use real database queries, git operations, and event processing instead of mocks. --- src/purgatory/sync/context.rs | 249 ++++++++++++++++++++++++++++++++++++++++++ src/purgatory/sync/mod.rs | 2 +- 2 files changed, 250 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index dea97ef..e97b708 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -167,6 +167,255 @@ pub trait SyncContext: Send + Sync { fn our_domain(&self) -> Option<&str>; } +// ============================================================================= +// Real Implementation +// ============================================================================= + +use nostr_relay_builder::LocalRelay; +use std::process::Command; +use std::sync::Arc; +use tracing::debug; + +use crate::nostr::builder::SharedDatabase; +use crate::nostr::events::RepositoryState; +use crate::purgatory::Purgatory; + +/// Real implementation of `SyncContext` that connects to actual systems. +/// +/// This is the production implementation used by the sync loop. It: +/// - Queries the database for repository data +/// - Collects needed OIDs from purgatory state and PR events +/// - Uses git commands to check OID existence and fetch from remote servers +/// - Delegates to the unified `process_newly_available_git_data` function +pub struct RealSyncContext { + /// Purgatory instance for checking pending events and collecting needed OIDs + purgatory: Arc, + + /// Database for querying repository data and saving events + database: SharedDatabase, + + /// Base path for git repositories + git_data_path: PathBuf, + + /// Our domain (to exclude from clone URLs when syncing) + our_domain_value: Option, + + /// Local relay for notifying WebSocket subscribers + local_relay: Option, +} + +impl RealSyncContext { + /// Create a new real sync context. + /// + /// # Arguments + /// * `purgatory` - Purgatory instance for pending events + /// * `database` - Database for queries and saves + /// * `git_data_path` - Base path for git repositories + /// * `our_domain` - Our domain to exclude from clone URLs + /// * `local_relay` - Local relay for WebSocket notifications + pub fn new( + purgatory: Arc, + database: SharedDatabase, + git_data_path: PathBuf, + our_domain: Option, + local_relay: Option, + ) -> Self { + Self { + purgatory, + database, + git_data_path, + our_domain_value: our_domain, + local_relay, + } + } +} + +#[async_trait] +impl SyncContext for RealSyncContext { + async fn fetch_repository_data(&self, identifier: &str) -> Result { + crate::git::authorization::fetch_repository_data(&self.database, identifier).await + } + + fn collect_needed_oids(&self, identifier: &str) -> HashSet { + let mut needed_oids = HashSet::new(); + + // Collect OIDs from state events in purgatory + for entry in self.purgatory.find_state(identifier) { + // Parse state event to extract branch/tag commits + if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { + for branch in &state.branches { + // Skip symbolic refs (e.g., "ref: refs/heads/main") + if !branch.commit.starts_with("ref: ") { + needed_oids.insert(branch.commit.clone()); + } + } + for tag in &state.tags { + if !tag.commit.starts_with("ref: ") { + needed_oids.insert(tag.commit.clone()); + } + } + } + } + + // Collect OIDs from PR events in purgatory + for entry in self.purgatory.find_prs_for_identifier(identifier) { + // PR events have a commit field (from c-tag) + if !entry.commit.is_empty() { + needed_oids.insert(entry.commit.clone()); + } + } + + debug!( + identifier = %identifier, + needed_oids_count = needed_oids.len(), + "Collected needed OIDs from purgatory" + ); + + needed_oids + } + + fn oid_exists(&self, repo_path: &Path, oid: &str) -> bool { + crate::git::oid_exists(repo_path, oid) + } + + async fn fetch_oids( + &self, + repo_path: &Path, + url: &str, + oids: &[String], + ) -> Result> { + if oids.is_empty() { + return Ok(vec![]); + } + + // Filter to only OIDs that don't already exist locally + let missing: Vec<&String> = oids + .iter() + .filter(|oid| !self.oid_exists(repo_path, oid)) + .collect(); + + if missing.is_empty() { + debug!( + url = %url, + "All requested OIDs already exist locally" + ); + return Ok(oids.to_vec()); + } + + debug!( + url = %url, + missing_count = missing.len(), + "Fetching OIDs from remote server" + ); + + // Use tokio::task::spawn_blocking for the git fetch since it's blocking + let repo_path = repo_path.to_path_buf(); + let url = url.to_string(); + let missing_oids: Vec = missing.into_iter().cloned().collect(); + + let fetched = tokio::task::spawn_blocking(move || -> Vec { + // git fetch ... - fetch all OIDs in one command + let mut args = vec!["fetch", "--depth=1", &url]; + args.extend(missing_oids.iter().map(|s| s.as_str())); + + let output = Command::new("git") + .args(&args) + .current_dir(&repo_path) + .output(); + + match output { + Ok(result) if result.status.success() => { + // Count how many OIDs we now have + let fetched: Vec = missing_oids + .iter() + .filter(|oid| crate::git::oid_exists(&repo_path, oid)) + .cloned() + .collect(); + + debug!( + fetched_count = fetched.len(), + "Successfully fetched OIDs" + ); + + fetched + } + Ok(result) => { + let stderr = String::from_utf8_lossy(&result.stderr); + debug!( + stderr = %stderr, + "git fetch failed" + ); + vec![] + } + Err(e) => { + debug!( + error = %e, + "git fetch command error" + ); + vec![] + } + } + }) + .await + .map_err(|e| anyhow::anyhow!("Failed to spawn blocking task: {}", e))?; + + Ok(fetched) + } + + async fn process_newly_available_git_data( + &self, + source_repo_path: &Path, + new_oids: &HashSet, + ) -> Result { + // Delegate to the unified function from git::sync + let result = crate::git::sync::process_newly_available_git_data( + source_repo_path, + new_oids, + &self.database, + self.local_relay.as_ref(), + &self.purgatory, + &self.git_data_path, + ) + .await?; + + // Convert from git::sync::ProcessResult to our ProcessResult + Ok(ProcessResult { + states_released: result.states_released, + prs_released: result.prs_released, + repos_synced: result.repos_synced, + refs_created: result.refs_created, + refs_updated: result.refs_updated, + refs_deleted: result.refs_deleted, + errors: result.errors, + }) + } + + fn has_pending_events(&self, identifier: &str) -> bool { + self.purgatory.has_pending_events(identifier) + } + + fn find_target_repo(&self, db_repo_data: &RepositoryData) -> Option { + // Find the first owner repository that exists on disk + for announcement in &db_repo_data.announcements { + let repo_path = self.git_data_path.join(announcement.repo_path()); + if repo_path.exists() { + debug!( + repo_path = %repo_path.display(), + "Found existing repository for sync target" + ); + return Some(repo_path); + } + } + + debug!("No existing repository found for sync target"); + None + } + + fn our_domain(&self) -> Option<&str> { + self.our_domain_value.as_deref() + } +} + // ============================================================================= // Mock Implementation for Testing // ============================================================================= diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index be89130..022a556 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs @@ -13,7 +13,7 @@ mod r#loop; mod queue; mod throttle; -pub use context::{ProcessResult, SyncContext}; +pub use context::{ProcessResult, RealSyncContext, SyncContext}; pub use functions::{ get_throttled_domains_with_untried_urls, sync_identifier, sync_identifier_from_url, sync_identifier_next_url, ThrottledDomainInfo, -- cgit v1.2.3