From 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 11:57:55 +0000 Subject: Add sync_identifier orchestration and ThrottleManager queue processing Implement the main sync orchestration function and trigger-based queue processing for throttled domains: sync_identifier function: - Orchestrates syncing git data for a single identifier - Tries all non-throttled URLs in sequence - Checks completion after each fetch (no pending events or all OIDs fetched) - Enqueues with throttled domains when non-throttled URLs are exhausted - Returns true if complete, false if events remain (for backoff) ThrottleManager enhancements: - Add set_context() to provide SyncContext for queue processing - Add try_process_next() to spawn tasks when capacity frees - Add process_queued_identifier() to handle queued work - Update complete_request() to trigger processing on completion - Update enqueue_identifier() to trigger processing when capacity available - Add internal methods for non-Arc testing compatibility Generic function updates: - Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url, sync_identifier, and get_throttled_domains_with_untried_urls for dynamic dispatch support (Arc<dyn SyncContext>) Tests: - sync_identifier_tries_multiple_urls_until_complete: verifies sequential URL fetching until all OIDs are available - sync_identifier_enqueues_throttled_domains_when_incomplete: verifies throttled domains get the identifier enqueued for later processing - has_queued_work_reflects_queue_state: verifies queue state tracking --- src/purgatory/sync/functions.rs | 228 +++++++++++++++++++++++++++++++- src/purgatory/sync/mod.rs | 4 +- src/purgatory/sync/throttle.rs | 278 ++++++++++++++++++++++++++++++++++------ 3 files changed, 469 insertions(+), 41 deletions(-) (limited to 'src/purgatory/sync') diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index de846f3..751dd5e 100644 --- 
a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs @@ -55,7 +55,7 @@ fn extract_domain(url: &str) -> Option { /// /// * `Some(url)` - The next URL to try /// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) -pub async fn sync_identifier_next_url( +pub async fn sync_identifier_next_url( ctx: &C, identifier: &str, domain: Option<&str>, @@ -178,7 +178,7 @@ pub struct ThrottledDomainInfo { /// /// A list of throttled domains that still have untried URLs, along with /// the tried URLs for each domain (for proper queue state). -pub async fn get_throttled_domains_with_untried_urls( +pub async fn get_throttled_domains_with_untried_urls( ctx: &C, identifier: &str, tried_urls: &HashSet, @@ -254,7 +254,7 @@ pub async fn get_throttled_domains_with_untried_urls( /// # Returns /// /// The number of OIDs successfully fetched (0 on failure) -pub async fn sync_identifier_from_url( +pub async fn sync_identifier_from_url( ctx: &C, identifier: &str, url: &str, @@ -347,6 +347,128 @@ pub async fn sync_identifier_from_url( oids_fetched } +/// Sync git data for an identifier. +/// +/// This is the main orchestration function called by the sync loop. It: +/// 1. Tries all non-throttled URLs in sequence +/// 2. After each fetch, checks if sync is complete (no pending events or no needed OIDs) +/// 3. When no non-throttled URLs remain, enqueues with throttled domains for later processing +/// 4. 
Returns without waiting for throttled domains to complete +/// +/// # Arguments +/// +/// * `ctx` - The sync context providing repository data and OID information +/// * `identifier` - The repository identifier (d-tag value) +/// * `throttle_manager` - Used for rate limiting and domain queue management +/// +/// # Returns +/// +/// * `true` - Sync completed (no pending events or all OIDs fetched) +/// * `false` - Events remain in purgatory (will be retried after backoff, or processed +/// by throttled domain queues) +pub async fn sync_identifier( + ctx: &C, + identifier: &str, + throttle_manager: &Arc, +) -> bool { + let mut tried_urls: HashSet = HashSet::new(); + + debug!( + identifier = %identifier, + "Starting sync for identifier" + ); + + // Try all non-throttled URLs + loop { + match sync_identifier_next_url(ctx, identifier, None, &tried_urls, throttle_manager).await { + Some(url) => { + debug!( + identifier = %identifier, + url = %url, + "Found non-throttled URL to try" + ); + + // Fetch from this URL + sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await; + tried_urls.insert(url); + + // Check if sync is now complete + if !ctx.has_pending_events(identifier) { + debug!( + identifier = %identifier, + "Sync complete - no pending events" + ); + return true; + } + + let needed_oids = ctx.collect_needed_oids(identifier); + if needed_oids.is_empty() { + debug!( + identifier = %identifier, + "Sync complete - all OIDs available" + ); + return true; + } + + // Continue trying more URLs + } + None => { + // No more non-throttled URLs available + debug!( + identifier = %identifier, + tried_count = tried_urls.len(), + "No more non-throttled URLs available" + ); + break; + } + } + } + + // Check if we're done (no pending events or no needed OIDs) + if !ctx.has_pending_events(identifier) { + debug!( + identifier = %identifier, + "Sync complete after exhausting URLs - no pending events" + ); + return true; + } + + let needed_oids = 
ctx.collect_needed_oids(identifier); + if needed_oids.is_empty() { + debug!( + identifier = %identifier, + "Sync complete after exhausting URLs - all OIDs available" + ); + return true; + } + + // Enqueue with any throttled domains that have untried URLs + let throttled_domains = + get_throttled_domains_with_untried_urls(ctx, identifier, &tried_urls, throttle_manager) + .await; + + for info in throttled_domains { + debug!( + identifier = %identifier, + domain = %info.domain, + "Enqueueing identifier with throttled domain" + ); + throttle_manager.enqueue_identifier( + &info.domain, + identifier.to_string(), + info.tried_urls_for_domain, + ); + } + + // Return false - events remain, will retry after backoff + // (throttled domains will process independently) + debug!( + identifier = %identifier, + "Sync incomplete - returning false for backoff" + ); + false +} + #[cfg(test)] mod tests { use super::*; @@ -617,4 +739,104 @@ mod tests { assert_eq!(throttled[0].domain, "gitlab.com"); assert!(throttled[0].tried_urls_for_domain.is_empty()); } + + // ========================================================================= + // Phase 6: sync_identifier tests + // ========================================================================= + + #[tokio::test] + async fn sync_identifier_tries_multiple_urls_until_complete() { + // Set up mock with 3 URLs, each providing partial OIDs + // URL1 provides abc123, URL2 provides def456, URL3 provides ghi789 + let mock = MockSyncContext::new() + .with_urls(&[ + "https://server1.com/repo.git", + "https://server2.com/repo.git", + "https://server3.com/repo.git", + ]) + .with_needed_oids(&["abc123", "def456", "ghi789"]) + .with_pending_events(true) + .url_provides("https://server1.com/repo.git", &["abc123"]) + .url_provides("https://server2.com/repo.git", &["def456"]) + .url_provides("https://server3.com/repo.git", &["ghi789"]); + + let throttle_manager = Arc::new(ThrottleManager::new(5, 100)); + + // Run sync_identifier + let complete = 
sync_identifier(&mock, "test-repo", &throttle_manager).await; + + // Should return true (sync complete) + assert!(complete, "Expected sync to complete after trying all URLs"); + + // Should have tried all 3 URLs + let fetch_log = mock.fetch_log(); + assert_eq!( + fetch_log.len(), + 3, + "Expected 3 fetch attempts, got: {:?}", + fetch_log + ); + + // All OIDs should now be fetched + assert!( + mock.current_needed_oids().is_empty(), + "Expected all OIDs to be fetched" + ); + } + + #[tokio::test] + async fn sync_identifier_enqueues_throttled_domains_when_incomplete() { + // Set up mock with URLs from two domains + // Only github.com can provide the OID, but it will be throttled + let mock = MockSyncContext::new() + .with_urls(&[ + "https://github.com/foo/bar.git", + "https://gitlab.com/foo/bar.git", + ]) + .with_needed_oids(&["abc123"]) + .with_pending_events(true) + .url_provides("https://github.com/foo/bar.git", &["abc123"]); + // Note: gitlab.com doesn't provide any OIDs + + let throttle_manager = Arc::new(ThrottleManager::new(1, 100)); + + // Throttle github.com by starting a request + throttle_manager.start_request("github.com"); + + // Run sync_identifier + let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await; + + // Should return false (sync incomplete - github.com is throttled) + assert!( + !complete, + "Expected sync to be incomplete when required domain is throttled" + ); + + // Should have tried gitlab.com (not throttled) but it doesn't have the OID + let fetch_log = mock.fetch_log(); + assert_eq!( + fetch_log.len(), + 1, + "Expected 1 fetch attempt (gitlab.com), got: {:?}", + fetch_log + ); + assert!( + fetch_log[0].contains("gitlab.com"), + "Expected gitlab.com to be tried first" + ); + + // OID should still be needed + assert!( + mock.current_needed_oids().contains("abc123"), + "Expected OID to still be needed" + ); + + // github.com should have the identifier enqueued + // We can verify this by checking if github.com is still 
throttled (it should be, + // since the identifier was enqueued but not processed yet) + assert!( + throttle_manager.is_throttled("github.com"), + "Expected github.com to still be throttled" + ); + } } diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index d26c1f0..8ac9216 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs @@ -13,8 +13,8 @@ mod throttle; pub use context::{ProcessResult, SyncContext}; pub use functions::{ - get_throttled_domains_with_untried_urls, sync_identifier_from_url, sync_identifier_next_url, - ThrottledDomainInfo, + get_throttled_domains_with_untried_urls, sync_identifier, sync_identifier_from_url, + sync_identifier_next_url, ThrottledDomainInfo, }; pub use queue::SyncQueueEntry; pub use throttle::{DomainThrottle, ThrottleManager}; diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index 94056b5..a310a91 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs @@ -8,12 +8,22 @@ //! //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the //! interface for checking throttle status and managing identifier queues. +//! +//! ## Trigger-based Processing +//! +//! When capacity frees up (via `complete_request`) or a new identifier is enqueued +//! (via `enqueue_identifier`), the manager automatically spawns tasks to process +//! queued identifiers. This is trigger-based, not polling-based. use dashmap::DashMap; use indexmap::IndexMap; use std::collections::{HashSet, VecDeque}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex, OnceLock}; use std::time::{Duration, Instant}; +use tracing::debug; + +use super::context::SyncContext; +use super::functions::{sync_identifier_from_url, sync_identifier_next_url}; /// State for an identifier waiting in a domain's queue. 
/// @@ -240,9 +250,7 @@ impl DomainThrottle { /// - Throttle status checking for `sync_identifier_next_url` /// - Identifier queue management /// - Request tracking (start/complete) -/// -/// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`) -/// will be added in Phase 6 after `SyncContext` is available. +/// - Trigger-based queue processing when capacity frees up pub struct ThrottleManager { /// Per-domain throttle state. /// Uses DashMap for concurrent access from multiple sync tasks. @@ -253,6 +261,10 @@ pub struct ThrottleManager { /// Maximum requests per minute per domain. max_per_minute_per_domain: u32, + + /// Sync context for processing queued identifiers. + /// Set once at startup via `set_context()`. + ctx: OnceLock>, } impl ThrottleManager { @@ -266,38 +278,54 @@ impl ThrottleManager { throttles: DashMap::new(), max_concurrent_per_domain: max_concurrent, max_per_minute_per_domain: max_per_minute, + ctx: OnceLock::new(), } } + /// Set the sync context (called once at startup). + /// + /// The context is used for processing queued identifiers when capacity + /// becomes available. Must be called before any trigger-based processing + /// can occur. + /// + /// # Arguments + /// * `ctx` - The sync context implementation + pub fn set_context(&self, ctx: Arc) { + let _ = self.ctx.set(ctx); + } + /// Check if a domain is currently throttled (at capacity). /// /// Returns true if the domain has no capacity for another request, /// either due to concurrent limit or rate limit. pub fn is_throttled(&self, domain: &str) -> bool { - self.throttles - .get(domain) - .map_or(false, |entry| { - let throttle = entry.lock().unwrap(); - !throttle.has_capacity() - }) + self.throttles.get(domain).map_or(false, |entry| { + let throttle = entry.lock().unwrap(); + !throttle.has_capacity() + }) } /// Get or create a throttle for a domain. 
- fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex> { + fn get_or_create_throttle( + &self, + domain: &str, + ) -> dashmap::mapref::one::Ref<'_, String, Mutex> { // First, try to get existing if let Some(entry) = self.throttles.get(domain) { return entry; } - + // Create new throttle - self.throttles.entry(domain.to_string()).or_insert_with(|| { - Mutex::new(DomainThrottle::new( - domain.to_string(), - self.max_concurrent_per_domain, - self.max_per_minute_per_domain, - )) - }); - + self.throttles + .entry(domain.to_string()) + .or_insert_with(|| { + Mutex::new(DomainThrottle::new( + domain.to_string(), + self.max_concurrent_per_domain, + self.max_per_minute_per_domain, + )) + }); + // Return the entry (we know it exists now) self.throttles.get(domain).unwrap() } @@ -311,39 +339,193 @@ impl ThrottleManager { throttle.start_request(); } - /// Record that a request completed for a domain. + /// Record that a request completed for a domain (internal, no trigger). /// /// Decrements in-flight count and cleans up old timestamps. - /// - /// Note: Trigger-based processing (spawning tasks when capacity frees) - /// will be added in Phase 6 after `SyncContext` is available. - pub fn complete_request(&self, domain: &str) { + /// Does not trigger processing of queued identifiers. + fn complete_request_internal(&self, domain: &str) { if let Some(entry) = self.throttles.get(domain) { let mut throttle = entry.lock().unwrap(); throttle.complete_request(); } } - /// Add an identifier to a domain's waiting queue. + /// Record that a request completed for a domain. + /// + /// Decrements in-flight count, cleans up old timestamps, and triggers + /// processing of queued identifiers if capacity is available. 
+ /// + /// # Arguments + /// * `domain` - The domain that completed a request + pub fn complete_request(self: &Arc, domain: &str) { + let should_trigger = { + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.complete_request(); + throttle.has_capacity() && throttle.has_queued_work() + } else { + false + } + }; + + if should_trigger { + self.try_process_next(domain); + } + } + + /// Add an identifier to a domain's waiting queue (internal, no trigger). /// /// If the identifier is already queued for this domain, merges the tried_urls sets. + /// Does not trigger processing. + fn enqueue_identifier_internal( + &self, + domain: &str, + identifier: String, + tried_urls_for_domain: HashSet, + ) { + let entry = self.get_or_create_throttle(domain); + let mut throttle = entry.lock().unwrap(); + throttle.enqueue_identifier(identifier, tried_urls_for_domain); + } + + /// Add an identifier to a domain's waiting queue. /// - /// Note: Trigger-based processing (spawning tasks when capacity available) - /// will be added in Phase 6 after `SyncContext` is available. + /// If the identifier is already queued for this domain, merges the tried_urls sets. + /// Triggers processing if capacity is available. 
/// /// # Arguments /// * `domain` - The domain to queue for /// * `identifier` - The repository identifier /// * `tried_urls_for_domain` - URLs from this domain that have already been tried pub fn enqueue_identifier( - &self, + self: &Arc, domain: &str, identifier: String, tried_urls_for_domain: HashSet, ) { - let entry = self.get_or_create_throttle(domain); - let mut throttle = entry.lock().unwrap(); - throttle.enqueue_identifier(identifier, tried_urls_for_domain); + let should_trigger = { + let entry = self.get_or_create_throttle(domain); + let mut throttle = entry.lock().unwrap(); + throttle.enqueue_identifier(identifier, tried_urls_for_domain); + throttle.has_capacity() + }; + + if should_trigger { + self.try_process_next(domain); + } + } + + /// Try to process the next queued identifier for a domain. + /// + /// This is called when capacity becomes available (either via `complete_request` + /// or when a new identifier is enqueued). Spawns a task to process the next + /// ready identifier if one exists. + fn try_process_next(self: &Arc, domain: &str) { + // Get next ready identifier (not in_progress) + let identifier = { + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.next_ready_identifier() + } else { + None + } + }; + + if let Some(identifier) = identifier { + let manager = self.clone(); + let domain = domain.to_string(); + + tokio::spawn(async move { + manager.process_queued_identifier(&domain, &identifier).await; + }); + } + } + + /// Process a single identifier from a domain's queue. + /// + /// This function: + /// 1. Gets the next URL to try for this identifier on this domain + /// 2. If a URL is found, fetches from it and marks it as tried + /// 3. If no URL is found, removes the identifier from this domain's queue + /// 4. 
Triggers processing of the next identifier if capacity is available + async fn process_queued_identifier(self: &Arc, domain: &str, identifier: &str) { + let ctx = match self.ctx.get() { + Some(ctx) => ctx, + None => { + debug!( + domain = %domain, + identifier = %identifier, + "No sync context set - cannot process queued identifier" + ); + // Mark not in progress so it can be retried + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.mark_identifier_not_in_progress(identifier); + } + return; + } + }; + + // Get tried URLs for this identifier on this domain + let tried_urls = { + self.throttles + .get(domain) + .map(|entry| { + let throttle = entry.lock().unwrap(); + throttle.get_tried_urls(identifier) + }) + .unwrap_or_default() + }; + + // Get next URL for this identifier on this specific domain + let url = sync_identifier_next_url( + ctx.as_ref(), + identifier, + Some(domain), + &tried_urls, + self, + ) + .await; + + match url { + Some(url) => { + debug!( + domain = %domain, + identifier = %identifier, + url = %url, + "Processing queued identifier - fetching from URL" + ); + + // Fetch from this URL + sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await; + + // Record URL as tried and mark not in_progress + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.mark_url_tried(identifier, url); + throttle.mark_identifier_not_in_progress(identifier); + } + + // complete_request was already called by sync_identifier_from_url, + // which will trigger try_process_next if capacity is available + } + None => { + debug!( + domain = %domain, + identifier = %identifier, + "No more URLs for identifier on this domain - removing from queue" + ); + + // No more URLs for this identifier on this domain - remove from queue + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.remove_identifier(identifier); + 
} + + // Try next identifier since we didn't use any capacity + self.try_process_next(domain); + } + } } } @@ -523,8 +705,8 @@ mod tests { // Should now be throttled assert!(manager.is_throttled("example.com")); - // Complete one request - manager.complete_request("example.com"); + // Complete one request (using internal method for non-Arc test) + manager.complete_request_internal("example.com"); // Should have capacity again assert!(!manager.is_throttled("example.com")); @@ -540,8 +722,8 @@ mod tests { // Domain doesn't exist yet assert!(!manager.throttles.contains_key("example.com")); - // Enqueue an identifier - manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); + // Enqueue an identifier (using internal method for non-Arc test) + manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); // Domain throttle should now exist assert!(manager.throttles.contains_key("example.com")); @@ -560,4 +742,28 @@ mod tests { // Domain throttle should now exist assert!(manager.throttles.contains_key("example.com")); } + + #[test] + fn has_queued_work_reflects_queue_state() { + let manager = ThrottleManager::new(5, 100); + + // Initially no queued work + let has_work = manager + .throttles + .get("example.com") + .map(|e| e.lock().unwrap().has_queued_work()) + .unwrap_or(false); + assert!(!has_work); + + // Enqueue an identifier + manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); + + // Now should have queued work + let has_work = manager + .throttles + .get("example.com") + .map(|e| e.lock().unwrap().has_queued_work()) + .unwrap_or(false); + assert!(has_work); + } } -- cgit v1.2.3