upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/purgatory/sync/functions.rs
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com>, 2026-01-07 11:57:55 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com>, 2026-01-07 11:57:55 +0000
commit: 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 (patch)
tree: 0e85dce4da81e57aa26ead9810589fe0f44f89ca /src/purgatory/sync/functions.rs
parent: 1c5dac680b5a446e26b161208a17030f5fbd8a88 (diff)
Add sync_identifier orchestration and ThrottleManager queue processing
Implement the main sync orchestration function and trigger-based queue processing for throttled domains:

sync_identifier function:
- Orchestrates syncing git data for a single identifier
- Tries all non-throttled URLs in sequence
- Checks completion after each fetch (no pending events or all OIDs fetched)
- Enqueues with throttled domains when non-throttled URLs are exhausted
- Returns true if complete, false if events remain (for backoff)

ThrottleManager enhancements:
- Add set_context() to provide SyncContext for queue processing
- Add try_process_next() to spawn tasks when capacity frees
- Add process_queued_identifier() to handle queued work
- Update complete_request() to trigger processing on completion
- Update enqueue_identifier() to trigger processing when capacity available
- Add internal methods for non-Arc testing compatibility

Generic function updates:
- Add ?Sized bound to sync_identifier_next_url, sync_identifier_from_url, sync_identifier, and get_throttled_domains_with_untried_urls for dynamic dispatch support (Arc<dyn SyncContext>)

