From 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 11:57:55 +0000 Subject: Add sync_identifier orchestration and ThrottleManager queue processing Implement the main sync orchestration function and trigger-based queue processing for throttled domains: sync_identifier function: - Orchestrates syncing git data for a single identifier - Tries all non-throttled URLs in sequence - Checks completion after each fetch (no pending events or all OIDs fetched) - Enqueues with throttled domains when non-throttled URLs are exhausted - Returns true if complete, false if events remain (for backoff) ThrottleManager enhancements: - Add set_context() to provide SyncContext for queue processing - Add try_process_next() to spawn tasks when capacity frees - Add process_queued_identifier() to handle queued work - Update complete_request() to trigger processing on completion - Update enqueue_identifier() to trigger processing when capacity available - Add internal methods for non-Arc testing compatibility Generic function updates: - Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url, sync_identifier, and get_throttled_domains_with_untried_urls for dynamic dispatch support (Arc) Tests: - sync_identifier_tries_multiple_urls_until_complete: verifies sequential URL fetching until all OIDs are available - sync_identifier_enqueues_throttled_domains_when_incomplete: verifies throttled domains get the identifier enqueued for later processing - has_queued_work_reflects_queue_state: verifies queue state tracking --- src/purgatory/sync/throttle.rs | 278 +++++++++++++++++++++++++++++++++++------ 1 file changed, 242 insertions(+), 36 deletions(-) (limited to 'src/purgatory/sync/throttle.rs') diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index 94056b5..a310a91 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs @@ -8,12 +8,22 @@ //! //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the //! interface for checking throttle status and managing identifier queues. +//! +//! ## Trigger-based Processing +//! +//! When capacity frees up (via `complete_request`) or a new identifier is enqueued +//! (via `enqueue_identifier`), the manager automatically spawns tasks to process +//! queued identifiers. This is trigger-based, not polling-based. use dashmap::DashMap; use indexmap::IndexMap; use std::collections::{HashSet, VecDeque}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex, OnceLock}; use std::time::{Duration, Instant}; +use tracing::debug; + +use super::context::SyncContext; +use super::functions::{sync_identifier_from_url, sync_identifier_next_url}; /// State for an identifier waiting in a domain's queue. /// @@ -240,9 +250,7 @@ impl DomainThrottle { /// - Throttle status checking for `sync_identifier_next_url` /// - Identifier queue management /// - Request tracking (start/complete) -/// -/// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`) -/// will be added in Phase 6 after `SyncContext` is available. +/// - Trigger-based queue processing when capacity frees up pub struct ThrottleManager { /// Per-domain throttle state. /// Uses DashMap for concurrent access from multiple sync tasks. @@ -253,6 +261,10 @@ pub struct ThrottleManager { /// Maximum requests per minute per domain. max_per_minute_per_domain: u32, + + /// Sync context for processing queued identifiers. + /// Set once at startup via `set_context()`. + ctx: OnceLock>, } impl ThrottleManager { @@ -266,38 +278,54 @@ impl ThrottleManager { throttles: DashMap::new(), max_concurrent_per_domain: max_concurrent, max_per_minute_per_domain: max_per_minute, + ctx: OnceLock::new(), } } + /// Set the sync context (called once at startup). + /// + /// The context is used for processing queued identifiers when capacity + /// becomes available. Must be called before any trigger-based processing + /// can occur. + /// + /// # Arguments + /// * `ctx` - The sync context implementation + pub fn set_context(&self, ctx: Arc) { + let _ = self.ctx.set(ctx); + } + /// Check if a domain is currently throttled (at capacity). /// /// Returns true if the domain has no capacity for another request, /// either due to concurrent limit or rate limit. pub fn is_throttled(&self, domain: &str) -> bool { - self.throttles - .get(domain) - .map_or(false, |entry| { - let throttle = entry.lock().unwrap(); - !throttle.has_capacity() - }) + self.throttles.get(domain).map_or(false, |entry| { + let throttle = entry.lock().unwrap(); + !throttle.has_capacity() + }) } /// Get or create a throttle for a domain. - fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex> { + fn get_or_create_throttle( + &self, + domain: &str, + ) -> dashmap::mapref::one::Ref<'_, String, Mutex> { // First, try to get existing if let Some(entry) = self.throttles.get(domain) { return entry; } - + // Create new throttle - self.throttles.entry(domain.to_string()).or_insert_with(|| { - Mutex::new(DomainThrottle::new( - domain.to_string(), - self.max_concurrent_per_domain, - self.max_per_minute_per_domain, - )) - }); - + self.throttles + .entry(domain.to_string()) + .or_insert_with(|| { + Mutex::new(DomainThrottle::new( + domain.to_string(), + self.max_concurrent_per_domain, + self.max_per_minute_per_domain, + )) + }); + // Return the entry (we know it exists now) self.throttles.get(domain).unwrap() } @@ -311,39 +339,193 @@ impl ThrottleManager { throttle.start_request(); } - /// Record that a request completed for a domain. + /// Record that a request completed for a domain (internal, no trigger). /// /// Decrements in-flight count and cleans up old timestamps. - /// - /// Note: Trigger-based processing (spawning tasks when capacity frees) - /// will be added in Phase 6 after `SyncContext` is available. - pub fn complete_request(&self, domain: &str) { + /// Does not trigger processing of queued identifiers. + fn complete_request_internal(&self, domain: &str) { if let Some(entry) = self.throttles.get(domain) { let mut throttle = entry.lock().unwrap(); throttle.complete_request(); } } - /// Add an identifier to a domain's waiting queue. + /// Record that a request completed for a domain. + /// + /// Decrements in-flight count, cleans up old timestamps, and triggers + /// processing of queued identifiers if capacity is available. + /// + /// # Arguments + /// * `domain` - The domain that completed a request + pub fn complete_request(self: &Arc, domain: &str) { + let should_trigger = { + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.complete_request(); + throttle.has_capacity() && throttle.has_queued_work() + } else { + false + } + }; + + if should_trigger { + self.try_process_next(domain); + } + } + + /// Add an identifier to a domain's waiting queue (internal, no trigger). /// /// If the identifier is already queued for this domain, merges the tried_urls sets. + /// Does not trigger processing. + fn enqueue_identifier_internal( + &self, + domain: &str, + identifier: String, + tried_urls_for_domain: HashSet, + ) { + let entry = self.get_or_create_throttle(domain); + let mut throttle = entry.lock().unwrap(); + throttle.enqueue_identifier(identifier, tried_urls_for_domain); + } + + /// Add an identifier to a domain's waiting queue. /// - /// Note: Trigger-based processing (spawning tasks when capacity available) - /// will be added in Phase 6 after `SyncContext` is available. + /// If the identifier is already queued for this domain, merges the tried_urls sets. + /// Triggers processing if capacity is available. /// /// # Arguments /// * `domain` - The domain to queue for /// * `identifier` - The repository identifier /// * `tried_urls_for_domain` - URLs from this domain that have already been tried pub fn enqueue_identifier( - &self, + self: &Arc, domain: &str, identifier: String, tried_urls_for_domain: HashSet, ) { - let entry = self.get_or_create_throttle(domain); - let mut throttle = entry.lock().unwrap(); - throttle.enqueue_identifier(identifier, tried_urls_for_domain); + let should_trigger = { + let entry = self.get_or_create_throttle(domain); + let mut throttle = entry.lock().unwrap(); + throttle.enqueue_identifier(identifier, tried_urls_for_domain); + throttle.has_capacity() + }; + + if should_trigger { + self.try_process_next(domain); + } + } + + /// Try to process the next queued identifier for a domain. + /// + /// This is called when capacity becomes available (either via `complete_request` + /// or when a new identifier is enqueued). Spawns a task to process the next + /// ready identifier if one exists. + fn try_process_next(self: &Arc, domain: &str) { + // Get next ready identifier (not in_progress) + let identifier = { + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.next_ready_identifier() + } else { + None + } + }; + + if let Some(identifier) = identifier { + let manager = self.clone(); + let domain = domain.to_string(); + + tokio::spawn(async move { + manager.process_queued_identifier(&domain, &identifier).await; + }); + } + } + + /// Process a single identifier from a domain's queue. + /// + /// This function: + /// 1. Gets the next URL to try for this identifier on this domain + /// 2. If a URL is found, fetches from it and marks it as tried + /// 3. If no URL is found, removes the identifier from this domain's queue + /// 4. Triggers processing of the next identifier if capacity is available + async fn process_queued_identifier(self: &Arc, domain: &str, identifier: &str) { + let ctx = match self.ctx.get() { + Some(ctx) => ctx, + None => { + debug!( + domain = %domain, + identifier = %identifier, + "No sync context set - cannot process queued identifier" + ); + // Mark not in progress so it can be retried + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.mark_identifier_not_in_progress(identifier); + } + return; + } + }; + + // Get tried URLs for this identifier on this domain + let tried_urls = { + self.throttles + .get(domain) + .map(|entry| { + let throttle = entry.lock().unwrap(); + throttle.get_tried_urls(identifier) + }) + .unwrap_or_default() + }; + + // Get next URL for this identifier on this specific domain + let url = sync_identifier_next_url( + ctx.as_ref(), + identifier, + Some(domain), + &tried_urls, + self, + ) + .await; + + match url { + Some(url) => { + debug!( + domain = %domain, + identifier = %identifier, + url = %url, + "Processing queued identifier - fetching from URL" + ); + + // Fetch from this URL + sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await; + + // Record URL as tried and mark not in_progress + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.mark_url_tried(identifier, url); + throttle.mark_identifier_not_in_progress(identifier); + } + + // complete_request was already called by sync_identifier_from_url, + // which will trigger try_process_next if capacity is available + } + None => { + debug!( + domain = %domain, + identifier = %identifier, + "No more URLs for identifier on this domain - removing from queue" + ); + + // No more URLs for this identifier on this domain - remove from queue + if let Some(entry) = self.throttles.get(domain) { + let mut throttle = entry.lock().unwrap(); + throttle.remove_identifier(identifier); + } + + // Try next identifier since we didn't use any capacity + self.try_process_next(domain); + } + } } } @@ -523,8 +705,8 @@ mod tests { // Should now be throttled assert!(manager.is_throttled("example.com")); - // Complete one request - manager.complete_request("example.com"); + // Complete one request (using internal method for non-Arc test) + manager.complete_request_internal("example.com"); // Should have capacity again assert!(!manager.is_throttled("example.com")); @@ -540,8 +722,8 @@ mod tests { // Domain doesn't exist yet assert!(!manager.throttles.contains_key("example.com")); - // Enqueue an identifier - manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); + // Enqueue an identifier (using internal method for non-Arc test) + manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); // Domain throttle should now exist assert!(manager.throttles.contains_key("example.com")); @@ -560,4 +742,28 @@ mod tests { // Domain throttle should now exist assert!(manager.throttles.contains_key("example.com")); } + + #[test] + fn has_queued_work_reflects_queue_state() { + let manager = ThrottleManager::new(5, 100); + + // Initially no queued work + let has_work = manager + .throttles + .get("example.com") + .map(|e| e.lock().unwrap().has_queued_work()) + .unwrap_or(false); + assert!(!has_work); + + // Enqueue an identifier + manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); + + // Now should have queued work + let has_work = manager + .throttles + .get("example.com") + .map(|e| e.lock().unwrap().has_queued_work()) + .unwrap_or(false); + assert!(has_work); + } } -- cgit v1.2.3