From 09d9771acaffdc6e798cc32d2a68e6d46a323d3a Mon Sep 17 00:00:00 2001
From: DanConwayDev
Date: Wed, 7 Jan 2026 11:33:03 +0000
Subject: Add ThrottleManager for cross-domain rate limiting

Implements ThrottleManager which manages all per-domain DomainThrottle
instances and provides:
- Throttle status checking via is_throttled() for sync URL selection
- Request tracking via start_request()/complete_request()
- Identifier queue management via enqueue_identifier()
- Automatic domain throttle creation on first access
- Thread-safe access via DashMap with Mutex-wrapped throttles

The manager uses the configured max_concurrent and max_per_minute limits
for all domains. Trigger-based queue processing (set_context,
process_queued_identifier) will be added after SyncContext is available.

Tests verify:
- is_throttled reflects domain capacity correctly
- enqueue_identifier creates domain throttle if needed
- start_request creates domain throttle if needed
---
 src/purgatory/sync/mod.rs      |   2 +-
 src/purgatory/sync/throttle.rs | 172 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 173 insertions(+), 1 deletion(-)

(limited to 'src')

diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
index 3c0d875..b29f10e 100644
--- a/src/purgatory/sync/mod.rs
+++ b/src/purgatory/sync/mod.rs
@@ -10,4 +10,4 @@ mod queue;
 mod throttle;
 
 pub use queue::SyncQueueEntry;
-pub use throttle::DomainThrottle;
+pub use throttle::{DomainThrottle, ThrottleManager};
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index 6f01c86..94056b5 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -5,9 +5,14 @@
 //! - Concurrent request limit (max in-flight requests)
 //! - Rate limit (max requests per minute)
 //! - Queue of identifiers waiting for capacity (with round-robin processing)
+//!
+//! The `ThrottleManager` owns all `DomainThrottle` instances and provides the
+//! interface for checking throttle status and managing identifier queues.
 
+use dashmap::DashMap;
 use indexmap::IndexMap;
 use std::collections::{HashSet, VecDeque};
+use std::sync::Mutex;
 use std::time::{Duration, Instant};
 
 /// State for an identifier waiting in a domain's queue.
@@ -229,6 +234,119 @@ impl DomainThrottle {
     }
 }
 
