From 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 11:57:55 +0000 Subject: Add sync_identifier orchestration and ThrottleManager queue processing Implement the main sync orchestration function and trigger-based queue processing for throttled domains: sync_identifier function: - Orchestrates syncing git data for a single identifier - Tries all non-throttled URLs in sequence - Checks completion after each fetch (no pending events or all OIDs fetched) - Enqueues with throttled domains when non-throttled URLs are exhausted - Returns true if complete, false if events remain (for backoff) ThrottleManager enhancements: - Add set_context() to provide SyncContext for queue processing - Add try_process_next() to spawn tasks when capacity frees - Add process_queued_identifier() to handle queued work - Update complete_request() to trigger processing on completion - Update enqueue_identifier() to trigger processing when capacity available - Add internal methods for non-Arc testing compatibility Generic function updates: - Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url, sync_identifier, and get_throttled_domains_with_untried_urls for dynamic dispatch support (Arc<dyn SyncContext>) Tests: - sync_identifier_tries_multiple_urls_until_complete: verifies sequential URL fetching until all OIDs are available - sync_identifier_enqueues_throttled_domains_when_incomplete: verifies throttled domains get the identifier enqueued for later processing - has_queued_work_reflects_queue_state: verifies queue state tracking --- src/purgatory/sync/functions.rs | 228 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 225 insertions(+), 3 deletions(-) (limited to 'src/purgatory/sync/functions.rs') diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index de846f3..751dd5e 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs @@ -55,7 +55,7 @@ fn extract_domain(url: &str) -> 
Option { /// /// * `Some(url)` - The next URL to try /// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) -pub async fn sync_identifier_next_url<C: SyncContext>( +pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>( ctx: &C, identifier: &str, domain: Option<&str>, @@ -178,7 +178,7 @@ pub struct ThrottledDomainInfo { /// /// A list of throttled domains that still have untried URLs, along with /// the tried URLs for each domain (for proper queue state). -pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( +pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>( ctx: &C, identifier: &str, tried_urls: &HashSet<String>, @@ -254,7 +254,7 @@ pub async fn get_throttled_domains_with_untried_urls( /// # Returns /// /// The number of OIDs successfully fetched (0 on failure) -pub async fn sync_identifier_from_url<C: SyncContext>( +pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( ctx: &C, identifier: &str, url: &str, @@ -347,6 +347,128 @@ pub async fn sync_identifier_from_url( oids_fetched } +/// Sync git data for an identifier. +/// +/// This is the main orchestration function called by the sync loop. It: +/// 1. Tries all non-throttled URLs in sequence +/// 2. After each fetch, checks if sync is complete (no pending events or no needed OIDs) +/// 3. When no non-throttled URLs remain, enqueues with throttled domains for later processing +/// 4. 
Returns without waiting for throttled domains to complete +/// +/// # Arguments +/// +/// * `ctx` - The sync context providing repository data and OID information +/// * `identifier` - The repository identifier (d-tag value) +/// * `throttle_manager` - Used for rate limiting and domain queue management +/// +/// # Returns +/// +/// * `true` - Sync completed (no pending events or all OIDs fetched) +/// * `false` - Events remain in purgatory (will be retried after backoff, or processed +/// by throttled domain queues) +pub async fn sync_identifier<C: SyncContext + ?Sized>( + ctx: &C, + identifier: &str, + throttle_manager: &Arc<ThrottleManager>, +) -> bool { + let mut tried_urls: HashSet<String> = HashSet::new(); + + debug!( + identifier = %identifier, + "Starting sync for identifier" + ); + + // Try all non-throttled URLs + loop { + match sync_identifier_next_url(ctx, identifier, None, &tried_urls, throttle_manager).await { + Some(url) => { + debug!( + identifier = %identifier, + url = %url, + "Found non-throttled URL to try" + ); + + // Fetch from this URL + sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await; + tried_urls.insert(url); + + // Check if sync is now complete + if !ctx.has_pending_events(identifier) { + debug!( + identifier = %identifier, + "Sync complete - no pending events" + ); + return true; + } + + let needed_oids = ctx.collect_needed_oids(identifier); + if needed_oids.is_empty() { + debug!( + identifier = %identifier, + "Sync complete - all OIDs available" + ); + return true; + } + + // Continue trying more URLs + } + None => { + // No more non-throttled URLs available + debug!( + identifier = %identifier, + tried_count = tried_urls.len(), + "No more non-throttled URLs available" + ); + break; + } + } + } + + // Check if we're done (no pending events or no needed OIDs) + if !ctx.has_pending_events(identifier) { + debug!( + identifier = %identifier, + "Sync complete after exhausting URLs - no pending events" + ); + return true; + } + + let needed_oids = 
ctx.collect_needed_oids(identifier); + if needed_oids.is_empty() { + debug!( + identifier = %identifier, + "Sync complete after exhausting URLs - all OIDs available" + ); + return true; + } + + // Enqueue with any throttled domains that have untried URLs + let throttled_domains = + get_throttled_domains_with_untried_urls(ctx, identifier, &tried_urls, throttle_manager) + .await; + + for info in throttled_domains { + debug!( + identifier = %identifier, + domain = %info.domain, + "Enqueueing identifier with throttled domain" + ); + throttle_manager.enqueue_identifier( + &info.domain, + identifier.to_string(), + info.tried_urls_for_domain, + ); + } + + // Return false - events remain, will retry after backoff + // (throttled domains will process independently) + debug!( + identifier = %identifier, + "Sync incomplete - returning false for backoff" + ); + false +} + #[cfg(test)] mod tests { use super::*; @@ -617,4 +739,104 @@ mod tests { assert_eq!(throttled[0].domain, "gitlab.com"); assert!(throttled[0].tried_urls_for_domain.is_empty()); } + + // ========================================================================= + // Phase 6: sync_identifier tests + // ========================================================================= + + #[tokio::test] + async fn sync_identifier_tries_multiple_urls_until_complete() { + // Set up mock with 3 URLs, each providing partial OIDs + // URL1 provides abc123, URL2 provides def456, URL3 provides ghi789 + let mock = MockSyncContext::new() + .with_urls(&[ + "https://server1.com/repo.git", + "https://server2.com/repo.git", + "https://server3.com/repo.git", + ]) + .with_needed_oids(&["abc123", "def456", "ghi789"]) + .with_pending_events(true) + .url_provides("https://server1.com/repo.git", &["abc123"]) + .url_provides("https://server2.com/repo.git", &["def456"]) + .url_provides("https://server3.com/repo.git", &["ghi789"]); + + let throttle_manager = Arc::new(ThrottleManager::new(5, 100)); + + // Run sync_identifier + let complete = 
sync_identifier(&mock, "test-repo", &throttle_manager).await; + + // Should return true (sync complete) + assert!(complete, "Expected sync to complete after trying all URLs"); + + // Should have tried all 3 URLs + let fetch_log = mock.fetch_log(); + assert_eq!( + fetch_log.len(), + 3, + "Expected 3 fetch attempts, got: {:?}", + fetch_log + ); + + // All OIDs should now be fetched + assert!( + mock.current_needed_oids().is_empty(), + "Expected all OIDs to be fetched" + ); + } + + #[tokio::test] + async fn sync_identifier_enqueues_throttled_domains_when_incomplete() { + // Set up mock with URLs from two domains + // Only github.com can provide the OID, but it will be throttled + let mock = MockSyncContext::new() + .with_urls(&[ + "https://github.com/foo/bar.git", + "https://gitlab.com/foo/bar.git", + ]) + .with_needed_oids(&["abc123"]) + .with_pending_events(true) + .url_provides("https://github.com/foo/bar.git", &["abc123"]); + // Note: gitlab.com doesn't provide any OIDs + + let throttle_manager = Arc::new(ThrottleManager::new(1, 100)); + + // Throttle github.com by starting a request + throttle_manager.start_request("github.com"); + + // Run sync_identifier + let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await; + + // Should return false (sync incomplete - github.com is throttled) + assert!( + !complete, + "Expected sync to be incomplete when required domain is throttled" + ); + + // Should have tried gitlab.com (not throttled) but it doesn't have the OID + let fetch_log = mock.fetch_log(); + assert_eq!( + fetch_log.len(), + 1, + "Expected 1 fetch attempt (gitlab.com), got: {:?}", + fetch_log + ); + assert!( + fetch_log[0].contains("gitlab.com"), + "Expected gitlab.com to be tried first" + ); + + // OID should still be needed + assert!( + mock.current_needed_oids().contains("abc123"), + "Expected OID to still be needed" + ); + + // github.com should have the identifier enqueued + // We can verify this by checking if github.com is still 
throttled (it should be, + // since the identifier was enqueued but not processed yet) + assert!( + throttle_manager.is_throttled("github.com"), + "Expected github.com to still be throttled" + ); + } } -- cgit v1.2.3