diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:57:55 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:57:55 +0000 |
| commit | 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 (patch) | |
| tree | 0e85dce4da81e57aa26ead9810589fe0f44f89ca | |
| parent | 1c5dac680b5a446e26b161208a17030f5fbd8a88 (diff) | |
Add sync_identifier orchestration and ThrottleManager queue processing
Implement the main sync orchestration function and trigger-based queue
processing for throttled domains:
sync_identifier function:
- Orchestrates syncing git data for a single identifier
- Tries all non-throttled URLs in sequence
- Checks completion after each fetch (no pending events or all OIDs fetched)
- Enqueues with throttled domains when non-throttled URLs are exhausted
- Returns true if complete, false if events remain (for backoff)
ThrottleManager enhancements:
- Add set_context() to provide SyncContext for queue processing
- Add try_process_next() to spawn tasks when capacity frees
- Add process_queued_identifier() to handle queued work
- Update complete_request() to trigger processing on completion
- Update enqueue_identifier() to trigger processing when capacity available
- Add internal methods for non-Arc testing compatibility
Generic function updates:
- Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url,
sync_identifier, and get_throttled_domains_with_untried_urls for
dynamic dispatch support (Arc<dyn SyncContext>)
Tests:
- sync_identifier_tries_multiple_urls_until_complete: verifies sequential
URL fetching until all OIDs are available
- sync_identifier_enqueues_throttled_domains_when_incomplete: verifies
throttled domains get the identifier enqueued for later processing
- has_queued_work_reflects_queue_state: verifies queue state tracking
| -rw-r--r-- | src/purgatory/sync/functions.rs | 228 | ||||
| -rw-r--r-- | src/purgatory/sync/mod.rs | 4 | ||||
| -rw-r--r-- | src/purgatory/sync/throttle.rs | 278 |
3 files changed, 469 insertions, 41 deletions
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index de846f3..751dd5e 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -55,7 +55,7 @@ fn extract_domain(url: &str) -> Option<String> { | |||
| 55 | /// | 55 | /// |
| 56 | /// * `Some(url)` - The next URL to try | 56 | /// * `Some(url)` - The next URL to try |
| 57 | /// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) | 57 | /// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) |
| 58 | pub async fn sync_identifier_next_url<C: SyncContext>( | 58 | pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>( |
| 59 | ctx: &C, | 59 | ctx: &C, |
| 60 | identifier: &str, | 60 | identifier: &str, |
| 61 | domain: Option<&str>, | 61 | domain: Option<&str>, |
| @@ -178,7 +178,7 @@ pub struct ThrottledDomainInfo { | |||
| 178 | /// | 178 | /// |
| 179 | /// A list of throttled domains that still have untried URLs, along with | 179 | /// A list of throttled domains that still have untried URLs, along with |
| 180 | /// the tried URLs for each domain (for proper queue state). | 180 | /// the tried URLs for each domain (for proper queue state). |
| 181 | pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( | 181 | pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>( |
| 182 | ctx: &C, | 182 | ctx: &C, |
| 183 | identifier: &str, | 183 | identifier: &str, |
| 184 | tried_urls: &HashSet<String>, | 184 | tried_urls: &HashSet<String>, |
| @@ -254,7 +254,7 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( | |||
| 254 | /// # Returns | 254 | /// # Returns |
| 255 | /// | 255 | /// |
| 256 | /// The number of OIDs successfully fetched (0 on failure) | 256 | /// The number of OIDs successfully fetched (0 on failure) |
| 257 | pub async fn sync_identifier_from_url<C: SyncContext>( | 257 | pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( |
| 258 | ctx: &C, | 258 | ctx: &C, |
| 259 | identifier: &str, | 259 | identifier: &str, |
| 260 | url: &str, | 260 | url: &str, |
| @@ -347,6 +347,128 @@ pub async fn sync_identifier_from_url<C: SyncContext>( | |||
| 347 | oids_fetched | 347 | oids_fetched |
| 348 | } | 348 | } |
| 349 | 349 | ||
| 350 | /// Sync git data for an identifier. | ||
| 351 | /// | ||
| 352 | /// This is the main orchestration function called by the sync loop. It: | ||
| 353 | /// 1. Tries all non-throttled URLs in sequence | ||
| 354 | /// 2. After each fetch, checks if sync is complete (no pending events or no needed OIDs) | ||
| 355 | /// 3. When no non-throttled URLs remain, enqueues with throttled domains for later processing | ||
| 356 | /// 4. Returns without waiting for throttled domains to complete | ||
| 357 | /// | ||
| 358 | /// # Arguments | ||
| 359 | /// | ||
| 360 | /// * `ctx` - The sync context providing repository data and OID information | ||
| 361 | /// * `identifier` - The repository identifier (d-tag value) | ||
| 362 | /// * `throttle_manager` - Used for rate limiting and domain queue management | ||
| 363 | /// | ||
| 364 | /// # Returns | ||
| 365 | /// | ||
| 366 | /// * `true` - Sync completed (no pending events or all OIDs fetched) | ||
| 367 | /// * `false` - Events remain in purgatory (will be retried after backoff, or processed | ||
| 368 | /// by throttled domain queues) | ||
| 369 | pub async fn sync_identifier<C: SyncContext + ?Sized>( | ||
| 370 | ctx: &C, | ||
| 371 | identifier: &str, | ||
| 372 | throttle_manager: &Arc<ThrottleManager>, | ||
| 373 | ) -> bool { | ||
| 374 | let mut tried_urls: HashSet<String> = HashSet::new(); | ||
| 375 | |||
| 376 | debug!( | ||
| 377 | identifier = %identifier, | ||
| 378 | "Starting sync for identifier" | ||
| 379 | ); | ||
| 380 | |||
| 381 | // Try all non-throttled URLs | ||
| 382 | loop { | ||
| 383 | match sync_identifier_next_url(ctx, identifier, None, &tried_urls, throttle_manager).await { | ||
| 384 | Some(url) => { | ||
| 385 | debug!( | ||
| 386 | identifier = %identifier, | ||
| 387 | url = %url, | ||
| 388 | "Found non-throttled URL to try" | ||
| 389 | ); | ||
| 390 | |||
| 391 | // Fetch from this URL | ||
| 392 | sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await; | ||
| 393 | tried_urls.insert(url); | ||
| 394 | |||
| 395 | // Check if sync is now complete | ||
| 396 | if !ctx.has_pending_events(identifier) { | ||
| 397 | debug!( | ||
| 398 | identifier = %identifier, | ||
| 399 | "Sync complete - no pending events" | ||
| 400 | ); | ||
| 401 | return true; | ||
| 402 | } | ||
| 403 | |||
| 404 | let needed_oids = ctx.collect_needed_oids(identifier); | ||
| 405 | if needed_oids.is_empty() { | ||
| 406 | debug!( | ||
| 407 | identifier = %identifier, | ||
| 408 | "Sync complete - all OIDs available" | ||
| 409 | ); | ||
| 410 | return true; | ||
| 411 | } | ||
| 412 | |||
| 413 | // Continue trying more URLs | ||
| 414 | } | ||
| 415 | None => { | ||
| 416 | // No more non-throttled URLs available | ||
| 417 | debug!( | ||
| 418 | identifier = %identifier, | ||
| 419 | tried_count = tried_urls.len(), | ||
| 420 | "No more non-throttled URLs available" | ||
| 421 | ); | ||
| 422 | break; | ||
| 423 | } | ||
| 424 | } | ||
| 425 | } | ||
| 426 | |||
| 427 | // Check if we're done (no pending events or no needed OIDs) | ||
| 428 | if !ctx.has_pending_events(identifier) { | ||
| 429 | debug!( | ||
| 430 | identifier = %identifier, | ||
| 431 | "Sync complete after exhausting URLs - no pending events" | ||
| 432 | ); | ||
| 433 | return true; | ||
| 434 | } | ||
| 435 | |||
| 436 | let needed_oids = ctx.collect_needed_oids(identifier); | ||
| 437 | if needed_oids.is_empty() { | ||
| 438 | debug!( | ||
| 439 | identifier = %identifier, | ||
| 440 | "Sync complete after exhausting URLs - all OIDs available" | ||
| 441 | ); | ||
| 442 | return true; | ||
| 443 | } | ||
| 444 | |||
| 445 | // Enqueue with any throttled domains that have untried URLs | ||
| 446 | let throttled_domains = | ||
| 447 | get_throttled_domains_with_untried_urls(ctx, identifier, &tried_urls, throttle_manager) | ||
| 448 | .await; | ||
| 449 | |||
| 450 | for info in throttled_domains { | ||
| 451 | debug!( | ||
| 452 | identifier = %identifier, | ||
| 453 | domain = %info.domain, | ||
| 454 | "Enqueueing identifier with throttled domain" | ||
| 455 | ); | ||
| 456 | throttle_manager.enqueue_identifier( | ||
| 457 | &info.domain, | ||
| 458 | identifier.to_string(), | ||
| 459 | info.tried_urls_for_domain, | ||
| 460 | ); | ||
| 461 | } | ||
| 462 | |||
| 463 | // Return false - events remain, will retry after backoff | ||
| 464 | // (throttled domains will process independently) | ||
| 465 | debug!( | ||
| 466 | identifier = %identifier, | ||
| 467 | "Sync incomplete - returning false for backoff" | ||
| 468 | ); | ||
| 469 | false | ||
| 470 | } | ||
| 471 | |||
| 350 | #[cfg(test)] | 472 | #[cfg(test)] |
| 351 | mod tests { | 473 | mod tests { |
| 352 | use super::*; | 474 | use super::*; |
| @@ -617,4 +739,104 @@ mod tests { | |||
| 617 | assert_eq!(throttled[0].domain, "gitlab.com"); | 739 | assert_eq!(throttled[0].domain, "gitlab.com"); |
| 618 | assert!(throttled[0].tried_urls_for_domain.is_empty()); | 740 | assert!(throttled[0].tried_urls_for_domain.is_empty()); |
| 619 | } | 741 | } |
| 742 | |||
| 743 | // ========================================================================= | ||
| 744 | // Phase 6: sync_identifier tests | ||
| 745 | // ========================================================================= | ||
| 746 | |||
| 747 | #[tokio::test] | ||
| 748 | async fn sync_identifier_tries_multiple_urls_until_complete() { | ||
| 749 | // Set up mock with 3 URLs, each providing partial OIDs | ||
| 750 | // URL1 provides abc123, URL2 provides def456, URL3 provides ghi789 | ||
| 751 | let mock = MockSyncContext::new() | ||
| 752 | .with_urls(&[ | ||
| 753 | "https://server1.com/repo.git", | ||
| 754 | "https://server2.com/repo.git", | ||
| 755 | "https://server3.com/repo.git", | ||
| 756 | ]) | ||
| 757 | .with_needed_oids(&["abc123", "def456", "ghi789"]) | ||
| 758 | .with_pending_events(true) | ||
| 759 | .url_provides("https://server1.com/repo.git", &["abc123"]) | ||
| 760 | .url_provides("https://server2.com/repo.git", &["def456"]) | ||
| 761 | .url_provides("https://server3.com/repo.git", &["ghi789"]); | ||
| 762 | |||
| 763 | let throttle_manager = Arc::new(ThrottleManager::new(5, 100)); | ||
| 764 | |||
| 765 | // Run sync_identifier | ||
| 766 | let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await; | ||
| 767 | |||
| 768 | // Should return true (sync complete) | ||
| 769 | assert!(complete, "Expected sync to complete after trying all URLs"); | ||
| 770 | |||
| 771 | // Should have tried all 3 URLs | ||
| 772 | let fetch_log = mock.fetch_log(); | ||
| 773 | assert_eq!( | ||
| 774 | fetch_log.len(), | ||
| 775 | 3, | ||
| 776 | "Expected 3 fetch attempts, got: {:?}", | ||
| 777 | fetch_log | ||
| 778 | ); | ||
| 779 | |||
| 780 | // All OIDs should now be fetched | ||
| 781 | assert!( | ||
| 782 | mock.current_needed_oids().is_empty(), | ||
| 783 | "Expected all OIDs to be fetched" | ||
| 784 | ); | ||
| 785 | } | ||
| 786 | |||
| 787 | #[tokio::test] | ||
| 788 | async fn sync_identifier_enqueues_throttled_domains_when_incomplete() { | ||
| 789 | // Set up mock with URLs from two domains | ||
| 790 | // Only github.com can provide the OID, but it will be throttled | ||
| 791 | let mock = MockSyncContext::new() | ||
| 792 | .with_urls(&[ | ||
| 793 | "https://github.com/foo/bar.git", | ||
| 794 | "https://gitlab.com/foo/bar.git", | ||
| 795 | ]) | ||
| 796 | .with_needed_oids(&["abc123"]) | ||
| 797 | .with_pending_events(true) | ||
| 798 | .url_provides("https://github.com/foo/bar.git", &["abc123"]); | ||
| 799 | // Note: gitlab.com doesn't provide any OIDs | ||
| 800 | |||
| 801 | let throttle_manager = Arc::new(ThrottleManager::new(1, 100)); | ||
| 802 | |||
| 803 | // Throttle github.com by starting a request | ||
| 804 | throttle_manager.start_request("github.com"); | ||
| 805 | |||
| 806 | // Run sync_identifier | ||
| 807 | let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await; | ||
| 808 | |||
| 809 | // Should return false (sync incomplete - github.com is throttled) | ||
| 810 | assert!( | ||
| 811 | !complete, | ||
| 812 | "Expected sync to be incomplete when required domain is throttled" | ||
| 813 | ); | ||
| 814 | |||
| 815 | // Should have tried gitlab.com (not throttled) but it doesn't have the OID | ||
| 816 | let fetch_log = mock.fetch_log(); | ||
| 817 | assert_eq!( | ||
| 818 | fetch_log.len(), | ||
| 819 | 1, | ||
| 820 | "Expected 1 fetch attempt (gitlab.com), got: {:?}", | ||
| 821 | fetch_log | ||
| 822 | ); | ||
| 823 | assert!( | ||
| 824 | fetch_log[0].contains("gitlab.com"), | ||
| 825 | "Expected gitlab.com to be tried first" | ||
| 826 | ); | ||
| 827 | |||
| 828 | // OID should still be needed | ||
| 829 | assert!( | ||
| 830 | mock.current_needed_oids().contains("abc123"), | ||
| 831 | "Expected OID to still be needed" | ||
| 832 | ); | ||
| 833 | |||
| 834 | // github.com should have the identifier enqueued | ||
| 835 | // We can verify this by checking if github.com is still throttled (it should be, | ||
| 836 | // since the identifier was enqueued but not processed yet) | ||
| 837 | assert!( | ||
| 838 | throttle_manager.is_throttled("github.com"), | ||
| 839 | "Expected github.com to still be throttled" | ||
| 840 | ); | ||
| 841 | } | ||
| 620 | } | 842 | } |
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index d26c1f0..8ac9216 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs | |||
| @@ -13,8 +13,8 @@ mod throttle; | |||
| 13 | 13 | ||
| 14 | pub use context::{ProcessResult, SyncContext}; | 14 | pub use context::{ProcessResult, SyncContext}; |
| 15 | pub use functions::{ | 15 | pub use functions::{ |
| 16 | get_throttled_domains_with_untried_urls, sync_identifier_from_url, sync_identifier_next_url, | 16 | get_throttled_domains_with_untried_urls, sync_identifier, sync_identifier_from_url, |
| 17 | ThrottledDomainInfo, | 17 | sync_identifier_next_url, ThrottledDomainInfo, |
| 18 | }; | 18 | }; |
| 19 | pub use queue::SyncQueueEntry; | 19 | pub use queue::SyncQueueEntry; |
| 20 | pub use throttle::{DomainThrottle, ThrottleManager}; | 20 | pub use throttle::{DomainThrottle, ThrottleManager}; |
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index 94056b5..a310a91 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs | |||
| @@ -8,12 +8,22 @@ | |||
| 8 | //! | 8 | //! |
| 9 | //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the | 9 | //! The `ThrottleManager` owns all `DomainThrottle` instances and provides the |
| 10 | //! interface for checking throttle status and managing identifier queues. | 10 | //! interface for checking throttle status and managing identifier queues. |
| 11 | //! | ||
| 12 | //! ## Trigger-based Processing | ||
| 13 | //! | ||
| 14 | //! When capacity frees up (via `complete_request`) or a new identifier is enqueued | ||
| 15 | //! (via `enqueue_identifier`), the manager automatically spawns tasks to process | ||
| 16 | //! queued identifiers. This is trigger-based, not polling-based. | ||
| 11 | 17 | ||
| 12 | use dashmap::DashMap; | 18 | use dashmap::DashMap; |
| 13 | use indexmap::IndexMap; | 19 | use indexmap::IndexMap; |
| 14 | use std::collections::{HashSet, VecDeque}; | 20 | use std::collections::{HashSet, VecDeque}; |
| 15 | use std::sync::Mutex; | 21 | use std::sync::{Arc, Mutex, OnceLock}; |
| 16 | use std::time::{Duration, Instant}; | 22 | use std::time::{Duration, Instant}; |
| 23 | use tracing::debug; | ||
| 24 | |||
| 25 | use super::context::SyncContext; | ||
| 26 | use super::functions::{sync_identifier_from_url, sync_identifier_next_url}; | ||
| 17 | 27 | ||
| 18 | /// State for an identifier waiting in a domain's queue. | 28 | /// State for an identifier waiting in a domain's queue. |
| 19 | /// | 29 | /// |
| @@ -240,9 +250,7 @@ impl DomainThrottle { | |||
| 240 | /// - Throttle status checking for `sync_identifier_next_url` | 250 | /// - Throttle status checking for `sync_identifier_next_url` |
| 241 | /// - Identifier queue management | 251 | /// - Identifier queue management |
| 242 | /// - Request tracking (start/complete) | 252 | /// - Request tracking (start/complete) |
| 243 | /// | 253 | /// - Trigger-based queue processing when capacity frees up |
| 244 | /// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`) | ||
| 245 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 246 | pub struct ThrottleManager { | 254 | pub struct ThrottleManager { |
| 247 | /// Per-domain throttle state. | 255 | /// Per-domain throttle state. |
| 248 | /// Uses DashMap for concurrent access from multiple sync tasks. | 256 | /// Uses DashMap for concurrent access from multiple sync tasks. |
| @@ -253,6 +261,10 @@ pub struct ThrottleManager { | |||
| 253 | 261 | ||
| 254 | /// Maximum requests per minute per domain. | 262 | /// Maximum requests per minute per domain. |
| 255 | max_per_minute_per_domain: u32, | 263 | max_per_minute_per_domain: u32, |
| 264 | |||
| 265 | /// Sync context for processing queued identifiers. | ||
| 266 | /// Set once at startup via `set_context()`. | ||
| 267 | ctx: OnceLock<Arc<dyn SyncContext>>, | ||
| 256 | } | 268 | } |
| 257 | 269 | ||
| 258 | impl ThrottleManager { | 270 | impl ThrottleManager { |
| @@ -266,38 +278,54 @@ impl ThrottleManager { | |||
| 266 | throttles: DashMap::new(), | 278 | throttles: DashMap::new(), |
| 267 | max_concurrent_per_domain: max_concurrent, | 279 | max_concurrent_per_domain: max_concurrent, |
| 268 | max_per_minute_per_domain: max_per_minute, | 280 | max_per_minute_per_domain: max_per_minute, |
| 281 | ctx: OnceLock::new(), | ||
| 269 | } | 282 | } |
| 270 | } | 283 | } |
| 271 | 284 | ||
| 285 | /// Set the sync context (called once at startup). | ||
| 286 | /// | ||
| 287 | /// The context is used for processing queued identifiers when capacity | ||
| 288 | /// becomes available. Must be called before any trigger-based processing | ||
| 289 | /// can occur. | ||
| 290 | /// | ||
| 291 | /// # Arguments | ||
| 292 | /// * `ctx` - The sync context implementation | ||
| 293 | pub fn set_context(&self, ctx: Arc<dyn SyncContext>) { | ||
| 294 | let _ = self.ctx.set(ctx); | ||
| 295 | } | ||
| 296 | |||
| 272 | /// Check if a domain is currently throttled (at capacity). | 297 | /// Check if a domain is currently throttled (at capacity). |
| 273 | /// | 298 | /// |
| 274 | /// Returns true if the domain has no capacity for another request, | 299 | /// Returns true if the domain has no capacity for another request, |
| 275 | /// either due to concurrent limit or rate limit. | 300 | /// either due to concurrent limit or rate limit. |
| 276 | pub fn is_throttled(&self, domain: &str) -> bool { | 301 | pub fn is_throttled(&self, domain: &str) -> bool { |
| 277 | self.throttles | 302 | self.throttles.get(domain).map_or(false, |entry| { |
| 278 | .get(domain) | 303 | let throttle = entry.lock().unwrap(); |
| 279 | .map_or(false, |entry| { | 304 | !throttle.has_capacity() |
| 280 | let throttle = entry.lock().unwrap(); | 305 | }) |
| 281 | !throttle.has_capacity() | ||
| 282 | }) | ||
| 283 | } | 306 | } |
| 284 | 307 | ||
| 285 | /// Get or create a throttle for a domain. | 308 | /// Get or create a throttle for a domain. |
| 286 | fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { | 309 | fn get_or_create_throttle( |
| 310 | &self, | ||
| 311 | domain: &str, | ||
| 312 | ) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { | ||
| 287 | // First, try to get existing | 313 | // First, try to get existing |
| 288 | if let Some(entry) = self.throttles.get(domain) { | 314 | if let Some(entry) = self.throttles.get(domain) { |
| 289 | return entry; | 315 | return entry; |
| 290 | } | 316 | } |
| 291 | 317 | ||
| 292 | // Create new throttle | 318 | // Create new throttle |
| 293 | self.throttles.entry(domain.to_string()).or_insert_with(|| { | 319 | self.throttles |
| 294 | Mutex::new(DomainThrottle::new( | 320 | .entry(domain.to_string()) |
| 295 | domain.to_string(), | 321 | .or_insert_with(|| { |
| 296 | self.max_concurrent_per_domain, | 322 | Mutex::new(DomainThrottle::new( |
| 297 | self.max_per_minute_per_domain, | 323 | domain.to_string(), |
| 298 | )) | 324 | self.max_concurrent_per_domain, |
| 299 | }); | 325 | self.max_per_minute_per_domain, |
| 300 | 326 | )) | |
| 327 | }); | ||
| 328 | |||
| 301 | // Return the entry (we know it exists now) | 329 | // Return the entry (we know it exists now) |
| 302 | self.throttles.get(domain).unwrap() | 330 | self.throttles.get(domain).unwrap() |
| 303 | } | 331 | } |
| @@ -311,39 +339,193 @@ impl ThrottleManager { | |||
| 311 | throttle.start_request(); | 339 | throttle.start_request(); |
| 312 | } | 340 | } |
| 313 | 341 | ||
| 314 | /// Record that a request completed for a domain. | 342 | /// Record that a request completed for a domain (internal, no trigger). |
| 315 | /// | 343 | /// |
| 316 | /// Decrements in-flight count and cleans up old timestamps. | 344 | /// Decrements in-flight count and cleans up old timestamps. |
| 317 | /// | 345 | /// Does not trigger processing of queued identifiers. |
| 318 | /// Note: Trigger-based processing (spawning tasks when capacity frees) | 346 | fn complete_request_internal(&self, domain: &str) { |
| 319 | /// will be added in Phase 6 after `SyncContext` is available. | ||
| 320 | pub fn complete_request(&self, domain: &str) { | ||
| 321 | if let Some(entry) = self.throttles.get(domain) { | 347 | if let Some(entry) = self.throttles.get(domain) { |
| 322 | let mut throttle = entry.lock().unwrap(); | 348 | let mut throttle = entry.lock().unwrap(); |
| 323 | throttle.complete_request(); | 349 | throttle.complete_request(); |
| 324 | } | 350 | } |
| 325 | } | 351 | } |
| 326 | 352 | ||
| 327 | /// Add an identifier to a domain's waiting queue. | 353 | /// Record that a request completed for a domain. |
| 354 | /// | ||
| 355 | /// Decrements in-flight count, cleans up old timestamps, and triggers | ||
| 356 | /// processing of queued identifiers if capacity is available. | ||
| 357 | /// | ||
| 358 | /// # Arguments | ||
| 359 | /// * `domain` - The domain that completed a request | ||
| 360 | pub fn complete_request(self: &Arc<Self>, domain: &str) { | ||
| 361 | let should_trigger = { | ||
| 362 | if let Some(entry) = self.throttles.get(domain) { | ||
| 363 | let mut throttle = entry.lock().unwrap(); | ||
| 364 | throttle.complete_request(); | ||
| 365 | throttle.has_capacity() && throttle.has_queued_work() | ||
| 366 | } else { | ||
| 367 | false | ||
| 368 | } | ||
| 369 | }; | ||
| 370 | |||
| 371 | if should_trigger { | ||
| 372 | self.try_process_next(domain); | ||
| 373 | } | ||
| 374 | } | ||
| 375 | |||
| 376 | /// Add an identifier to a domain's waiting queue (internal, no trigger). | ||
| 328 | /// | 377 | /// |
| 329 | /// If the identifier is already queued for this domain, merges the tried_urls sets. | 378 | /// If the identifier is already queued for this domain, merges the tried_urls sets. |
| 379 | /// Does not trigger processing. | ||
| 380 | fn enqueue_identifier_internal( | ||
| 381 | &self, | ||
| 382 | domain: &str, | ||
| 383 | identifier: String, | ||
| 384 | tried_urls_for_domain: HashSet<String>, | ||
| 385 | ) { | ||
| 386 | let entry = self.get_or_create_throttle(domain); | ||
| 387 | let mut throttle = entry.lock().unwrap(); | ||
| 388 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 389 | } | ||
| 390 | |||
| 391 | /// Add an identifier to a domain's waiting queue. | ||
| 330 | /// | 392 | /// |
| 331 | /// Note: Trigger-based processing (spawning tasks when capacity available) | 393 | /// If the identifier is already queued for this domain, merges the tried_urls sets. |
| 332 | /// will be added in Phase 6 after `SyncContext` is available. | 394 | /// Triggers processing if capacity is available. |
| 333 | /// | 395 | /// |
| 334 | /// # Arguments | 396 | /// # Arguments |
| 335 | /// * `domain` - The domain to queue for | 397 | /// * `domain` - The domain to queue for |
| 336 | /// * `identifier` - The repository identifier | 398 | /// * `identifier` - The repository identifier |
| 337 | /// * `tried_urls_for_domain` - URLs from this domain that have already been tried | 399 | /// * `tried_urls_for_domain` - URLs from this domain that have already been tried |
| 338 | pub fn enqueue_identifier( | 400 | pub fn enqueue_identifier( |
| 339 | &self, | 401 | self: &Arc<Self>, |
| 340 | domain: &str, | 402 | domain: &str, |
| 341 | identifier: String, | 403 | identifier: String, |
| 342 | tried_urls_for_domain: HashSet<String>, | 404 | tried_urls_for_domain: HashSet<String>, |
| 343 | ) { | 405 | ) { |
| 344 | let entry = self.get_or_create_throttle(domain); | 406 | let should_trigger = { |
| 345 | let mut throttle = entry.lock().unwrap(); | 407 | let entry = self.get_or_create_throttle(domain); |
| 346 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | 408 | let mut throttle = entry.lock().unwrap(); |
| 409 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 410 | throttle.has_capacity() | ||
| 411 | }; | ||
| 412 | |||
| 413 | if should_trigger { | ||
| 414 | self.try_process_next(domain); | ||
| 415 | } | ||
| 416 | } | ||
| 417 | |||
| 418 | /// Try to process the next queued identifier for a domain. | ||
| 419 | /// | ||
| 420 | /// This is called when capacity becomes available (either via `complete_request` | ||
| 421 | /// or when a new identifier is enqueued). Spawns a task to process the next | ||
| 422 | /// ready identifier if one exists. | ||
| 423 | fn try_process_next(self: &Arc<Self>, domain: &str) { | ||
| 424 | // Get next ready identifier (not in_progress) | ||
| 425 | let identifier = { | ||
| 426 | if let Some(entry) = self.throttles.get(domain) { | ||
| 427 | let mut throttle = entry.lock().unwrap(); | ||
| 428 | throttle.next_ready_identifier() | ||
| 429 | } else { | ||
| 430 | None | ||
| 431 | } | ||
| 432 | }; | ||
| 433 | |||
| 434 | if let Some(identifier) = identifier { | ||
| 435 | let manager = self.clone(); | ||
| 436 | let domain = domain.to_string(); | ||
| 437 | |||
| 438 | tokio::spawn(async move { | ||
| 439 | manager.process_queued_identifier(&domain, &identifier).await; | ||
| 440 | }); | ||
| 441 | } | ||
| 442 | } | ||
| 443 | |||
| 444 | /// Process a single identifier from a domain's queue. | ||
| 445 | /// | ||
| 446 | /// This function: | ||
| 447 | /// 1. Gets the next URL to try for this identifier on this domain | ||
| 448 | /// 2. If a URL is found, fetches from it and marks it as tried | ||
| 449 | /// 3. If no URL is found, removes the identifier from this domain's queue | ||
| 450 | /// 4. Triggers processing of the next identifier if capacity is available | ||
| 451 | async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) { | ||
| 452 | let ctx = match self.ctx.get() { | ||
| 453 | Some(ctx) => ctx, | ||
| 454 | None => { | ||
| 455 | debug!( | ||
| 456 | domain = %domain, | ||
| 457 | identifier = %identifier, | ||
| 458 | "No sync context set - cannot process queued identifier" | ||
| 459 | ); | ||
| 460 | // Mark not in progress so it can be retried | ||
| 461 | if let Some(entry) = self.throttles.get(domain) { | ||
| 462 | let mut throttle = entry.lock().unwrap(); | ||
| 463 | throttle.mark_identifier_not_in_progress(identifier); | ||
| 464 | } | ||
| 465 | return; | ||
| 466 | } | ||
| 467 | }; | ||
| 468 | |||
| 469 | // Get tried URLs for this identifier on this domain | ||
| 470 | let tried_urls = { | ||
| 471 | self.throttles | ||
| 472 | .get(domain) | ||
| 473 | .map(|entry| { | ||
| 474 | let throttle = entry.lock().unwrap(); | ||
| 475 | throttle.get_tried_urls(identifier) | ||
| 476 | }) | ||
| 477 | .unwrap_or_default() | ||
| 478 | }; | ||
| 479 | |||
| 480 | // Get next URL for this identifier on this specific domain | ||
| 481 | let url = sync_identifier_next_url( | ||
| 482 | ctx.as_ref(), | ||
| 483 | identifier, | ||
| 484 | Some(domain), | ||
| 485 | &tried_urls, | ||
| 486 | self, | ||
| 487 | ) | ||
| 488 | .await; | ||
| 489 | |||
| 490 | match url { | ||
| 491 | Some(url) => { | ||
| 492 | debug!( | ||
| 493 | domain = %domain, | ||
| 494 | identifier = %identifier, | ||
| 495 | url = %url, | ||
| 496 | "Processing queued identifier - fetching from URL" | ||
| 497 | ); | ||
| 498 | |||
| 499 | // Fetch from this URL | ||
| 500 | sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await; | ||
| 501 | |||
| 502 | // Record URL as tried and mark not in_progress | ||
| 503 | if let Some(entry) = self.throttles.get(domain) { | ||
| 504 | let mut throttle = entry.lock().unwrap(); | ||
| 505 | throttle.mark_url_tried(identifier, url); | ||
| 506 | throttle.mark_identifier_not_in_progress(identifier); | ||
| 507 | } | ||
| 508 | |||
| 509 | // complete_request was already called by sync_identifier_from_url, | ||
| 510 | // which will trigger try_process_next if capacity is available | ||
| 511 | } | ||
| 512 | None => { | ||
| 513 | debug!( | ||
| 514 | domain = %domain, | ||
| 515 | identifier = %identifier, | ||
| 516 | "No more URLs for identifier on this domain - removing from queue" | ||
| 517 | ); | ||
| 518 | |||
| 519 | // No more URLs for this identifier on this domain - remove from queue | ||
| 520 | if let Some(entry) = self.throttles.get(domain) { | ||
| 521 | let mut throttle = entry.lock().unwrap(); | ||
| 522 | throttle.remove_identifier(identifier); | ||
| 523 | } | ||
| 524 | |||
| 525 | // Try next identifier since we didn't use any capacity | ||
| 526 | self.try_process_next(domain); | ||
| 527 | } | ||
| 528 | } | ||
| 347 | } | 529 | } |
| 348 | } | 530 | } |
| 349 | 531 | ||
| @@ -523,8 +705,8 @@ mod tests { | |||
| 523 | // Should now be throttled | 705 | // Should now be throttled |
| 524 | assert!(manager.is_throttled("example.com")); | 706 | assert!(manager.is_throttled("example.com")); |
| 525 | 707 | ||
| 526 | // Complete one request | 708 | // Complete one request (using internal method for non-Arc test) |
| 527 | manager.complete_request("example.com"); | 709 | manager.complete_request_internal("example.com"); |
| 528 | 710 | ||
| 529 | // Should have capacity again | 711 | // Should have capacity again |
| 530 | assert!(!manager.is_throttled("example.com")); | 712 | assert!(!manager.is_throttled("example.com")); |
| @@ -540,8 +722,8 @@ mod tests { | |||
| 540 | // Domain doesn't exist yet | 722 | // Domain doesn't exist yet |
| 541 | assert!(!manager.throttles.contains_key("example.com")); | 723 | assert!(!manager.throttles.contains_key("example.com")); |
| 542 | 724 | ||
| 543 | // Enqueue an identifier | 725 | // Enqueue an identifier (using internal method for non-Arc test) |
| 544 | manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); | 726 | manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); |
| 545 | 727 | ||
| 546 | // Domain throttle should now exist | 728 | // Domain throttle should now exist |
| 547 | assert!(manager.throttles.contains_key("example.com")); | 729 | assert!(manager.throttles.contains_key("example.com")); |
| @@ -560,4 +742,28 @@ mod tests { | |||
| 560 | // Domain throttle should now exist | 742 | // Domain throttle should now exist |
| 561 | assert!(manager.throttles.contains_key("example.com")); | 743 | assert!(manager.throttles.contains_key("example.com")); |
| 562 | } | 744 | } |
| 745 | |||
| 746 | #[test] | ||
| 747 | fn has_queued_work_reflects_queue_state() { | ||
| 748 | let manager = ThrottleManager::new(5, 100); | ||
| 749 | |||
| 750 | // Initially no queued work | ||
| 751 | let has_work = manager | ||
| 752 | .throttles | ||
| 753 | .get("example.com") | ||
| 754 | .map(|e| e.lock().unwrap().has_queued_work()) | ||
| 755 | .unwrap_or(false); | ||
| 756 | assert!(!has_work); | ||
| 757 | |||
| 758 | // Enqueue an identifier | ||
| 759 | manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new()); | ||
| 760 | |||
| 761 | // Now should have queued work | ||
| 762 | let has_work = manager | ||
| 763 | .throttles | ||
| 764 | .get("example.com") | ||
| 765 | .map(|e| e.lock().unwrap().has_queued_work()) | ||
| 766 | .unwrap_or(false); | ||
| 767 | assert!(has_work); | ||
| 768 | } | ||
| 563 | } | 769 | } |