diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 14:01:47 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 14:01:47 +0000 |
| commit | 74979c1de32f69a39e0e290f56435ef687c2b6f6 (patch) | |
| tree | 26f235d1de053a5d1b44c4c7d642c2ad65fd0fd1 /src | |
| parent | 871ab773cd1d2fea89fdfe584d637c64694f9991 (diff) | |
Add RealSyncContext implementation for production purgatory sync
Implement the production SyncContext that connects to real systems:
- RealSyncContext struct holding purgatory, database, git_data_path,
our_domain, and local_relay references
- fetch_repository_data: delegates to the git::authorization module
- collect_needed_oids: collects commit hashes from state events
(branches/tags) and PR events (c-tag) in purgatory
- oid_exists: delegates to git::oid_exists function
- fetch_oids: uses git fetch --depth=1 to retrieve specific OIDs
from remote servers, running in spawn_blocking for async safety
- process_newly_available_git_data: delegates to the unified function
in git::sync module for consistent post-git-data processing
- has_pending_events: delegates to purgatory method
- find_target_repo: finds first existing owner repository on disk
- our_domain: returns configured domain for clone URL filtering
This enables the purgatory sync loop to use real database queries,
git operations, and event processing instead of mocks.
Diffstat (limited to 'src')
| -rw-r--r-- | src/purgatory/sync/context.rs | 249 | ||||
| -rw-r--r-- | src/purgatory/sync/mod.rs | 2 |
2 files changed, 250 insertions, 1 deletions
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index dea97ef..e97b708 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -168,6 +168,255 @@ pub trait SyncContext: Send + Sync { | |||
| 168 | } | 168 | } |
| 169 | 169 | ||
| 170 | // ============================================================================= | 170 | // ============================================================================= |
| 171 | // Real Implementation | ||
| 172 | // ============================================================================= | ||
| 173 | |||
| 174 | use nostr_relay_builder::LocalRelay; | ||
| 175 | use std::process::Command; | ||
| 176 | use std::sync::Arc; | ||
| 177 | use tracing::debug; | ||
| 178 | |||
| 179 | use crate::nostr::builder::SharedDatabase; | ||
| 180 | use crate::nostr::events::RepositoryState; | ||
| 181 | use crate::purgatory::Purgatory; | ||
| 182 | |||
| 183 | /// Real implementation of `SyncContext` that connects to actual systems. | ||
| 184 | /// | ||
| 185 | /// This is the production implementation used by the sync loop. It: | ||
| 186 | /// - Queries the database for repository data | ||
| 187 | /// - Collects needed OIDs from purgatory state and PR events | ||
| 188 | /// - Uses git commands to check OID existence and fetch from remote servers | ||
| 189 | /// - Delegates to the unified `process_newly_available_git_data` function | ||
| 190 | pub struct RealSyncContext { | ||
| 191 | /// Purgatory instance for checking pending events and collecting needed OIDs | ||
| 192 | purgatory: Arc<Purgatory>, | ||
| 193 | |||
| 194 | /// Database for querying repository data and saving events | ||
| 195 | database: SharedDatabase, | ||
| 196 | |||
| 197 | /// Base path for git repositories | ||
| 198 | git_data_path: PathBuf, | ||
| 199 | |||
| 200 | /// Our domain (to exclude from clone URLs when syncing) | ||
| 201 | our_domain_value: Option<String>, | ||
| 202 | |||
| 203 | /// Local relay for notifying WebSocket subscribers | ||
| 204 | local_relay: Option<LocalRelay>, | ||
| 205 | } | ||
| 206 | |||
| 207 | impl RealSyncContext { | ||
| 208 | /// Create a new real sync context. | ||
| 209 | /// | ||
| 210 | /// # Arguments | ||
| 211 | /// * `purgatory` - Purgatory instance for pending events | ||
| 212 | /// * `database` - Database for queries and saves | ||
| 213 | /// * `git_data_path` - Base path for git repositories | ||
| 214 | /// * `our_domain` - Our domain to exclude from clone URLs | ||
| 215 | /// * `local_relay` - Local relay for WebSocket notifications | ||
| 216 | pub fn new( | ||
| 217 | purgatory: Arc<Purgatory>, | ||
| 218 | database: SharedDatabase, | ||
| 219 | git_data_path: PathBuf, | ||
| 220 | our_domain: Option<String>, | ||
| 221 | local_relay: Option<LocalRelay>, | ||
| 222 | ) -> Self { | ||
| 223 | Self { | ||
| 224 | purgatory, | ||
| 225 | database, | ||
| 226 | git_data_path, | ||
| 227 | our_domain_value: our_domain, | ||
| 228 | local_relay, | ||
| 229 | } | ||
| 230 | } | ||
| 231 | } | ||
| 232 | |||
| 233 | #[async_trait] | ||
| 234 | impl SyncContext for RealSyncContext { | ||
| 235 | async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> { | ||
| 236 | crate::git::authorization::fetch_repository_data(&self.database, identifier).await | ||
| 237 | } | ||
| 238 | |||
| 239 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { | ||
| 240 | let mut needed_oids = HashSet::new(); | ||
| 241 | |||
| 242 | // Collect OIDs from state events in purgatory | ||
| 243 | for entry in self.purgatory.find_state(identifier) { | ||
| 244 | // Parse state event to extract branch/tag commits | ||
| 245 | if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { | ||
| 246 | for branch in &state.branches { | ||
| 247 | // Skip symbolic refs (e.g., "ref: refs/heads/main") | ||
| 248 | if !branch.commit.starts_with("ref: ") { | ||
| 249 | needed_oids.insert(branch.commit.clone()); | ||
| 250 | } | ||
| 251 | } | ||
| 252 | for tag in &state.tags { | ||
| 253 | if !tag.commit.starts_with("ref: ") { | ||
| 254 | needed_oids.insert(tag.commit.clone()); | ||
| 255 | } | ||
| 256 | } | ||
| 257 | } | ||
| 258 | } | ||
| 259 | |||
| 260 | // Collect OIDs from PR events in purgatory | ||
| 261 | for entry in self.purgatory.find_prs_for_identifier(identifier) { | ||
| 262 | // PR events have a commit field (from c-tag) | ||
| 263 | if !entry.commit.is_empty() { | ||
| 264 | needed_oids.insert(entry.commit.clone()); | ||
| 265 | } | ||
| 266 | } | ||
| 267 | |||
| 268 | debug!( | ||
| 269 | identifier = %identifier, | ||
| 270 | needed_oids_count = needed_oids.len(), | ||
| 271 | "Collected needed OIDs from purgatory" | ||
| 272 | ); | ||
| 273 | |||
| 274 | needed_oids | ||
| 275 | } | ||
| 276 | |||
| 277 | fn oid_exists(&self, repo_path: &Path, oid: &str) -> bool { | ||
| 278 | crate::git::oid_exists(repo_path, oid) | ||
| 279 | } | ||
| 280 | |||
| 281 | async fn fetch_oids( | ||
| 282 | &self, | ||
| 283 | repo_path: &Path, | ||
| 284 | url: &str, | ||
| 285 | oids: &[String], | ||
| 286 | ) -> Result<Vec<String>> { | ||
| 287 | if oids.is_empty() { | ||
| 288 | return Ok(vec![]); | ||
| 289 | } | ||
| 290 | |||
| 291 | // Filter to only OIDs that don't already exist locally | ||
| 292 | let missing: Vec<&String> = oids | ||
| 293 | .iter() | ||
| 294 | .filter(|oid| !self.oid_exists(repo_path, oid)) | ||
| 295 | .collect(); | ||
| 296 | |||
| 297 | if missing.is_empty() { | ||
| 298 | debug!( | ||
| 299 | url = %url, | ||
| 300 | "All requested OIDs already exist locally" | ||
| 301 | ); | ||
| 302 | return Ok(oids.to_vec()); | ||
| 303 | } | ||
| 304 | |||
| 305 | debug!( | ||
| 306 | url = %url, | ||
| 307 | missing_count = missing.len(), | ||
| 308 | "Fetching OIDs from remote server" | ||
| 309 | ); | ||
| 310 | |||
| 311 | // Use tokio::task::spawn_blocking for the git fetch since it's blocking | ||
| 312 | let repo_path = repo_path.to_path_buf(); | ||
| 313 | let url = url.to_string(); | ||
| 314 | let missing_oids: Vec<String> = missing.into_iter().cloned().collect(); | ||
| 315 | |||
| 316 | let fetched = tokio::task::spawn_blocking(move || -> Vec<String> { | ||
| 317 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs in one command | ||
| 318 | let mut args = vec!["fetch", "--depth=1", &url]; | ||
| 319 | args.extend(missing_oids.iter().map(|s| s.as_str())); | ||
| 320 | |||
| 321 | let output = Command::new("git") | ||
| 322 | .args(&args) | ||
| 323 | .current_dir(&repo_path) | ||
| 324 | .output(); | ||
| 325 | |||
| 326 | match output { | ||
| 327 | Ok(result) if result.status.success() => { | ||
| 328 | // Count how many OIDs we now have | ||
| 329 | let fetched: Vec<String> = missing_oids | ||
| 330 | .iter() | ||
| 331 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) | ||
| 332 | .cloned() | ||
| 333 | .collect(); | ||
| 334 | |||
| 335 | debug!( | ||
| 336 | fetched_count = fetched.len(), | ||
| 337 | "Successfully fetched OIDs" | ||
| 338 | ); | ||
| 339 | |||
| 340 | fetched | ||
| 341 | } | ||
| 342 | Ok(result) => { | ||
| 343 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 344 | debug!( | ||
| 345 | stderr = %stderr, | ||
| 346 | "git fetch failed" | ||
| 347 | ); | ||
| 348 | vec![] | ||
| 349 | } | ||
| 350 | Err(e) => { | ||
| 351 | debug!( | ||
| 352 | error = %e, | ||
| 353 | "git fetch command error" | ||
| 354 | ); | ||
| 355 | vec![] | ||
| 356 | } | ||
| 357 | } | ||
| 358 | }) | ||
| 359 | .await | ||
| 360 | .map_err(|e| anyhow::anyhow!("Failed to spawn blocking task: {}", e))?; | ||
| 361 | |||
| 362 | Ok(fetched) | ||
| 363 | } | ||
| 364 | |||
| 365 | async fn process_newly_available_git_data( | ||
| 366 | &self, | ||
| 367 | source_repo_path: &Path, | ||
| 368 | new_oids: &HashSet<String>, | ||
| 369 | ) -> Result<ProcessResult> { | ||
| 370 | // Delegate to the unified function from git::sync | ||
| 371 | let result = crate::git::sync::process_newly_available_git_data( | ||
| 372 | source_repo_path, | ||
| 373 | new_oids, | ||
| 374 | &self.database, | ||
| 375 | self.local_relay.as_ref(), | ||
| 376 | &self.purgatory, | ||
| 377 | &self.git_data_path, | ||
| 378 | ) | ||
| 379 | .await?; | ||
| 380 | |||
| 381 | // Convert from git::sync::ProcessResult to our ProcessResult | ||
| 382 | Ok(ProcessResult { | ||
| 383 | states_released: result.states_released, | ||
| 384 | prs_released: result.prs_released, | ||
| 385 | repos_synced: result.repos_synced, | ||
| 386 | refs_created: result.refs_created, | ||
| 387 | refs_updated: result.refs_updated, | ||
| 388 | refs_deleted: result.refs_deleted, | ||
| 389 | errors: result.errors, | ||
| 390 | }) | ||
| 391 | } | ||
| 392 | |||
| 393 | fn has_pending_events(&self, identifier: &str) -> bool { | ||
| 394 | self.purgatory.has_pending_events(identifier) | ||
| 395 | } | ||
| 396 | |||
| 397 | fn find_target_repo(&self, db_repo_data: &RepositoryData) -> Option<PathBuf> { | ||
| 398 | // Find the first owner repository that exists on disk | ||
| 399 | for announcement in &db_repo_data.announcements { | ||
| 400 | let repo_path = self.git_data_path.join(announcement.repo_path()); | ||
| 401 | if repo_path.exists() { | ||
| 402 | debug!( | ||
| 403 | repo_path = %repo_path.display(), | ||
| 404 | "Found existing repository for sync target" | ||
| 405 | ); | ||
| 406 | return Some(repo_path); | ||
| 407 | } | ||
| 408 | } | ||
| 409 | |||
| 410 | debug!("No existing repository found for sync target"); | ||
| 411 | None | ||
| 412 | } | ||
| 413 | |||
| 414 | fn our_domain(&self) -> Option<&str> { | ||
| 415 | self.our_domain_value.as_deref() | ||
| 416 | } | ||
| 417 | } | ||
| 418 | |||
| 419 | // ============================================================================= | ||
| 171 | // Mock Implementation for Testing | 420 | // Mock Implementation for Testing |
| 172 | // ============================================================================= | 421 | // ============================================================================= |
| 173 | 422 | ||
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index be89130..022a556 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs | |||
| @@ -13,7 +13,7 @@ mod r#loop; | |||
| 13 | mod queue; | 13 | mod queue; |
| 14 | mod throttle; | 14 | mod throttle; |
| 15 | 15 | ||
| 16 | pub use context::{ProcessResult, SyncContext}; | 16 | pub use context::{ProcessResult, RealSyncContext, SyncContext}; |
| 17 | pub use functions::{ | 17 | pub use functions::{ |
| 18 | get_throttled_domains_with_untried_urls, sync_identifier, sync_identifier_from_url, | 18 | get_throttled_domains_with_untried_urls, sync_identifier, sync_identifier_from_url, |
| 19 | sync_identifier_next_url, ThrottledDomainInfo, | 19 | sync_identifier_next_url, ThrottledDomainInfo, |