Tests:
- sync_identifier_tries_multiple_urls_until_complete: verifies sequential URL fetching until all OIDs are available
- sync_identifier_enqueues_throttled_domains_when_incomplete: verifies throttled domains get the identifier enqueued for later processing
- has_queued_work_reflects_queue_state: verifies queue state tracking
Diffstat (limited to 'src/purgatory/sync/functions.rs')
-rw-r--r-- src/purgatory/sync/functions.rs 228
1 files changed, 225 insertions, 3 deletions
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index de846f3..751dd5e 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -55,7 +55,7 @@ fn extract_domain(url: &str) -> Option<String> {
55/// 55///
56/// * `Some(url)` - The next URL to try 56/// * `Some(url)` - The next URL to try
57/// * `None` - No suitable URL found (all tried, all throttled, or no URLs available) 57/// * `None` - No suitable URL found (all tried, all throttled, or no URLs available)
58pub async fn sync_identifier_next_url<C: SyncContext>( 58pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
59 ctx: &C, 59 ctx: &C,
60 identifier: &str, 60 identifier: &str,
61 domain: Option<&str>, 61 domain: Option<&str>,
@@ -178,7 +178,7 @@ pub struct ThrottledDomainInfo {
178/// 178///
179/// A list of throttled domains that still have untried URLs, along with 179/// A list of throttled domains that still have untried URLs, along with
180/// the tried URLs for each domain (for proper queue state). 180/// the tried URLs for each domain (for proper queue state).
181pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( 181pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>(
182 ctx: &C, 182 ctx: &C,
183 identifier: &str, 183 identifier: &str,
184 tried_urls: &HashSet<String>, 184 tried_urls: &HashSet<String>,
@@ -254,7 +254,7 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>(
254/// # Returns 254/// # Returns
255/// 255///
256/// The number of OIDs successfully fetched (0 on failure) 256/// The number of OIDs successfully fetched (0 on failure)
257pub async fn sync_identifier_from_url<C: SyncContext>( 257pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
258 ctx: &C, 258 ctx: &C,
259 identifier: &str, 259 identifier: &str,
260 url: &str, 260 url: &str,
@@ -347,6 +347,128 @@ pub async fn sync_identifier_from_url<C: SyncContext>(
347 oids_fetched 347 oids_fetched
348} 348}
349 349
350/// Sync git data for an identifier.
351///
352/// This is the main orchestration function called by the sync loop. It:
353/// 1. Tries all non-throttled URLs in sequence, recording each one in a local tried-URL set
354/// 2. After each fetch, checks if sync is complete (no pending events or no needed OIDs)
355/// 3. When no non-throttled URLs remain, enqueues with throttled domains for later processing
356/// 4. Returns without waiting for throttled domains to complete
357///
358/// # Arguments
359///
360/// * `ctx` - The sync context providing repository data and OID information
361/// * `identifier` - The repository identifier (d-tag value)
362/// * `throttle_manager` - Used for rate limiting and domain queue management
363///
364/// # Returns
365///
366/// * `true` - Sync completed (no pending events or all OIDs fetched)
367/// * `false` - Events remain in purgatory (will be retried after backoff, or processed
368/// by throttled domain queues)
369pub async fn sync_identifier<C: SyncContext + ?Sized>(
370 ctx: &C,
371 identifier: &str,
372 throttle_manager: &Arc<ThrottleManager>,
373) -> bool {
374 let mut tried_urls: HashSet<String> = HashSet::new();
375
376 debug!(
377 identifier = %identifier,
378 "Starting sync for identifier"
379 );
380
381 // Try all non-throttled URLs (domain filter is None, so any domain may be picked)
382 loop {
383 match sync_identifier_next_url(ctx, identifier, None, &tried_urls, throttle_manager).await {
384 Some(url) => {
385 debug!(
386 identifier = %identifier,
387 url = %url,
388 "Found non-throttled URL to try"
389 );
390
391 // Fetch from this URL; the returned value is discarded because completion is re-derived from ctx below
392 sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await;
393 tried_urls.insert(url);
394
395 // Check if sync is now complete (first: are any events still pending?)
396 if !ctx.has_pending_events(identifier) {
397 debug!(
398 identifier = %identifier,
399 "Sync complete - no pending events"
400 );
401 return true;
402 }
403
404 let needed_oids = ctx.collect_needed_oids(identifier);
405 if needed_oids.is_empty() {
406 debug!(
407 identifier = %identifier,
408 "Sync complete - all OIDs available"
409 );
410 return true;
411 }
412
413 // Continue trying more URLs
414 }
415 None => {
416 // No more non-throttled URLs available
417 debug!(
418 identifier = %identifier,
419 tried_count = tried_urls.len(),
420 "No more non-throttled URLs available"
421 );
422 break;
423 }
424 }
425 }
426
427 // Re-check completion: this matters when the loop exited without fetching anything (no URL was returned)
428 if !ctx.has_pending_events(identifier) {
429 debug!(
430 identifier = %identifier,
431 "Sync complete after exhausting URLs - no pending events"
432 );
433 return true;
434 }
435
436 let needed_oids = ctx.collect_needed_oids(identifier);
437 if needed_oids.is_empty() {
438 debug!(
439 identifier = %identifier,
440 "Sync complete after exhausting URLs - all OIDs available"
441 );
442 return true;
443 }
444
445 // Enqueue with any throttled domains that have untried URLs (one enqueue call per domain)
446 let throttled_domains =
447 get_throttled_domains_with_untried_urls(ctx, identifier, &tried_urls, throttle_manager)
448 .await;
449
450 for info in throttled_domains {
451 debug!(
452 identifier = %identifier,
453 domain = %info.domain,
454 "Enqueueing identifier with throttled domain"
455 );
456 throttle_manager.enqueue_identifier(
457 &info.domain,
458 identifier.to_string(),
459 info.tried_urls_for_domain,
460 );
461 }
462
463 // Return false - events remain, will retry after backoff
464 // (throttled domains will process independently)
465 debug!(
466 identifier = %identifier,
467 "Sync incomplete - returning false for backoff"
468 );
469 false
470}
471
350#[cfg(test)] 472#[cfg(test)]
351mod tests { 473mod tests {
352 use super::*; 474 use super::*;
@@ -617,4 +739,104 @@ mod tests {
617 assert_eq!(throttled[0].domain, "gitlab.com"); 739 assert_eq!(throttled[0].domain, "gitlab.com");
618 assert!(throttled[0].tried_urls_for_domain.is_empty()); 740 assert!(throttled[0].tried_urls_for_domain.is_empty());
619 } 741 }
742
743 // =========================================================================
744 // Phase 6: sync_identifier tests
745 // =========================================================================
746
747 #[tokio::test]
748 async fn sync_identifier_tries_multiple_urls_until_complete() {
749 // Set up mock with 3 URLs, each providing partial OIDs
750 // URL1 provides abc123, URL2 provides def456, URL3 provides ghi789
751 let mock = MockSyncContext::new()
752 .with_urls(&[
753 "https://server1.com/repo.git",
754 "https://server2.com/repo.git",
755 "https://server3.com/repo.git",
756 ])
757 .with_needed_oids(&["abc123", "def456", "ghi789"])
758 .with_pending_events(true)
759 .url_provides("https://server1.com/repo.git", &["abc123"])
760 .url_provides("https://server2.com/repo.git", &["def456"])
761 .url_provides("https://server3.com/repo.git", &["ghi789"]);
762
763 let throttle_manager = Arc::new(ThrottleManager::new(5, 100));
764
765 // Run sync_identifier (no request is started beforehand in this test)
766 let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await;
767
768 // Should return true (sync complete)
769 assert!(complete, "Expected sync to complete after trying all URLs");
770
771 // Should have tried all 3 URLs: each one supplies exactly one of the three needed OIDs
772 let fetch_log = mock.fetch_log();
773 assert_eq!(
774 fetch_log.len(),
775 3,
776 "Expected 3 fetch attempts, got: {:?}",
777 fetch_log
778 );
779
780 // All OIDs should now be fetched
781 assert!(
782 mock.current_needed_oids().is_empty(),
783 "Expected all OIDs to be fetched"
784 );
785 }
786
787 #[tokio::test]
788 async fn sync_identifier_enqueues_throttled_domains_when_incomplete() {
789 // Set up mock with URLs from two domains
790 // Only github.com can provide the OID, but it will be throttled
791 let mock = MockSyncContext::new()
792 .with_urls(&[
793 "https://github.com/foo/bar.git",
794 "https://gitlab.com/foo/bar.git",
795 ])
796 .with_needed_oids(&["abc123"])
797 .with_pending_events(true)
798 .url_provides("https://github.com/foo/bar.git", &["abc123"]);
799 // Note: gitlab.com doesn't provide any OIDs
800
801 let throttle_manager = Arc::new(ThrottleManager::new(1, 100));
802
803 // Throttle github.com by starting a request (presumably the limit of 1 above is reached by this - confirm)
804 throttle_manager.start_request("github.com");
805
806 // Run sync_identifier
807 let complete = sync_identifier(&mock, "test-repo", &throttle_manager).await;
808
809 // Should return false (sync incomplete - github.com is throttled)
810 assert!(
811 !complete,
812 "Expected sync to be incomplete when required domain is throttled"
813 );
814
815 // Should have tried gitlab.com (not throttled) but it doesn't have the OID
816 let fetch_log = mock.fetch_log();
817 assert_eq!(
818 fetch_log.len(),
819 1,
820 "Expected 1 fetch attempt (gitlab.com), got: {:?}",
821 fetch_log
822 );
823 assert!(
824 fetch_log[0].contains("gitlab.com"),
825 "Expected gitlab.com to be tried first"
826 );
827
828 // OID should still be needed
829 assert!(
830 mock.current_needed_oids().contains("abc123"),
831 "Expected OID to still be needed"
832 );
833
834 // github.com should have the identifier enqueued.
835 // NOTE(review): this only asserts that github.com is still throttled, which the start_request
836 // call above presumably already guarantees; it does not inspect the queue itself - confirm.
837 assert!(
838 throttle_manager.is_throttled("github.com"),
839 "Expected github.com to still be throttled"
840 );
841 }
620} 842}