diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:57:55 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:57:55 +0000 |
| commit | 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 (patch) | |
| tree | 0e85dce4da81e57aa26ead9810589fe0f44f89ca /src/purgatory/sync/functions.rs | |
| parent | 1c5dac680b5a446e26b161208a17030f5fbd8a88 (diff) | |
Add sync_identifier orchestration and ThrottleManager queue processing
Implement the main sync orchestration function and trigger-based queue
processing for throttled domains:
sync_identifier function:
- Orchestrates syncing git data for a single identifier
- Tries all non-throttled URLs in sequence
- Checks completion after each fetch (no pending events or all OIDs fetched)
- Enqueues with throttled domains when non-throttled URLs are exhausted
- Returns true if complete, false if events remain (for backoff)
ThrottleManager enhancements:
- Add set_context() to provide SyncContext for queue processing
- Add try_process_next() to spawn tasks when capacity frees
- Add process_queued_identifier() to handle queued work
- Update complete_request() to trigger processing on completion
- Update enqueue_identifier() to trigger processing when capacity available
- Add internal methods for non-Arc testing compatibility
Generic function updates:
- Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url,
sync_identifier, and get_throttled_domains_with_untried_urls for
dynamic dispatch support (Arc<dyn SyncContext>)
Tests:
- sync_identifier_tries_multiple_urls_until_complete: verifies sequential
URL fetching until all OIDs are available
- sync_identifier_enqueues_throttled_domains_when_incomplete: verifies
throttled domains get the identifier enqueued for later processing
- has_queued_work_reflects_queue_state: verifies queue state tracking
Diffstat (limited to 'src/purgatory/sync/functions.rs')
| -rw-r--r-- | src/purgatory/sync/functions.rs | 228 |
1 file changed, 225 insertions, 3 deletions
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index de846f3..751dd5e 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -55,7 +55,7 @@ fn extract_domain(url: &str) -> Option<String> { | |||
| 55 | /// | 55 | /// |
| 56 | /// * `Some(url)` - The next URL to try | 56 | /// * `Some(url)` - The next URL to try |
| 57 | /// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) | 57 | /// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) |
| 58 | pub async fn sync_identifier_next_url<C: SyncContext>( | 58 | pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>( |
| 59 | ctx: &C, | 59 | ctx: &C, |
| 60 | identifier: &str, | 60 | identifier: &str, |
| 61 | domain: Option<&str>, | 61 | domain: Option<&str>, |
| @@ -178,7 +178,7 @@ pub struct ThrottledDomainInfo { | |||
| 178 | /// | 178 | /// |
| 179 | /// A list of throttled domains that still have untried URLs, along with | 179 | /// A list of throttled domains that still have untried URLs, along with |
| 180 | /// the tried URLs for each domain (for proper queue state). | 180 | /// the tried URLs for each domain (for proper queue state). |
| 181 | pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( | 181 | pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>( |
| 182 | ctx: &C, | 182 | ctx: &C, |
| 183 | identifier: &str, | 183 | identifier: &str, |
| 184 | tried_urls: &HashSet<String>, | 184 | tried_urls: &HashSet<String>, |
| @@ -254,7 +254,7 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( | |||
| 254 | /// # Returns | 254 | /// # Returns |
| 255 | /// | 255 | /// |
| 256 | /// The number of OIDs successfully fetched (0 on failure) | 256 | /// The number of OIDs successfully fetched (0 on failure) |
| 257 | pub async fn sync_identifier_from_url<C: SyncContext>( | 257 | pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( |
| 258 | ctx: &C, | 258 | ctx: &C, |
| 259 | identifier: &str, | 259 | identifier: &str, |
| 260 | url: &str, | 260 | url: &str, |
| @@ -347,6 +347,128 @@ pub async fn sync_identifier_from_url<C: SyncContext>( | |||
| 347 | oids_fetched | 347 | oids_fetched |
| 348 | } | 348 | } |
| 349 | 349 | ||
| 350 | /// Sync git data for an identifier. | ||
| 351 | /// | ||
| 352 | /// This is the main orchestration function called by the sync loop. It: | ||
| 353 | /// 1. Tries all non-throttled URLs in sequence | ||
| 354 | /// 2. After each fetch, checks if sync is complete (no pending events or no needed OIDs) | ||
| 355 | /// 3. When no non-throttled URLs remain, enqueues with throttled domains for later processing | ||
| 356 | /// 4. Returns without waiting for throttled domains to complete | ||
| 357 | /// | ||
| 358 | /// # Arguments | ||
| 359 | /// | ||
| 360 | /// * `ctx` - The sync context providing repository data and OID information | ||
| 361 | /// * `identifier` - The repository identifier (d-tag value) | ||
| 362 | /// * `throttle_manager` - Used for rate limiting and domain queue management | ||
| 363 | /// | ||
| 364 | /// # Returns | ||
| 365 | /// | ||
| 366 | /// * `true` - Sync completed (no pending events or all OIDs fetched) | ||
| 367 | /// * `false` - Events remain in purgatory (will be retried after backoff, or processed | ||
| 368 | /// by throttled domain queues) | ||
| 369 | pub async fn sync_identifier<C: SyncContext + ?Sized>( | ||
| 370 | ctx: &C, | ||
| 371 | identifier: &str, | ||
| 372 | throttle_manager: &Arc<ThrottleManager>, | ||
| 373 | ) -> bool { | ||
| 374 | let mut tried_urls: HashSet<String> = HashSet::new(); | ||
| 375 | |||
| 376 | debug!( | ||
| 377 | identifier = %identifier, | ||
| 378 | "Starting sync for identifier" | ||
| 379 | ); | ||
| 380 | |||
| 381 | // Try all non-throttled URLs | ||
| 382 | loop { | ||
| 383 | match sync_identifier_next_url(ctx, identifier, None, &tried_urls, throttle_manager).await { | ||
| 384 | Some(url) => { | ||
| 385 | debug!( | ||
| 386 | identifier = %identifier, | ||
| 387 | url = %url, | ||
| 388 | "Found non-throttled URL to try" | ||
| 389 | ); | ||
| 390 | |||
| 391 | // Fetch from this URL | ||
| 392 | sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await; | ||
| 393 | tried_urls.insert(url); | ||
| 394 | |||
| 395 | // Check if sync is now complete | ||
| 396 | if !ctx.has_pending_events(identifier) { | ||
| 397 | debug!( | ||
| 398 | identifier = %identifier, | ||
| 399 | "Sync complete - no pending events" | ||
| 400 | ); | ||
| 401 | return true; | ||
| 402 | } | ||
| 403 | |||
| 404 | let needed_oids = ctx.collect_needed_oids(identifier); | ||
| 405 | if needed_oids.is_empty() { | ||
| 406 | debug!( | ||
| 407 | identifier = %identifier, | ||
| 408 | "Sync complete - all OIDs available" | ||
| 409 | ); | ||
| 410 | return true; | ||
| 411 | } | ||
| 412 | |||
| 413 | // Continue trying more URLs | ||
| 414 | } | ||
| 415 | None => { | ||
| 416 | // No more non-throttled URLs available | ||
| 417 | debug!( | ||
| 418 | identifier = %identifier, | ||
| 419 | tried_count = tried_urls.len(), | ||
| 420 | "No more non-throttled URLs available" | ||
| 421 | ); | ||
| 422 | break; | ||
| 423 | } | ||
| 424 | } | ||
| 425 | } | ||
| 426 | |||
| 427 | // Check if we're done (no pending events or no needed OIDs) | ||
| 428 | if !ctx.has_pending_events(identifier) { | ||
| 429 | debug!( | ||
| 430 | identifier = %identifier, | ||
| 431 | "Sync complete after exhausting URLs - no pending events" | ||
| 432 | ); | ||
| 433 | return true; | ||
| 434 | } | ||
| 435 | |||
| 436 | let needed_oids = ctx.collect_needed_oids(identifier); | ||
| 437 | if needed_oids.is_empty() { | ||
| 438 | debug!( | ||
| 439 | identifier = %identifier, | ||
| 440 | "Sync complete after exhausting URLs - all OIDs available" | ||
| 441 | ); | ||
| 442 | return true; | ||
| 443 | } | ||
| 444 | |||
| 445 | // Enqueue with any throttled domains that have untried URLs | ||
| 446 | let throttled_domains = | ||
| 447 | get_throttled_domains_with_untried_urls(ctx, identifier, &tried_urls, throttle_manager) | ||
| 448 | .await; | ||
| 449 | |||
| 450 | for info in throttled_domains { | ||
| 451 | debug!( | ||
| 452 | identifier = %identifier, | ||
| 453 | domain = %info.domain, | ||
| 454 | "Enqueueing identifier with throttled domain" | ||
| 455 | ); | ||
| 456 | throttle_manager.enqueue_identifier( | ||
| 457 | &info.domain, | ||
| 458 | identifier.to_string(), | ||
| 459 | info.tried_urls_for_domain, | ||
| 460 | ); | ||
| 461 | } | ||
| 462 | |||
| 463 | // Return false - events remain, will retry after backoff | ||
| 464 | // (throttled domains will process independently) | ||
| 465 | debug!( | ||
| 466 | identifier = %identifier, | ||
| 467 | "Sync incomplete - returning false for backoff" | ||
| 468 | ); | ||
| 469 | false | ||
| 470 | } | ||
| 471 | |||
| 350 | #[cfg(test)] | 472 | #[cfg(test)] |
| 351 | mod tests { | 473 | mod tests { |
| 352 | use super::*; | 474 | use super::*; |
| @@ -617,4 +739,104 @@ mod tests { | |||
| 617 | assert_eq!(throttled[0].domain, "gitlab.com"); | 739 | assert_eq!(throttled[0].domain, "gitlab.com"); |
| 618 | assert!(throttled[0].tried_urls_for_domain.is_empty()); | 740 | assert!(throttled[0].tried_urls_for_domain.is_empty()); |
| 619 | } | 741 | } |
| 742 | |||
| 743 | // ========================================================================= | ||
| 744 | // Phase 6: sync_identifier tests | ||
| 745 | // ========================================================================= | ||
| 746 | |||
| 747 | #[tokio::test] | ||
| 748 | async fn sync_identifier_tries_multiple_urls_until_complete() { | ||
| 749 | // Set up mock with 3 URLs, each providing partial OIDs | ||
| 750 | // URL1 provides abc123, URL2 provides def456, URL3 provides ghi789 | ||
| 751 | let mock = MockSyncContext::new() | ||
| 752 | .with_urls(&[ | ||
| 753 | "https://server1.com/repo.git", | ||
| 754 | "https://server2.com/repo.git", | ||
| 755 | "https://server3.com/repo.git", | ||
| 756 | ]) | ||
| 757 | .with_needed_oids(&["abc123", "def456", "ghi789"]) | ||
| 758 | .with_pending_events(true) | ||
| 759 | .url_provides("https://server1.com/repo.git", &["abc123"]) | ||
| 760 | .url_provides("https://server2.com/repo.git", &["def456"]) | ||
| 761 | .url_provides("https://server3.com/repo.git", &["ghi789"]); | ||
| 762 | |||
| 763 | let throttle_manager = Arc::new(ThrottleManager::new(5, 100)); | ||
| 764 | |||
| 765 | // Run sync_identifier | ||
| 766 | let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await; | ||
| 767 | |||
| 768 | // Should return true (sync complete) | ||
| 769 | assert!(complete, "Expected sync to complete after trying all URLs"); | ||
| 770 | |||
| 771 | // Should have tried all 3 URLs | ||
| 772 | let fetch_log = mock.fetch_log(); | ||
| 773 | assert_eq!( | ||
| 774 | fetch_log.len(), | ||
| 775 | 3, | ||
| 776 | "Expected 3 fetch attempts, got: {:?}", | ||
| 777 | fetch_log | ||
| 778 | ); | ||
| 779 | |||
| 780 | // All OIDs should now be fetched | ||
| 781 | assert!( | ||
| 782 | mock.current_needed_oids().is_empty(), | ||
| 783 | "Expected all OIDs to be fetched" | ||
| 784 | ); | ||
| 785 | } | ||
| 786 | |||
| 787 | #[tokio::test] | ||
| 788 | async fn sync_identifier_enqueues_throttled_domains_when_incomplete() { | ||
| 789 | // Set up mock with URLs from two domains | ||
| 790 | // Only github.com can provide the OID, but it will be throttled | ||
| 791 | let mock = MockSyncContext::new() | ||
| 792 | .with_urls(&[ | ||
| 793 | "https://github.com/foo/bar.git", | ||
| 794 | "https://gitlab.com/foo/bar.git", | ||
| 795 | ]) | ||
| 796 | .with_needed_oids(&["abc123"]) | ||
| 797 | .with_pending_events(true) | ||
| 798 | .url_provides("https://github.com/foo/bar.git", &["abc123"]); | ||
| 799 | // Note: gitlab.com doesn't provide any OIDs | ||
| 800 | |||
| 801 | let throttle_manager = Arc::new(ThrottleManager::new(1, 100)); | ||
| 802 | |||
| 803 | // Throttle github.com by starting a request | ||
| 804 | throttle_manager.start_request("github.com"); | ||
| 805 | |||
| 806 | // Run sync_identifier | ||
| 807 | let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await; | ||
| 808 | |||
| 809 | // Should return false (sync incomplete - github.com is throttled) | ||
| 810 | assert!( | ||
| 811 | !complete, | ||
| 812 | "Expected sync to be incomplete when required domain is throttled" | ||
| 813 | ); | ||
| 814 | |||
| 815 | // Should have tried gitlab.com (not throttled) but it doesn't have the OID | ||
| 816 | let fetch_log = mock.fetch_log(); | ||
| 817 | assert_eq!( | ||
| 818 | fetch_log.len(), | ||
| 819 | 1, | ||
| 820 | "Expected 1 fetch attempt (gitlab.com), got: {:?}", | ||
| 821 | fetch_log | ||
| 822 | ); | ||
| 823 | assert!( | ||
| 824 | fetch_log[0].contains("gitlab.com"), | ||
| 825 | "Expected gitlab.com to be tried first" | ||
| 826 | ); | ||
| 827 | |||
| 828 | // OID should still be needed | ||
| 829 | assert!( | ||
| 830 | mock.current_needed_oids().contains("abc123"), | ||
| 831 | "Expected OID to still be needed" | ||
| 832 | ); | ||
| 833 | |||
| 834 | // github.com should still be throttled: the request started above was never | ||
| 835 | // completed. NOTE: this only confirms the throttle state; on its own it does not | ||
| 836 | // prove that the identifier was enqueued for github.com. | ||
| 837 | assert!( | ||
| 838 | throttle_manager.is_throttled("github.com"), | ||
| 839 | "Expected github.com to still be throttled" | ||
| 840 | ); | ||
| 841 | } | ||
| 620 | } | 842 | } |