+/// Manages rate limiting across all domains.
+///
+/// Owns a collection of `DomainThrottle` instances and provides:
+/// - Throttle status checking for `sync_identifier_next_url`
+/// - Identifier queue management
+/// - Request tracking (start/complete)
+///
+/// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`)
+/// will be added in Phase 6 after `SyncContext` is available.
+pub struct ThrottleManager {
+    /// Per-domain throttle state.
+    /// Uses DashMap for concurrent access from multiple sync tasks.
+    throttles: DashMap<String, Mutex<DomainThrottle>>,
+
+    /// Maximum concurrent requests per domain.
+    max_concurrent_per_domain: u32,
+
+    /// Maximum requests per minute per domain.
+    max_per_minute_per_domain: u32,
+}
+
+impl ThrottleManager {
+    /// Create a new throttle manager with the specified limits.
+    ///
+    /// # Arguments
+    /// * `max_concurrent` - Maximum concurrent in-flight requests per domain
+    /// * `max_per_minute` - Maximum requests per 60-second window per domain
+    pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
+        Self {
+            throttles: DashMap::new(),
+            max_concurrent_per_domain: max_concurrent,
+            max_per_minute_per_domain: max_per_minute,
+        }
+    }
+
+    /// Check if a domain is currently throttled (at capacity).
+    ///
+    /// Returns true if the domain has no capacity for another request,
+    /// either due to concurrent limit or rate limit.
+    pub fn is_throttled(&self, domain: &str) -> bool {
+        self.throttles
+            .get(domain)
+            .map_or(false, |entry| {
+                let throttle = entry.lock().unwrap();
+                !throttle.has_capacity()
+            })
+    }
+
+    /// Get or create a throttle for a domain.
+    fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> {
+        // First, try to get existing
+        if let Some(entry) = self.throttles.get(domain) {
+            return entry;
+        }
+
+        // Create new throttle
+        self.throttles.entry(domain.to_string()).or_insert_with(|| {
+            Mutex::new(DomainThrottle::new(
+                domain.to_string(),
+                self.max_concurrent_per_domain,
+                self.max_per_minute_per_domain,
+            ))
+        });
+
+        // Return the entry (we know it exists now)
+        self.throttles.get(domain).unwrap()
+    }
+
+    /// Record that a request is starting for a domain.
+    ///
+    /// Increments in-flight count and records timestamp for rate limiting.
+    pub fn start_request(&self, domain: &str) {
+        let entry = self.get_or_create_throttle(domain);
+        let mut throttle = entry.lock().unwrap();
+        throttle.start_request();
+    }
+
+    /// Record that a request completed for a domain.
+    ///
+    /// Decrements in-flight count and cleans up old timestamps.
+    ///
+    /// Note: Trigger-based processing (spawning tasks when capacity frees)
+    /// will be added in Phase 6 after `SyncContext` is available.
+    pub fn complete_request(&self, domain: &str) {
+        if let Some(entry) = self.throttles.get(domain) {
+            let mut throttle = entry.lock().unwrap();
+            throttle.complete_request();
+        }
+    }
+
+    /// Add an identifier to a domain's waiting queue.
+    ///
+    /// If the identifier is already queued for this domain, merges the tried_urls sets.
+    ///
+    /// Note: Trigger-based processing (spawning tasks when capacity available)
+    /// will be added in Phase 6 after `SyncContext` is available.
+    ///
+    /// # Arguments
+    /// * `domain` - The domain to queue for
+    /// * `identifier` - The repository identifier
+    /// * `tried_urls_for_domain` - URLs from this domain that have already been tried
+    pub fn enqueue_identifier(
+        &self,
+        domain: &str,
+        identifier: String,
+        tried_urls_for_domain: HashSet<String>,
+    ) {
+        let entry = self.get_or_create_throttle(domain);
+        let mut throttle = entry.lock().unwrap();
+        throttle.enqueue_identifier(identifier, tried_urls_for_domain);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -388,4 +506,58 @@ mod tests {
         assert!(tried.contains("url2"));
         assert_eq!(tried.len(), 2);
     }
+
+    // ThrottleManager tests
+
+    #[test]
+    fn is_throttled_reflects_domain_capacity() {
+        let manager = ThrottleManager::new(2, 100);
+
+        // New domain should not be throttled (has capacity)
+        assert!(!manager.is_throttled("example.com"));
+
+        // Start 2 requests (at concurrent limit)
+        manager.start_request("example.com");
+        manager.start_request("example.com");
+
+        // Should now be throttled
+        assert!(manager.is_throttled("example.com"));
+
+        // Complete one request
+        manager.complete_request("example.com");
+
+        // Should have capacity again
+        assert!(!manager.is_throttled("example.com"));
+
+        // Different domain should be independent
+        assert!(!manager.is_throttled("other.com"));
+    }
+
+    #[test]
+    fn enqueue_identifier_creates_domain_throttle() {
+        let manager = ThrottleManager::new(5, 100);
+
+        // Domain doesn't exist yet
+        assert!(!manager.throttles.contains_key("example.com"));
+
+        // Enqueue an identifier
+        manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new());
+
+        // Domain throttle should now exist
+        assert!(manager.throttles.contains_key("example.com"));
+    }
+
+    #[test]
+    fn start_request_creates_domain_throttle() {
+        let manager = ThrottleManager::new(5, 100);
+
+        // Domain doesn't exist yet
+        assert!(!manager.throttles.contains_key("example.com"));
+
+        // Start a request
+        manager.start_request("example.com");
+
+        // Domain throttle should now exist
+        assert!(manager.throttles.contains_key("example.com"));
+    }
 }
-- 
cgit v1.2.3