diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/purgatory/sync/mod.rs | 2 | ||||
| -rw-r--r-- | src/purgatory/sync/throttle.rs | 172 |
2 files changed, 173 insertions, 1 deletions
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index 3c0d875..b29f10e 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs | |||
| @@ -10,4 +10,4 @@ mod queue; | |||
| 10 | mod throttle; | 10 | mod throttle; |
| 11 | 11 | ||
| 12 | pub use queue::SyncQueueEntry; | 12 | pub use queue::SyncQueueEntry; |
| 13 | pub use throttle::DomainThrottle; | 13 | pub use throttle::{DomainThrottle, ThrottleManager}; |
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index 6f01c86..94056b5 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs | |||
| @@ -5,9 +5,14 @@ | |||
| 5 | //! - Concurrent request limit (max in-flight requests) | 5 | //! - Concurrent request limit (max in-flight requests) |
| 6 | //! - Rate limit (max requests per minute) | 6 | //! - Rate limit (max requests per minute) |
| 7 | //! - Queue of identifiers waiting for capacity (with round-robin processing) | 7 | //! - Queue of identifiers waiting for capacity (with round-robin processing) |
| 8 | //! | ||
| 9 | //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the | ||
| 10 | //! interface for checking throttle status and managing identifier queues. | ||
| 8 | 11 | ||
| 12 | use dashmap::DashMap; | ||
| 9 | use indexmap::IndexMap; | 13 | use indexmap::IndexMap; |
| 10 | use std::collections::{HashSet, VecDeque}; | 14 | use std::collections::{HashSet, VecDeque}; |
| 15 | use std::sync::Mutex; | ||
| 11 | use std::time::{Duration, Instant}; | 16 | use std::time::{Duration, Instant}; |
| 12 | 17 | ||
| 13 | /// State for an identifier waiting in a domain's queue. | 18 | /// State for an identifier waiting in a domain's queue. |
| @@ -229,6 +234,119 @@ impl DomainThrottle { | |||
| 229 | } | 234 | } |
| 230 | } | 235 | } |
| 231 | 236 | ||
| 237 | /// Manages rate limiting across all domains. | ||
| 238 | /// | ||
| 239 | /// Owns a collection of `DomainThrottle` instances and provides: | ||
| 240 | /// - Throttle status checking for `sync_identifier_next_url` | ||
| 241 | /// - Identifier queue management | ||
| 242 | /// - Request tracking (start/complete) | ||
| 243 | /// | ||
| 244 | /// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`) | ||
| 245 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 246 | pub struct ThrottleManager { | ||
| 247 | /// Per-domain throttle state. | ||
| 248 | /// Uses DashMap for concurrent access from multiple sync tasks. | ||
| 249 | throttles: DashMap<String, Mutex<DomainThrottle>>, | ||
| 250 | |||
| 251 | /// Maximum concurrent requests per domain. | ||
| 252 | max_concurrent_per_domain: u32, | ||
| 253 | |||
| 254 | /// Maximum requests per minute per domain. | ||
| 255 | max_per_minute_per_domain: u32, | ||
| 256 | } | ||
| 257 | |||
| 258 | impl ThrottleManager { | ||
| 259 | /// Create a new throttle manager with the specified limits. | ||
| 260 | /// | ||
| 261 | /// # Arguments | ||
| 262 | /// * `max_concurrent` - Maximum concurrent in-flight requests per domain | ||
| 263 | /// * `max_per_minute` - Maximum requests per 60-second window per domain | ||
| 264 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { | ||
| 265 | Self { | ||
| 266 | throttles: DashMap::new(), | ||
| 267 | max_concurrent_per_domain: max_concurrent, | ||
| 268 | max_per_minute_per_domain: max_per_minute, | ||
| 269 | } | ||
| 270 | } | ||
| 271 | |||
| 272 | /// Check if a domain is currently throttled (at capacity). | ||
| 273 | /// | ||
| 274 | /// Returns true if the domain has no capacity for another request, | ||
| 275 | /// either due to concurrent limit or rate limit. | ||
| 276 | pub fn is_throttled(&self, domain: &str) -> bool { | ||
| 277 | self.throttles | ||
| 278 | .get(domain) | ||
| 279 | .map_or(false, |entry| { | ||
| 280 | let throttle = entry.lock().unwrap(); | ||
| 281 | !throttle.has_capacity() | ||
| 282 | }) | ||
| 283 | } | ||
| 284 | |||
| 285 | /// Get or create a throttle for a domain. | ||
| 286 | fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { | ||
| 287 | // First, try to get existing | ||
| 288 | if let Some(entry) = self.throttles.get(domain) { | ||
| 289 | return entry; | ||
| 290 | } | ||
| 291 | |||
| 292 | // Create new throttle | ||
| 293 | self.throttles.entry(domain.to_string()).or_insert_with(|| { | ||
| 294 | Mutex::new(DomainThrottle::new( | ||
| 295 | domain.to_string(), | ||
| 296 | self.max_concurrent_per_domain, | ||
| 297 | self.max_per_minute_per_domain, | ||
| 298 | )) | ||
| 299 | }); | ||
| 300 | |||
| 301 | // Return the entry (we know it exists now) | ||
| 302 | self.throttles.get(domain).unwrap() | ||
| 303 | } | ||
| 304 | |||
| 305 | /// Record that a request is starting for a domain. | ||
| 306 | /// | ||
| 307 | /// Increments in-flight count and records timestamp for rate limiting. | ||
| 308 | pub fn start_request(&self, domain: &str) { | ||
| 309 | let entry = self.get_or_create_throttle(domain); | ||
| 310 | let mut throttle = entry.lock().unwrap(); | ||
| 311 | throttle.start_request(); | ||
| 312 | } | ||
| 313 | |||
| 314 | /// Record that a request completed for a domain. | ||
| 315 | /// | ||
| 316 | /// Decrements in-flight count and cleans up old timestamps. | ||
| 317 | /// | ||
| 318 | /// Note: Trigger-based processing (spawning tasks when capacity frees) | ||
| 319 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 320 | pub fn complete_request(&self, domain: &str) { | ||
| 321 | if let Some(entry) = self.throttles.get(domain) { | ||
| 322 | let mut throttle = entry.lock().unwrap(); | ||
| 323 | throttle.complete_request(); | ||
| 324 | } | ||
| 325 | } | ||
| 326 | |||
| 327 | /// Add an identifier to a domain's waiting queue. | ||
| 328 | /// | ||
| 329 | /// If the identifier is already queued for this domain, merges the tried_urls sets. | ||
| 330 | /// | ||
| 331 | /// Note: Trigger-based processing (spawning tasks when capacity available) | ||
| 332 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 333 | /// | ||
| 334 | /// # Arguments | ||
| 335 | /// * `domain` - The domain to queue for | ||
| 336 | /// * `identifier` - The repository identifier | ||
| 337 | /// * `tried_urls_for_domain` - URLs from this domain that have already been tried | ||
| 338 | pub fn enqueue_identifier( | ||
| 339 | &self, | ||
| 340 | domain: &str, | ||
| 341 | identifier: String, | ||
| 342 | tried_urls_for_domain: HashSet<String>, | ||
| 343 | ) { | ||
| 344 | let entry = self.get_or_create_throttle(domain); | ||
| 345 | let mut throttle = entry.lock().unwrap(); | ||
| 346 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 347 | } | ||
| 348 | } | ||
| 349 | |||
| 232 | #[cfg(test)] | 350 | #[cfg(test)] |
| 233 | mod tests { | 351 | mod tests { |
| 234 | use super::*; | 352 | use super::*; |
| @@ -388,4 +506,58 @@ mod tests { | |||
| 388 | assert!(tried.contains("url2")); | 506 | assert!(tried.contains("url2")); |
| 389 | assert_eq!(tried.len(), 2); | 507 | assert_eq!(tried.len(), 2); |
| 390 | } | 508 | } |
| 509 | |||
| 510 | // ThrottleManager tests | ||
| 511 | |||
| 512 | #[test] | ||
| 513 | fn is_throttled_reflects_domain_capacity() { | ||
| 514 | let manager = ThrottleManager::new(2, 100); | ||
| 515 | |||
| 516 | // New domain should not be throttled (has capacity) | ||
| 517 | assert!(!manager.is_throttled("example.com")); | ||
| 518 | |||
| 519 | // Start 2 requests (at concurrent limit) | ||
| 520 | manager.start_request("example.com"); | ||
| 521 | manager.start_request("example.com"); | ||
| 522 | |||
| 523 | // Should now be throttled | ||
| 524 | assert!(manager.is_throttled("example.com")); | ||
| 525 | |||
| 526 | // Complete one request | ||
| 527 | manager.complete_request("example.com"); | ||
| 528 | |||
| 529 | // Should have capacity again | ||
| 530 | assert!(!manager.is_throttled("example.com")); | ||
| 531 | |||
| 532 | // Different domain should be independent | ||
| 533 | assert!(!manager.is_throttled("other.com")); | ||
| 534 | } | ||
| 535 | |||
| 536 | #[test] | ||
| 537 | fn enqueue_identifier_creates_domain_throttle() { | ||
| 538 | let manager = ThrottleManager::new(5, 100); | ||
| 539 | |||
| 540 | // Domain doesn't exist yet | ||
| 541 | assert!(!manager.throttles.contains_key("example.com")); | ||
| 542 | |||
| 543 | // Enqueue an identifier | ||
| 544 | manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); | ||
| 545 | |||
| 546 | // Domain throttle should now exist | ||
| 547 | assert!(manager.throttles.contains_key("example.com")); | ||
| 548 | } | ||
| 549 | |||
| 550 | #[test] | ||
| 551 | fn start_request_creates_domain_throttle() { | ||
| 552 | let manager = ThrottleManager::new(5, 100); | ||
| 553 | |||
| 554 | // Domain doesn't exist yet | ||
| 555 | assert!(!manager.throttles.contains_key("example.com")); | ||
| 556 | |||
| 557 | // Start a request | ||
| 558 | manager.start_request("example.com"); | ||
| 559 | |||
| 560 | // Domain throttle should now exist | ||
| 561 | assert!(manager.throttles.contains_key("example.com")); | ||
| 562 | } | ||
| 391 | } | 563 | } |