upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 11:57:55 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 11:57:55 +0000
commit8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 (patch)
tree0e85dce4da81e57aa26ead9810589fe0f44f89ca
parent1c5dac680b5a446e26b161208a17030f5fbd8a88 (diff)
Add sync_identifier orchestration and ThrottleManager queue processing
Implement the main sync orchestration function and trigger-based queue processing for throttled domains: sync_identifier function: - Orchestrates syncing git data for a single identifier - Tries all non-throttled URLs in sequence - Checks completion after each fetch (no pending events or all OIDs fetched) - Enqueues with throttled domains when non-throttled URLs are exhausted - Returns true if complete, false if events remain (for backoff) ThrottleManager enhancements: - Add set_context() to provide SyncContext for queue processing - Add try_process_next() to spawn tasks when capacity frees - Add process_queued_identifier() to handle queued work - Update complete_request() to trigger processing on completion - Update enqueue_identifier() to trigger processing when capacity available - Add internal methods for non-Arc testing compatibility Generic function updates: - Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url, sync_identifier, and get_throttled_domains_with_untried_urls for dynamic dispatch support (Arc<dyn SyncContext>) Tests: - sync_identifier_tries_multiple_urls_until_complete: verifies sequential URL fetching until all OIDs are available - sync_identifier_enqueues_throttled_domains_when_incomplete: verifies throttled domains get the identifier enqueued for later processing - has_queued_work_reflects_queue_state: verifies queue state tracking
-rw-r--r--src/purgatory/sync/functions.rs228
-rw-r--r--src/purgatory/sync/mod.rs4
-rw-r--r--src/purgatory/sync/throttle.rs278
3 files changed, 469 insertions, 41 deletions
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index de846f3..751dd5e 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -55,7 +55,7 @@ fn extract_domain(url: &str) -> Option<String> {
55/// 55///
56/// * `Some(url)` - The next URL to try 56/// * `Some(url)` - The next URL to try
57/// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) 57/// * `None` - No suitable URL found (all tried, all throttled, or no URLs available)
58pub async fn sync_identifier_next_url<C: SyncContext>( 58pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
59 ctx: &C, 59 ctx: &C,
60 identifier: &str, 60 identifier: &str,
61 domain: Option<&str>, 61 domain: Option<&str>,
@@ -178,7 +178,7 @@ pub struct ThrottledDomainInfo {
178/// 178///
179/// A list of throttled domains that still have untried URLs, along with 179/// A list of throttled domains that still have untried URLs, along with
180/// the tried URLs for each domain (for proper queue state). 180/// the tried URLs for each domain (for proper queue state).
181pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( 181pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>(
182 ctx: &C, 182 ctx: &C,
183 identifier: &str, 183 identifier: &str,
184 tried_urls: &HashSet<String>, 184 tried_urls: &HashSet<String>,
@@ -254,7 +254,7 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>(
254/// # Returns 254/// # Returns
255/// 255///
256/// The number of OIDs successfully fetched (0 on failure) 256/// The number of OIDs successfully fetched (0 on failure)
257pub async fn sync_identifier_from_url<C: SyncContext>( 257pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
258 ctx: &C, 258 ctx: &C,
259 identifier: &str, 259 identifier: &str,
260 url: &str, 260 url: &str,
@@ -347,6 +347,128 @@ pub async fn sync_identifier_from_url<C: SyncContext>(
347 oids_fetched 347 oids_fetched
348} 348}
349 349
350/// Sync git data for an identifier.
351///
352/// This is the main orchestration function called by the sync loop. It:
353/// 1. Tries all non-throttled URLs in sequence
354/// 2. After each fetch, checks if sync is complete (no pending events or no needed OIDs)
355/// 3. When no non-throttled URLs remain, enqueues with throttled domains for later processing
356/// 4. Returns without waiting for throttled domains to complete
357///
358/// # Arguments
359///
360/// * `ctx` - The sync context providing repository data and OID information
361/// * `identifier` - The repository identifier (d-tag value)
362/// * `throttle_manager` - Used for rate limiting and domain queue management
363///
364/// # Returns
365///
366/// * `true` - Sync completed (no pending events or all OIDs fetched)
367/// * `false` - Events remain in purgatory (will be retried after backoff, or processed
368/// by throttled domain queues)
369pub async fn sync_identifier<C: SyncContext + ?Sized>(
370 ctx: &C,
371 identifier: &str,
372 throttle_manager: &Arc<ThrottleManager>,
373) -> bool {
374 let mut tried_urls: HashSet<String> = HashSet::new();
375
376 debug!(
377 identifier = %identifier,
378 "Starting sync for identifier"
379 );
380
381 // Try all non-throttled URLs
382 loop {
383 match sync_identifier_next_url(ctx, identifier, None, &tried_urls, throttle_manager).await {
384 Some(url) => {
385 debug!(
386 identifier = %identifier,
387 url = %url,
388 "Found non-throttled URL to try"
389 );
390
391 // Fetch from this URL
392 sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await;
393 tried_urls.insert(url);
394
395 // Check if sync is now complete
396 if !ctx.has_pending_events(identifier) {
397 debug!(
398 identifier = %identifier,
399 "Sync complete - no pending events"
400 );
401 return true;
402 }
403
404 let needed_oids = ctx.collect_needed_oids(identifier);
405 if needed_oids.is_empty() {
406 debug!(
407 identifier = %identifier,
408 "Sync complete - all OIDs available"
409 );
410 return true;
411 }
412
413 // Continue trying more URLs
414 }
415 None => {
416 // No more non-throttled URLs available
417 debug!(
418 identifier = %identifier,
419 tried_count = tried_urls.len(),
420 "No more non-throttled URLs available"
421 );
422 break;
423 }
424 }
425 }
426
427 // Check if we're done (no pending events or no needed OIDs)
428 if !ctx.has_pending_events(identifier) {
429 debug!(
430 identifier = %identifier,
431 "Sync complete after exhausting URLs - no pending events"
432 );
433 return true;
434 }
435
436 let needed_oids = ctx.collect_needed_oids(identifier);
437 if needed_oids.is_empty() {
438 debug!(
439 identifier = %identifier,
440 "Sync complete after exhausting URLs - all OIDs available"
441 );
442 return true;
443 }
444
445 // Enqueue with any throttled domains that have untried URLs
446 let throttled_domains =
447 get_throttled_domains_with_untried_urls(ctx, identifier, &tried_urls, throttle_manager)
448 .await;
449
450 for info in throttled_domains {
451 debug!(
452 identifier = %identifier,
453 domain = %info.domain,
454 "Enqueueing identifier with throttled domain"
455 );
456 throttle_manager.enqueue_identifier(
457 &info.domain,
458 identifier.to_string(),
459 info.tried_urls_for_domain,
460 );
461 }
462
463 // Return false - events remain, will retry after backoff
464 // (throttled domains will process independently)
465 debug!(
466 identifier = %identifier,
467 "Sync incomplete - returning false for backoff"
468 );
469 false
470}
471
350#[cfg(test)] 472#[cfg(test)]
351mod tests { 473mod tests {
352 use super::*; 474 use super::*;
@@ -617,4 +739,104 @@ mod tests {
617 assert_eq!(throttled[0].domain, "gitlab.com"); 739 assert_eq!(throttled[0].domain, "gitlab.com");
618 assert!(throttled[0].tried_urls_for_domain.is_empty()); 740 assert!(throttled[0].tried_urls_for_domain.is_empty());
619 } 741 }
742
743 // =========================================================================
744 // Phase 6: sync_identifier tests
745 // =========================================================================
746
747 #[tokio::test]
748 async fn sync_identifier_tries_multiple_urls_until_complete() {
749 // Set up mock with 3 URLs, each providing partial OIDs
750 // URL1 provides abc123, URL2 provides def456, URL3 provides ghi789
751 let mock = MockSyncContext::new()
752 .with_urls(&[
753 "https://server1.com/repo.git",
754 "https://server2.com/repo.git",
755 "https://server3.com/repo.git",
756 ])
757 .with_needed_oids(&["abc123", "def456", "ghi789"])
758 .with_pending_events(true)
759 .url_provides("https://server1.com/repo.git", &["abc123"])
760 .url_provides("https://server2.com/repo.git", &["def456"])
761 .url_provides("https://server3.com/repo.git", &["ghi789"]);
762
763 let throttle_manager = Arc::new(ThrottleManager::new(5, 100));
764
765 // Run sync_identifier
766 let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await;
767
768 // Should return true (sync complete)
769 assert!(complete, "Expected sync to complete after trying all URLs");
770
771 // Should have tried all 3 URLs
772 let fetch_log = mock.fetch_log();
773 assert_eq!(
774 fetch_log.len(),
775 3,
776 "Expected 3 fetch attempts, got: {:?}",
777 fetch_log
778 );
779
780 // All OIDs should now be fetched
781 assert!(
782 mock.current_needed_oids().is_empty(),
783 "Expected all OIDs to be fetched"
784 );
785 }
786
787 #[tokio::test]
788 async fn sync_identifier_enqueues_throttled_domains_when_incomplete() {
789 // Set up mock with URLs from two domains
790 // Only github.com can provide the OID, but it will be throttled
791 let mock = MockSyncContext::new()
792 .with_urls(&[
793 "https://github.com/foo/bar.git",
794 "https://gitlab.com/foo/bar.git",
795 ])
796 .with_needed_oids(&["abc123"])
797 .with_pending_events(true)
798 .url_provides("https://github.com/foo/bar.git", &["abc123"]);
799 // Note: gitlab.com doesn't provide any OIDs
800
801 let throttle_manager = Arc::new(ThrottleManager::new(1, 100));
802
803 // Throttle github.com by starting a request
804 throttle_manager.start_request("github.com");
805
806 // Run sync_identifier
807 let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await;
808
809 // Should return false (sync incomplete - github.com is throttled)
810 assert!(
811 !complete,
812 "Expected sync to be incomplete when required domain is throttled"
813 );
814
815 // Should have tried gitlab.com (not throttled) but it doesn't have the OID
816 let fetch_log = mock.fetch_log();
817 assert_eq!(
818 fetch_log.len(),
819 1,
820 "Expected 1 fetch attempt (gitlab.com), got: {:?}",
821 fetch_log
822 );
823 assert!(
824 fetch_log[0].contains("gitlab.com"),
825 "Expected gitlab.com to be tried first"
826 );
827
828 // OID should still be needed
829 assert!(
830 mock.current_needed_oids().contains("abc123"),
831 "Expected OID to still be needed"
832 );
833
834 // github.com should have the identifier enqueued
835 // We can verify this by checking if github.com is still throttled (it should be,
836 // since the identifier was enqueued but not processed yet)
837 assert!(
838 throttle_manager.is_throttled("github.com"),
839 "Expected github.com to still be throttled"
840 );
841 }
620} 842}
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
index d26c1f0..8ac9216 100644
--- a/src/purgatory/sync/mod.rs
+++ b/src/purgatory/sync/mod.rs
@@ -13,8 +13,8 @@ mod throttle;
13 13
14pub use context::{ProcessResult, SyncContext}; 14pub use context::{ProcessResult, SyncContext};
15pub use functions::{ 15pub use functions::{
16 get_throttled_domains_with_untried_urls, sync_identifier_from_url, sync_identifier_next_url, 16 get_throttled_domains_with_untried_urls, sync_identifier, sync_identifier_from_url,
17 ThrottledDomainInfo, 17 sync_identifier_next_url, ThrottledDomainInfo,
18}; 18};
19pub use queue::SyncQueueEntry; 19pub use queue::SyncQueueEntry;
20pub use throttle::{DomainThrottle, ThrottleManager}; 20pub use throttle::{DomainThrottle, ThrottleManager};
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index 94056b5..a310a91 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -8,12 +8,22 @@
8//! 8//!
9//! The `ThrottleManager` owns all `DomainThrottle` instances and provides the 9//! The `ThrottleManager` owns all `DomainThrottle` instances and provides the
10//! interface for checking throttle status and managing identifier queues. 10//! interface for checking throttle status and managing identifier queues.
11//!
12//! ## Trigger-based Processing
13//!
14//! When capacity frees up (via `complete_request`) or a new identifier is enqueued
15//! (via `enqueue_identifier`), the manager automatically spawns tasks to process
16//! queued identifiers. This is trigger-based, not polling-based.
11 17
12use dashmap::DashMap; 18use dashmap::DashMap;
13use indexmap::IndexMap; 19use indexmap::IndexMap;
14use std::collections::{HashSet, VecDeque}; 20use std::collections::{HashSet, VecDeque};
15use std::sync::Mutex; 21use std::sync::{Arc, Mutex, OnceLock};
16use std::time::{Duration, Instant}; 22use std::time::{Duration, Instant};
23use tracing::debug;
24
25use super::context::SyncContext;
26use super::functions::{sync_identifier_from_url, sync_identifier_next_url};
17 27
18/// State for an identifier waiting in a domain's queue. 28/// State for an identifier waiting in a domain's queue.
19/// 29///
@@ -240,9 +250,7 @@ impl DomainThrottle {
240/// - Throttle status checking for `sync_identifier_next_url` 250/// - Throttle status checking for `sync_identifier_next_url`
241/// - Identifier queue management 251/// - Identifier queue management
242/// - Request tracking (start/complete) 252/// - Request tracking (start/complete)
243/// 253/// - Trigger-based queue processing when capacity frees up
244/// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`)
245/// will be added in Phase 6 after `SyncContext` is available.
246pub struct ThrottleManager { 254pub struct ThrottleManager {
247 /// Per-domain throttle state. 255 /// Per-domain throttle state.
248 /// Uses DashMap for concurrent access from multiple sync tasks. 256 /// Uses DashMap for concurrent access from multiple sync tasks.
@@ -253,6 +261,10 @@ pub struct ThrottleManager {
253 261
254 /// Maximum requests per minute per domain. 262 /// Maximum requests per minute per domain.
255 max_per_minute_per_domain: u32, 263 max_per_minute_per_domain: u32,
264
265 /// Sync context for processing queued identifiers.
266 /// Set once at startup via `set_context()`.
267 ctx: OnceLock<Arc<dyn SyncContext>>,
256} 268}
257 269
258impl ThrottleManager { 270impl ThrottleManager {
@@ -266,38 +278,54 @@ impl ThrottleManager {
266 throttles: DashMap::new(), 278 throttles: DashMap::new(),
267 max_concurrent_per_domain: max_concurrent, 279 max_concurrent_per_domain: max_concurrent,
268 max_per_minute_per_domain: max_per_minute, 280 max_per_minute_per_domain: max_per_minute,
281 ctx: OnceLock::new(),
269 } 282 }
270 } 283 }
271 284
285 /// Set the sync context (called once at startup).
286 ///
287 /// The context is used for processing queued identifiers when capacity
288 /// becomes available. Must be called before any trigger-based processing
289 /// can occur.
290 ///
291 /// # Arguments
292 /// * `ctx` - The sync context implementation
293 pub fn set_context(&self, ctx: Arc<dyn SyncContext>) {
294 let _ = self.ctx.set(ctx);
295 }
296
272 /// Check if a domain is currently throttled (at capacity). 297 /// Check if a domain is currently throttled (at capacity).
273 /// 298 ///
274 /// Returns true if the domain has no capacity for another request, 299 /// Returns true if the domain has no capacity for another request,
275 /// either due to concurrent limit or rate limit. 300 /// either due to concurrent limit or rate limit.
276 pub fn is_throttled(&self, domain: &str) -> bool { 301 pub fn is_throttled(&self, domain: &str) -> bool {
277 self.throttles 302 self.throttles.get(domain).map_or(false, |entry| {
278 .get(domain) 303 let throttle = entry.lock().unwrap();
279 .map_or(false, |entry| { 304 !throttle.has_capacity()
280 let throttle = entry.lock().unwrap(); 305 })
281 !throttle.has_capacity()
282 })
283 } 306 }
284 307
285 /// Get or create a throttle for a domain. 308 /// Get or create a throttle for a domain.
286 fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { 309 fn get_or_create_throttle(
310 &self,
311 domain: &str,
312 ) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> {
287 // First, try to get existing 313 // First, try to get existing
288 if let Some(entry) = self.throttles.get(domain) { 314 if let Some(entry) = self.throttles.get(domain) {
289 return entry; 315 return entry;
290 } 316 }
291 317
292 // Create new throttle 318 // Create new throttle
293 self.throttles.entry(domain.to_string()).or_insert_with(|| { 319 self.throttles
294 Mutex::new(DomainThrottle::new( 320 .entry(domain.to_string())
295 domain.to_string(), 321 .or_insert_with(|| {
296 self.max_concurrent_per_domain, 322 Mutex::new(DomainThrottle::new(
297 self.max_per_minute_per_domain, 323 domain.to_string(),
298 )) 324 self.max_concurrent_per_domain,
299 }); 325 self.max_per_minute_per_domain,
300 326 ))
327 });
328
301 // Return the entry (we know it exists now) 329 // Return the entry (we know it exists now)
302 self.throttles.get(domain).unwrap() 330 self.throttles.get(domain).unwrap()
303 } 331 }
@@ -311,39 +339,193 @@ impl ThrottleManager {
311 throttle.start_request(); 339 throttle.start_request();
312 } 340 }
313 341
314 /// Record that a request completed for a domain. 342 /// Record that a request completed for a domain (internal, no trigger).
315 /// 343 ///
316 /// Decrements in-flight count and cleans up old timestamps. 344 /// Decrements in-flight count and cleans up old timestamps.
317 /// 345 /// Does not trigger processing of queued identifiers.
318 /// Note: Trigger-based processing (spawning tasks when capacity frees) 346 fn complete_request_internal(&self, domain: &str) {
319 /// will be added in Phase 6 after `SyncContext` is available.
320 pub fn complete_request(&self, domain: &str) {
321 if let Some(entry) = self.throttles.get(domain) { 347 if let Some(entry) = self.throttles.get(domain) {
322 let mut throttle = entry.lock().unwrap(); 348 let mut throttle = entry.lock().unwrap();
323 throttle.complete_request(); 349 throttle.complete_request();
324 } 350 }
325 } 351 }
326 352
327 /// Add an identifier to a domain's waiting queue. 353 /// Record that a request completed for a domain.
354 ///
355 /// Decrements in-flight count, cleans up old timestamps, and triggers
356 /// processing of queued identifiers if capacity is available.
357 ///
358 /// # Arguments
359 /// * `domain` - The domain that completed a request
360 pub fn complete_request(self: &Arc<Self>, domain: &str) {
361 let should_trigger = {
362 if let Some(entry) = self.throttles.get(domain) {
363 let mut throttle = entry.lock().unwrap();
364 throttle.complete_request();
365 throttle.has_capacity() && throttle.has_queued_work()
366 } else {
367 false
368 }
369 };
370
371 if should_trigger {
372 self.try_process_next(domain);
373 }
374 }
375
376 /// Add an identifier to a domain's waiting queue (internal, no trigger).
328 /// 377 ///
329 /// If the identifier is already queued for this domain, merges the tried_urls sets. 378 /// If the identifier is already queued for this domain, merges the tried_urls sets.
379 /// Does not trigger processing.
380 fn enqueue_identifier_internal(
381 &self,
382 domain: &str,
383 identifier: String,
384 tried_urls_for_domain: HashSet<String>,
385 ) {
386 let entry = self.get_or_create_throttle(domain);
387 let mut throttle = entry.lock().unwrap();
388 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
389 }
390
391 /// Add an identifier to a domain's waiting queue.
330 /// 392 ///
331 /// Note: Trigger-based processing (spawning tasks when capacity available) 393 /// If the identifier is already queued for this domain, merges the tried_urls sets.
332 /// will be added in Phase 6 after `SyncContext` is available. 394 /// Triggers processing if capacity is available.
333 /// 395 ///
334 /// # Arguments 396 /// # Arguments
335 /// * `domain` - The domain to queue for 397 /// * `domain` - The domain to queue for
336 /// * `identifier` - The repository identifier 398 /// * `identifier` - The repository identifier
337 /// * `tried_urls_for_domain` - URLs from this domain that have already been tried 399 /// * `tried_urls_for_domain` - URLs from this domain that have already been tried
338 pub fn enqueue_identifier( 400 pub fn enqueue_identifier(
339 &self, 401 self: &Arc<Self>,
340 domain: &str, 402 domain: &str,
341 identifier: String, 403 identifier: String,
342 tried_urls_for_domain: HashSet<String>, 404 tried_urls_for_domain: HashSet<String>,
343 ) { 405 ) {
344 let entry = self.get_or_create_throttle(domain); 406 let should_trigger = {
345 let mut throttle = entry.lock().unwrap(); 407 let entry = self.get_or_create_throttle(domain);
346 throttle.enqueue_identifier(identifier, tried_urls_for_domain); 408 let mut throttle = entry.lock().unwrap();
409 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
410 throttle.has_capacity()
411 };
412
413 if should_trigger {
414 self.try_process_next(domain);
415 }
416 }
417
418 /// Try to process the next queued identifier for a domain.
419 ///
420 /// This is called when capacity becomes available (either via `complete_request`
421 /// or when a new identifier is enqueued). Spawns a task to process the next
422 /// ready identifier if one exists.
423 fn try_process_next(self: &Arc<Self>, domain: &str) {
424 // Get next ready identifier (not in_progress)
425 let identifier = {
426 if let Some(entry) = self.throttles.get(domain) {
427 let mut throttle = entry.lock().unwrap();
428 throttle.next_ready_identifier()
429 } else {
430 None
431 }
432 };
433
434 if let Some(identifier) = identifier {
435 let manager = self.clone();
436 let domain = domain.to_string();
437
438 tokio::spawn(async move {
439 manager.process_queued_identifier(&domain, &identifier).await;
440 });
441 }
442 }
443
444 /// Process a single identifier from a domain's queue.
445 ///
446 /// This function:
447 /// 1. Gets the next URL to try for this identifier on this domain
448 /// 2. If a URL is found, fetches from it and marks it as tried
449 /// 3. If no URL is found, removes the identifier from this domain's queue
450 /// 4. Triggers processing of the next identifier if capacity is available
451 async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) {
452 let ctx = match self.ctx.get() {
453 Some(ctx) => ctx,
454 None => {
455 debug!(
456 domain = %domain,
457 identifier = %identifier,
458 "No sync context set - cannot process queued identifier"
459 );
460 // Mark not in progress so it can be retried
461 if let Some(entry) = self.throttles.get(domain) {
462 let mut throttle = entry.lock().unwrap();
463 throttle.mark_identifier_not_in_progress(identifier);
464 }
465 return;
466 }
467 };
468
469 // Get tried URLs for this identifier on this domain
470 let tried_urls = {
471 self.throttles
472 .get(domain)
473 .map(|entry| {
474 let throttle = entry.lock().unwrap();
475 throttle.get_tried_urls(identifier)
476 })
477 .unwrap_or_default()
478 };
479
480 // Get next URL for this identifier on this specific domain
481 let url = sync_identifier_next_url(
482 ctx.as_ref(),
483 identifier,
484 Some(domain),
485 &tried_urls,
486 self,
487 )
488 .await;
489
490 match url {
491 Some(url) => {
492 debug!(
493 domain = %domain,
494 identifier = %identifier,
495 url = %url,
496 "Processing queued identifier - fetching from URL"
497 );
498
499 // Fetch from this URL
500 sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await;
501
502 // Record URL as tried and mark not in_progress
503 if let Some(entry) = self.throttles.get(domain) {
504 let mut throttle = entry.lock().unwrap();
505 throttle.mark_url_tried(identifier, url);
506 throttle.mark_identifier_not_in_progress(identifier);
507 }
508
509 // complete_request was already called by sync_identifier_from_url,
510 // which will trigger try_process_next if capacity is available
511 }
512 None => {
513 debug!(
514 domain = %domain,
515 identifier = %identifier,
516 "No more URLs for identifier on this domain - removing from queue"
517 );
518
519 // No more URLs for this identifier on this domain - remove from queue
520 if let Some(entry) = self.throttles.get(domain) {
521 let mut throttle = entry.lock().unwrap();
522 throttle.remove_identifier(identifier);
523 }
524
525 // Try next identifier since we didn't use any capacity
526 self.try_process_next(domain);
527 }
528 }
347 } 529 }
348} 530}
349 531
@@ -523,8 +705,8 @@ mod tests {
523 // Should now be throttled 705 // Should now be throttled
524 assert!(manager.is_throttled("example.com")); 706 assert!(manager.is_throttled("example.com"));
525 707
526 // Complete one request 708 // Complete one request (using internal method for non-Arc test)
527 manager.complete_request("example.com"); 709 manager.complete_request_internal("example.com");
528 710
529 // Should have capacity again 711 // Should have capacity again
530 assert!(!manager.is_throttled("example.com")); 712 assert!(!manager.is_throttled("example.com"));
@@ -540,8 +722,8 @@ mod tests {
540 // Domain doesn't exist yet 722 // Domain doesn't exist yet
541 assert!(!manager.throttles.contains_key("example.com")); 723 assert!(!manager.throttles.contains_key("example.com"));
542 724
543 // Enqueue an identifier 725 // Enqueue an identifier (using internal method for non-Arc test)
544 manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); 726 manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new());
545 727
546 // Domain throttle should now exist 728 // Domain throttle should now exist
547 assert!(manager.throttles.contains_key("example.com")); 729 assert!(manager.throttles.contains_key("example.com"));
@@ -560,4 +742,28 @@ mod tests {
560 // Domain throttle should now exist 742 // Domain throttle should now exist
561 assert!(manager.throttles.contains_key("example.com")); 743 assert!(manager.throttles.contains_key("example.com"));
562 } 744 }
745
746 #[test]
747 fn has_queued_work_reflects_queue_state() {
748 let manager = ThrottleManager::new(5, 100);
749
750 // Initially no queued work
751 let has_work = manager
752 .throttles
753 .get("example.com")
754 .map(|e| e.lock().unwrap().has_queued_work())
755 .unwrap_or(false);
756 assert!(!has_work);
757
758 // Enqueue an identifier
759 manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new());
760
761 // Now should have queued work
762 let has_work = manager
763 .throttles
764 .get("example.com")
765 .map(|e| e.lock().unwrap().has_queued_work())
766 .unwrap_or(false);
767 assert!(has_work);
768 }
563} 769}