diff options
Diffstat (limited to 'src/purgatory/sync/throttle.rs')
| -rw-r--r-- | src/purgatory/sync/throttle.rs | 278 |
1 files changed, 242 insertions, 36 deletions
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index 94056b5..a310a91 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs | |||
| @@ -8,12 +8,22 @@ | |||
| 8 | //! | 8 | //! |
| 9 | //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the | 9 | //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the |
| 10 | //! interface for checking throttle status and managing identifier queues. | 10 | //! interface for checking throttle status and managing identifier queues. |
| 11 | //! | ||
| 12 | //! ## Trigger-based Processing | ||
| 13 | //! | ||
| 14 | //! When capacity frees up (via `complete_request`) or a new identifier is enqueued | ||
| 15 | //! (via `enqueue_identifier`), the manager automatically spawns tasks to process | ||
| 16 | //! queued identifiers. This is trigger-based, not polling-based. | ||
| 11 | 17 | ||
| 12 | use dashmap::DashMap; | 18 | use dashmap::DashMap; |
| 13 | use indexmap::IndexMap; | 19 | use indexmap::IndexMap; |
| 14 | use std::collections::{HashSet, VecDeque}; | 20 | use std::collections::{HashSet, VecDeque}; |
| 15 | use std::sync::Mutex; | 21 | use std::sync::{Arc, Mutex, OnceLock}; |
| 16 | use std::time::{Duration, Instant}; | 22 | use std::time::{Duration, Instant}; |
| 23 | use tracing::debug; | ||
| 24 | |||
| 25 | use super::context::SyncContext; | ||
| 26 | use super::functions::{sync_identifier_from_url, sync_identifier_next_url}; | ||
| 17 | 27 | ||
| 18 | /// State for an identifier waiting in a domain's queue. | 28 | /// State for an identifier waiting in a domain's queue. |
| 19 | /// | 29 | /// |
| @@ -240,9 +250,7 @@ impl DomainThrottle { | |||
| 240 | /// - Throttle status checking for `sync_identifier_next_url` | 250 | /// - Throttle status checking for `sync_identifier_next_url` |
| 241 | /// - Identifier queue management | 251 | /// - Identifier queue management |
| 242 | /// - Request tracking (start/complete) | 252 | /// - Request tracking (start/complete) |
| 243 | /// | 253 | /// - Trigger-based queue processing when capacity frees up |
| 244 | /// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`) | ||
| 245 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 246 | pub struct ThrottleManager { | 254 | pub struct ThrottleManager { |
| 247 | /// Per-domain throttle state. | 255 | /// Per-domain throttle state. |
| 248 | /// Uses DashMap for concurrent access from multiple sync tasks. | 256 | /// Uses DashMap for concurrent access from multiple sync tasks. |
| @@ -253,6 +261,10 @@ pub struct ThrottleManager { | |||
| 253 | 261 | ||
| 254 | /// Maximum requests per minute per domain. | 262 | /// Maximum requests per minute per domain. |
| 255 | max_per_minute_per_domain: u32, | 263 | max_per_minute_per_domain: u32, |
| 264 | |||
| 265 | /// Sync context for processing queued identifiers. | ||
| 266 | /// Set once at startup via `set_context()`. | ||
| 267 | ctx: OnceLock<Arc<dyn SyncContext>>, | ||
| 256 | } | 268 | } |
| 257 | 269 | ||
| 258 | impl ThrottleManager { | 270 | impl ThrottleManager { |
| @@ -266,38 +278,54 @@ impl ThrottleManager { | |||
| 266 | throttles: DashMap::new(), | 278 | throttles: DashMap::new(), |
| 267 | max_concurrent_per_domain: max_concurrent, | 279 | max_concurrent_per_domain: max_concurrent, |
| 268 | max_per_minute_per_domain: max_per_minute, | 280 | max_per_minute_per_domain: max_per_minute, |
| 281 | ctx: OnceLock::new(), | ||
| 269 | } | 282 | } |
| 270 | } | 283 | } |
| 271 | 284 | ||
| 285 | /// Set the sync context (called once at startup). | ||
| 286 | /// | ||
| 287 | /// The context is used for processing queued identifiers when capacity | ||
| 288 | /// becomes available. Must be called before any trigger-based processing | ||
| 289 | /// can occur. | ||
| 290 | /// | ||
| 291 | /// # Arguments | ||
| 292 | /// * `ctx` - The sync context implementation | ||
| 293 | pub fn set_context(&self, ctx: Arc<dyn SyncContext>) { | ||
| 294 | let _ = self.ctx.set(ctx); | ||
| 295 | } | ||
| 296 | |||
| 272 | /// Check if a domain is currently throttled (at capacity). | 297 | /// Check if a domain is currently throttled (at capacity). |
| 273 | /// | 298 | /// |
| 274 | /// Returns true if the domain has no capacity for another request, | 299 | /// Returns true if the domain has no capacity for another request, |
| 275 | /// either due to concurrent limit or rate limit. | 300 | /// either due to concurrent limit or rate limit. |
| 276 | pub fn is_throttled(&self, domain: &str) -> bool { | 301 | pub fn is_throttled(&self, domain: &str) -> bool { |
| 277 | self.throttles | 302 | self.throttles.get(domain).map_or(false, |entry| { |
| 278 | .get(domain) | 303 | let throttle = entry.lock().unwrap(); |
| 279 | .map_or(false, |entry| { | 304 | !throttle.has_capacity() |
| 280 | let throttle = entry.lock().unwrap(); | 305 | }) |
| 281 | !throttle.has_capacity() | ||
| 282 | }) | ||
| 283 | } | 306 | } |
| 284 | 307 | ||
| 285 | /// Get or create a throttle for a domain. | 308 | /// Get or create a throttle for a domain. |
| 286 | fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { | 309 | fn get_or_create_throttle( |
| 310 | &self, | ||
| 311 | domain: &str, | ||
| 312 | ) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { | ||
| 287 | // First, try to get existing | 313 | // First, try to get existing |
| 288 | if let Some(entry) = self.throttles.get(domain) { | 314 | if let Some(entry) = self.throttles.get(domain) { |
| 289 | return entry; | 315 | return entry; |
| 290 | } | 316 | } |
| 291 | 317 | ||
| 292 | // Create new throttle | 318 | // Create new throttle |
| 293 | self.throttles.entry(domain.to_string()).or_insert_with(|| { | 319 | self.throttles |
| 294 | Mutex::new(DomainThrottle::new( | 320 | .entry(domain.to_string()) |
| 295 | domain.to_string(), | 321 | .or_insert_with(|| { |
| 296 | self.max_concurrent_per_domain, | 322 | Mutex::new(DomainThrottle::new( |
| 297 | self.max_per_minute_per_domain, | 323 | domain.to_string(), |
| 298 | )) | 324 | self.max_concurrent_per_domain, |
| 299 | }); | 325 | self.max_per_minute_per_domain, |
| 300 | 326 | )) | |
| 327 | }); | ||
| 328 | |||
| 301 | // Return the entry (we know it exists now) | 329 | // Return the entry (we know it exists now) |
| 302 | self.throttles.get(domain).unwrap() | 330 | self.throttles.get(domain).unwrap() |
| 303 | } | 331 | } |
| @@ -311,39 +339,193 @@ impl ThrottleManager { | |||
| 311 | throttle.start_request(); | 339 | throttle.start_request(); |
| 312 | } | 340 | } |
| 313 | 341 | ||
| 314 | /// Record that a request completed for a domain. | 342 | /// Record that a request completed for a domain (internal, no trigger). |
| 315 | /// | 343 | /// |
| 316 | /// Decrements in-flight count and cleans up old timestamps. | 344 | /// Decrements in-flight count and cleans up old timestamps. |
| 317 | /// | 345 | /// Does not trigger processing of queued identifiers. |
| 318 | /// Note: Trigger-based processing (spawning tasks when capacity frees) | 346 | fn complete_request_internal(&self, domain: &str) { |
| 319 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 320 | pub fn complete_request(&self, domain: &str) { | ||
| 321 | if let Some(entry) = self.throttles.get(domain) { | 347 | if let Some(entry) = self.throttles.get(domain) { |
| 322 | let mut throttle = entry.lock().unwrap(); | 348 | let mut throttle = entry.lock().unwrap(); |
| 323 | throttle.complete_request(); | 349 | throttle.complete_request(); |
| 324 | } | 350 | } |
| 325 | } | 351 | } |
| 326 | 352 | ||
| 327 | /// Add an identifier to a domain's waiting queue. | 353 | /// Record that a request completed for a domain. |
| 354 | /// | ||
| 355 | /// Decrements in-flight count, cleans up old timestamps, and triggers | ||
| 356 | /// processing of queued identifiers if capacity is available. | ||
| 357 | /// | ||
| 358 | /// # Arguments | ||
| 359 | /// * `domain` - The domain that completed a request | ||
| 360 | pub fn complete_request(self: &Arc<Self>, domain: &str) { | ||
| 361 | let should_trigger = { | ||
| 362 | if let Some(entry) = self.throttles.get(domain) { | ||
| 363 | let mut throttle = entry.lock().unwrap(); | ||
| 364 | throttle.complete_request(); | ||
| 365 | throttle.has_capacity() && throttle.has_queued_work() | ||
| 366 | } else { | ||
| 367 | false | ||
| 368 | } | ||
| 369 | }; | ||
| 370 | |||
| 371 | if should_trigger { | ||
| 372 | self.try_process_next(domain); | ||
| 373 | } | ||
| 374 | } | ||
| 375 | |||
| 376 | /// Add an identifier to a domain's waiting queue (internal, no trigger). | ||
| 328 | /// | 377 | /// |
| 329 | /// If the identifier is already queued for this domain, merges the tried_urls sets. | 378 | /// If the identifier is already queued for this domain, merges the tried_urls sets. |
| 379 | /// Does not trigger processing. | ||
| 380 | fn enqueue_identifier_internal( | ||
| 381 | &self, | ||
| 382 | domain: &str, | ||
| 383 | identifier: String, | ||
| 384 | tried_urls_for_domain: HashSet<String>, | ||
| 385 | ) { | ||
| 386 | let entry = self.get_or_create_throttle(domain); | ||
| 387 | let mut throttle = entry.lock().unwrap(); | ||
| 388 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 389 | } | ||
| 390 | |||
| 391 | /// Add an identifier to a domain's waiting queue. | ||
| 330 | /// | 392 | /// |
| 331 | /// Note: Trigger-based processing (spawning tasks when capacity available) | 393 | /// If the identifier is already queued for this domain, merges the tried_urls sets. |
| 332 | /// will be added in Phase 6 after `SyncContext` is available. | 394 | /// Triggers processing if capacity is available. |
| 333 | /// | 395 | /// |
| 334 | /// # Arguments | 396 | /// # Arguments |
| 335 | /// * `domain` - The domain to queue for | 397 | /// * `domain` - The domain to queue for |
| 336 | /// * `identifier` - The repository identifier | 398 | /// * `identifier` - The repository identifier |
| 337 | /// * `tried_urls_for_domain` - URLs from this domain that have already been tried | 399 | /// * `tried_urls_for_domain` - URLs from this domain that have already been tried |
| 338 | pub fn enqueue_identifier( | 400 | pub fn enqueue_identifier( |
| 339 | &self, | 401 | self: &Arc<Self>, |
| 340 | domain: &str, | 402 | domain: &str, |
| 341 | identifier: String, | 403 | identifier: String, |
| 342 | tried_urls_for_domain: HashSet<String>, | 404 | tried_urls_for_domain: HashSet<String>, |
| 343 | ) { | 405 | ) { |
| 344 | let entry = self.get_or_create_throttle(domain); | 406 | let should_trigger = { |
| 345 | let mut throttle = entry.lock().unwrap(); | 407 | let entry = self.get_or_create_throttle(domain); |
| 346 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | 408 | let mut throttle = entry.lock().unwrap(); |
| 409 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 410 | throttle.has_capacity() | ||
| 411 | }; | ||
| 412 | |||
| 413 | if should_trigger { | ||
| 414 | self.try_process_next(domain); | ||
| 415 | } | ||
| 416 | } | ||
| 417 | |||
| 418 | /// Try to process the next queued identifier for a domain. | ||
| 419 | /// | ||
| 420 | /// This is called when capacity becomes available (either via `complete_request` | ||
| 421 | /// or when a new identifier is enqueued). Spawns a task to process the next | ||
| 422 | /// ready identifier if one exists. | ||
| 423 | fn try_process_next(self: &Arc<Self>, domain: &str) { | ||
| 424 | // Get next ready identifier (not in_progress) | ||
| 425 | let identifier = { | ||
| 426 | if let Some(entry) = self.throttles.get(domain) { | ||
| 427 | let mut throttle = entry.lock().unwrap(); | ||
| 428 | throttle.next_ready_identifier() | ||
| 429 | } else { | ||
| 430 | None | ||
| 431 | } | ||
| 432 | }; | ||
| 433 | |||
| 434 | if let Some(identifier) = identifier { | ||
| 435 | let manager = self.clone(); | ||
| 436 | let domain = domain.to_string(); | ||
| 437 | |||
| 438 | tokio::spawn(async move { | ||
| 439 | manager.process_queued_identifier(&domain, &identifier).await; | ||
| 440 | }); | ||
| 441 | } | ||
| 442 | } | ||
| 443 | |||
| 444 | /// Process a single identifier from a domain's queue. | ||
| 445 | /// | ||
| 446 | /// This function: | ||
| 447 | /// 1. Gets the next URL to try for this identifier on this domain | ||
| 448 | /// 2. If a URL is found, fetches from it and marks it as tried | ||
| 449 | /// 3. If no URL is found, removes the identifier from this domain's queue | ||
| 450 | /// 4. Triggers processing of the next identifier if capacity is available | ||
| 451 | async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) { | ||
| 452 | let ctx = match self.ctx.get() { | ||
| 453 | Some(ctx) => ctx, | ||
| 454 | None => { | ||
| 455 | debug!( | ||
| 456 | domain = %domain, | ||
| 457 | identifier = %identifier, | ||
| 458 | "No sync context set - cannot process queued identifier" | ||
| 459 | ); | ||
| 460 | // Mark not in progress so it can be retried | ||
| 461 | if let Some(entry) = self.throttles.get(domain) { | ||
| 462 | let mut throttle = entry.lock().unwrap(); | ||
| 463 | throttle.mark_identifier_not_in_progress(identifier); | ||
| 464 | } | ||
| 465 | return; | ||
| 466 | } | ||
| 467 | }; | ||
| 468 | |||
| 469 | // Get tried URLs for this identifier on this domain | ||
| 470 | let tried_urls = { | ||
| 471 | self.throttles | ||
| 472 | .get(domain) | ||
| 473 | .map(|entry| { | ||
| 474 | let throttle = entry.lock().unwrap(); | ||
| 475 | throttle.get_tried_urls(identifier) | ||
| 476 | }) | ||
| 477 | .unwrap_or_default() | ||
| 478 | }; | ||
| 479 | |||
| 480 | // Get next URL for this identifier on this specific domain | ||
| 481 | let url = sync_identifier_next_url( | ||
| 482 | ctx.as_ref(), | ||
| 483 | identifier, | ||
| 484 | Some(domain), | ||
| 485 | &tried_urls, | ||
| 486 | self, | ||
| 487 | ) | ||
| 488 | .await; | ||
| 489 | |||
| 490 | match url { | ||
| 491 | Some(url) => { | ||
| 492 | debug!( | ||
| 493 | domain = %domain, | ||
| 494 | identifier = %identifier, | ||
| 495 | url = %url, | ||
| 496 | "Processing queued identifier - fetching from URL" | ||
| 497 | ); | ||
| 498 | |||
| 499 | // Fetch from this URL | ||
| 500 | sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await; | ||
| 501 | |||
| 502 | // Record URL as tried and mark not in_progress | ||
| 503 | if let Some(entry) = self.throttles.get(domain) { | ||
| 504 | let mut throttle = entry.lock().unwrap(); | ||
| 505 | throttle.mark_url_tried(identifier, url); | ||
| 506 | throttle.mark_identifier_not_in_progress(identifier); | ||
| 507 | } | ||
| 508 | |||
| 509 | // complete_request was already called by sync_identifier_from_url, | ||
| 510 | // which will trigger try_process_next if capacity is available | ||
| 511 | } | ||
| 512 | None => { | ||
| 513 | debug!( | ||
| 514 | domain = %domain, | ||
| 515 | identifier = %identifier, | ||
| 516 | "No more URLs for identifier on this domain - removing from queue" | ||
| 517 | ); | ||
| 518 | |||
| 519 | // No more URLs for this identifier on this domain - remove from queue | ||
| 520 | if let Some(entry) = self.throttles.get(domain) { | ||
| 521 | let mut throttle = entry.lock().unwrap(); | ||
| 522 | throttle.remove_identifier(identifier); | ||
| 523 | } | ||
| 524 | |||
| 525 | // Try next identifier since we didn't use any capacity | ||
| 526 | self.try_process_next(domain); | ||
| 527 | } | ||
| 528 | } | ||
| 347 | } | 529 | } |
| 348 | } | 530 | } |
| 349 | 531 | ||
| @@ -523,8 +705,8 @@ mod tests { | |||
| 523 | // Should now be throttled | 705 | // Should now be throttled |
| 524 | assert!(manager.is_throttled("example.com")); | 706 | assert!(manager.is_throttled("example.com")); |
| 525 | 707 | ||
| 526 | // Complete one request | 708 | // Complete one request (using internal method for non-Arc test) |
| 527 | manager.complete_request("example.com"); | 709 | manager.complete_request_internal("example.com"); |
| 528 | 710 | ||
| 529 | // Should have capacity again | 711 | // Should have capacity again |
| 530 | assert!(!manager.is_throttled("example.com")); | 712 | assert!(!manager.is_throttled("example.com")); |
| @@ -540,8 +722,8 @@ mod tests { | |||
| 540 | // Domain doesn't exist yet | 722 | // Domain doesn't exist yet |
| 541 | assert!(!manager.throttles.contains_key("example.com")); | 723 | assert!(!manager.throttles.contains_key("example.com")); |
| 542 | 724 | ||
| 543 | // Enqueue an identifier | 725 | // Enqueue an identifier (using internal method for non-Arc test) |
| 544 | manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); | 726 | manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); |
| 545 | 727 | ||
| 546 | // Domain throttle should now exist | 728 | // Domain throttle should now exist |
| 547 | assert!(manager.throttles.contains_key("example.com")); | 729 | assert!(manager.throttles.contains_key("example.com")); |
| @@ -560,4 +742,28 @@ mod tests { | |||
| 560 | // Domain throttle should now exist | 742 | // Domain throttle should now exist |
| 561 | assert!(manager.throttles.contains_key("example.com")); | 743 | assert!(manager.throttles.contains_key("example.com")); |
| 562 | } | 744 | } |
| 745 | |||
| 746 | #[test] | ||
| 747 | fn has_queued_work_reflects_queue_state() { | ||
| 748 | let manager = ThrottleManager::new(5, 100); | ||
| 749 | |||
| 750 | // Initially no queued work | ||
| 751 | let has_work = manager | ||
| 752 | .throttles | ||
| 753 | .get("example.com") | ||
| 754 | .map(|e| e.lock().unwrap().has_queued_work()) | ||
| 755 | .unwrap_or(false); | ||
| 756 | assert!(!has_work); | ||
| 757 | |||
| 758 | // Enqueue an identifier | ||
| 759 | manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); | ||
| 760 | |||
| 761 | // Now should have queued work | ||
| 762 | let has_work = manager | ||
| 763 | .throttles | ||
| 764 | .get("example.com") | ||
| 765 | .map(|e| e.lock().unwrap().has_queued_work()) | ||
| 766 | .unwrap_or(false); | ||
| 767 | assert!(has_work); | ||
| 768 | } | ||
| 563 | } | 769 | } |