diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-08 00:50:54 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-08 00:50:54 +0000 |
| commit | f75e1c59aacf5ce668fd327e4e3d827511661c2a (patch) | |
| tree | 867926c7503e7c587e86c67896a9e7347600447b /src/purgatory/sync | |
| parent | 3f14f998d64b5fa15bdddd7570b4f72874eb9f29 (diff) | |
chore: cargo fmt
Diffstat (limited to 'src/purgatory/sync')
| -rw-r--r-- | src/purgatory/sync/context.rs | 19 | ||||
| -rw-r--r-- | src/purgatory/sync/functions.rs | 69 | ||||
| -rw-r--r-- | src/purgatory/sync/loop.rs | 5 | ||||
| -rw-r--r-- | src/purgatory/sync/throttle.rs | 31 |
4 files changed, 57 insertions, 67 deletions
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 2922f10..9e195c7 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -119,12 +119,8 @@ pub trait SyncContext: Send + Sync { | |||
| 119 | /// | 119 | /// |
| 120 | /// # Returns | 120 | /// # Returns |
| 121 | /// List of OIDs that were successfully fetched | 121 | /// List of OIDs that were successfully fetched |
| 122 | async fn fetch_oids( | 122 | async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) |
| 123 | &self, | 123 | -> Result<Vec<String>>; |
| 124 | repo_path: &Path, | ||
| 125 | url: &str, | ||
| 126 | oids: &[String], | ||
| 127 | ) -> Result<Vec<String>>; | ||
| 128 | 124 | ||
| 129 | /// Process newly available git data. | 125 | /// Process newly available git data. |
| 130 | /// | 126 | /// |
| @@ -368,10 +364,7 @@ impl SyncContext for RealSyncContext { | |||
| 368 | .cloned() | 364 | .cloned() |
| 369 | .collect(); | 365 | .collect(); |
| 370 | 366 | ||
| 371 | debug!( | 367 | debug!(fetched_count = fetched.len(), "Successfully fetched OIDs"); |
| 372 | fetched_count = fetched.len(), | ||
| 373 | "Successfully fetched OIDs" | ||
| 374 | ); | ||
| 375 | 368 | ||
| 376 | fetched | 369 | fetched |
| 377 | } | 370 | } |
| @@ -702,11 +695,7 @@ pub mod mock { | |||
| 702 | } | 695 | } |
| 703 | 696 | ||
| 704 | // Get OIDs this URL can provide | 697 | // Get OIDs this URL can provide |
| 705 | let provides = self | 698 | let provides = self.url_provides_oids.get(url).cloned().unwrap_or_default(); |
| 706 | .url_provides_oids | ||
| 707 | .get(url) | ||
| 708 | .cloned() | ||
| 709 | .unwrap_or_default(); | ||
| 710 | 699 | ||
| 711 | // Find which requested OIDs this URL can provide | 700 | // Find which requested OIDs this URL can provide |
| 712 | let fetched: Vec<String> = oids | 701 | let fetched: Vec<String> = oids |
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index bb7c0b9..370990e 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -32,15 +32,17 @@ use super::throttle::ThrottleManager; | |||
| 32 | fn extract_domain(url: &str) -> Option<String> { | 32 | fn extract_domain(url: &str) -> Option<String> { |
| 33 | // Simple URL parsing for HTTP(S) URLs | 33 | // Simple URL parsing for HTTP(S) URLs |
| 34 | // Format: scheme://[user@]host[:port]/path | 34 | // Format: scheme://[user@]host[:port]/path |
| 35 | let url = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://"))?; | 35 | let url = url |
| 36 | 36 | .strip_prefix("https://") | |
| 37 | .or_else(|| url.strip_prefix("http://"))?; | ||
| 38 | |||
| 37 | // Remove user info if present (e.g., "user@host" -> "host") | 39 | // Remove user info if present (e.g., "user@host" -> "host") |
| 38 | let url = url.split('@').next_back()?; | 40 | let url = url.split('@').next_back()?; |
| 39 | 41 | ||
| 40 | // Extract host (before first '/' or ':') | 42 | // Extract host (before first '/' or ':') |
| 41 | let host = url.split('/').next()?; | 43 | let host = url.split('/').next()?; |
| 42 | let host = host.split(':').next()?; | 44 | let host = host.split(':').next()?; |
| 43 | 45 | ||
| 44 | if host.is_empty() { | 46 | if host.is_empty() { |
| 45 | None | 47 | None |
| 46 | } else { | 48 | } else { |
| @@ -112,17 +114,17 @@ pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>( | |||
| 112 | 114 | ||
| 113 | // 4. Collect clone URLs from announcements AND PR events in purgatory | 115 | // 4. Collect clone URLs from announcements AND PR events in purgatory |
| 114 | let our_domain = ctx.our_domain(); | 116 | let our_domain = ctx.our_domain(); |
| 115 | 117 | ||
| 116 | // Get clone URLs from repository announcements | 118 | // Get clone URLs from repository announcements |
| 117 | let announcement_urls: HashSet<String> = repo_data | 119 | let announcement_urls: HashSet<String> = repo_data |
| 118 | .announcements | 120 | .announcements |
| 119 | .iter() | 121 | .iter() |
| 120 | .flat_map(|a| a.clone_urls.iter().cloned()) | 122 | .flat_map(|a| a.clone_urls.iter().cloned()) |
| 121 | .collect(); | 123 | .collect(); |
| 122 | 124 | ||
| 123 | // Get clone URLs from PR events in purgatory | 125 | // Get clone URLs from PR events in purgatory |
| 124 | let pr_urls = ctx.collect_pr_clone_urls(identifier); | 126 | let pr_urls = ctx.collect_pr_clone_urls(identifier); |
| 125 | 127 | ||
| 126 | // Merge and filter out our domain | 128 | // Merge and filter out our domain |
| 127 | let all_urls: HashSet<String> = announcement_urls | 129 | let all_urls: HashSet<String> = announcement_urls |
| 128 | .union(&pr_urls) | 130 | .union(&pr_urls) |
| @@ -151,11 +153,9 @@ pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>( | |||
| 151 | match domain { | 153 | match domain { |
| 152 | Some(specific_domain) => { | 154 | Some(specific_domain) => { |
| 153 | // Only look at URLs from this specific domain | 155 | // Only look at URLs from this specific domain |
| 154 | urls_by_domain.get(specific_domain).and_then(|urls| { | 156 | urls_by_domain |
| 155 | urls.iter() | 157 | .get(specific_domain) |
| 156 | .find(|url| !tried_urls.contains(*url)) | 158 | .and_then(|urls| urls.iter().find(|url| !tried_urls.contains(*url)).cloned()) |
| 157 | .cloned() | ||
| 158 | }) | ||
| 159 | } | 159 | } |
| 160 | None => { | 160 | None => { |
| 161 | // Try any non-throttled domain | 161 | // Try any non-throttled domain |
| @@ -217,17 +217,17 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>( | |||
| 217 | }; | 217 | }; |
| 218 | 218 | ||
| 219 | let our_domain = ctx.our_domain(); | 219 | let our_domain = ctx.our_domain(); |
| 220 | 220 | ||
| 221 | // Get clone URLs from repository announcements | 221 | // Get clone URLs from repository announcements |
| 222 | let announcement_urls: HashSet<String> = repo_data | 222 | let announcement_urls: HashSet<String> = repo_data |
| 223 | .announcements | 223 | .announcements |
| 224 | .iter() | 224 | .iter() |
| 225 | .flat_map(|a| a.clone_urls.iter().cloned()) | 225 | .flat_map(|a| a.clone_urls.iter().cloned()) |
| 226 | .collect(); | 226 | .collect(); |
| 227 | 227 | ||
| 228 | // Get clone URLs from PR events in purgatory | 228 | // Get clone URLs from PR events in purgatory |
| 229 | let pr_urls = ctx.collect_pr_clone_urls(identifier); | 229 | let pr_urls = ctx.collect_pr_clone_urls(identifier); |
| 230 | 230 | ||
| 231 | // Merge and filter out our domain | 231 | // Merge and filter out our domain |
| 232 | let all_urls: HashSet<String> = announcement_urls | 232 | let all_urls: HashSet<String> = announcement_urls |
| 233 | .union(&pr_urls) | 233 | .union(&pr_urls) |
| @@ -766,9 +766,13 @@ mod tests { | |||
| 766 | let mut tried_urls = HashSet::new(); | 766 | let mut tried_urls = HashSet::new(); |
| 767 | tried_urls.insert("https://github.com/foo/bar.git".to_string()); | 767 | tried_urls.insert("https://github.com/foo/bar.git".to_string()); |
| 768 | 768 | ||
| 769 | let throttled = | 769 | let throttled = get_throttled_domains_with_untried_urls( |
| 770 | get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) | 770 | &mock, |
| 771 | .await; | 771 | "test-repo", |
| 772 | &tried_urls, | ||
| 773 | &throttle_manager, | ||
| 774 | ) | ||
| 775 | .await; | ||
| 772 | 776 | ||
| 773 | // Should only include gitlab.com (throttled with untried URLs) | 777 | // Should only include gitlab.com (throttled with untried URLs) |
| 774 | // github.com is throttled but URL was tried | 778 | // github.com is throttled but URL was tried |
| @@ -885,11 +889,10 @@ mod tests { | |||
| 885 | #[tokio::test] | 889 | #[tokio::test] |
| 886 | async fn test_collect_pr_clone_urls_returns_configured_urls() { | 890 | async fn test_collect_pr_clone_urls_returns_configured_urls() { |
| 887 | // Test that MockSyncContext returns configured PR clone URLs | 891 | // Test that MockSyncContext returns configured PR clone URLs |
| 888 | let mock = MockSyncContext::new() | 892 | let mock = MockSyncContext::new().with_pr_clone_urls(&[ |
| 889 | .with_pr_clone_urls(&[ | 893 | "https://pr-server.com/fork.git", |
| 890 | "https://pr-server.com/fork.git", | 894 | "https://another-server.com/fork.git", |
| 891 | "https://another-server.com/fork.git", | 895 | ]); |
| 892 | ]); | ||
| 893 | 896 | ||
| 894 | let pr_urls = mock.collect_pr_clone_urls("test-repo"); | 897 | let pr_urls = mock.collect_pr_clone_urls("test-repo"); |
| 895 | 898 | ||
| @@ -945,7 +948,7 @@ mod tests { | |||
| 945 | .with_urls(&["https://github.com/owner/repo.git"]) | 948 | .with_urls(&["https://github.com/owner/repo.git"]) |
| 946 | .with_pr_clone_urls(&[ | 949 | .with_pr_clone_urls(&[ |
| 947 | "https://our-relay.com/fork.git", // Should be filtered | 950 | "https://our-relay.com/fork.git", // Should be filtered |
| 948 | "https://external.com/fork.git", // Should be included | 951 | "https://external.com/fork.git", // Should be included |
| 949 | ]) | 952 | ]) |
| 950 | .with_our_domain("our-relay.com") | 953 | .with_our_domain("our-relay.com") |
| 951 | .with_needed_oids(&["abc123"]) | 954 | .with_needed_oids(&["abc123"]) |
| @@ -957,8 +960,7 @@ mod tests { | |||
| 957 | // Collect all available URLs | 960 | // Collect all available URLs |
| 958 | let mut available_urls = Vec::new(); | 961 | let mut available_urls = Vec::new(); |
| 959 | while let Some(url) = | 962 | while let Some(url) = |
| 960 | sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager) | 963 | sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager).await |
| 961 | .await | ||
| 962 | { | 964 | { |
| 963 | available_urls.push(url.clone()); | 965 | available_urls.push(url.clone()); |
| 964 | tried_urls.insert(url); | 966 | tried_urls.insert(url); |
| @@ -1006,16 +1008,17 @@ mod tests { | |||
| 1006 | 1008 | ||
| 1007 | let tried_urls = HashSet::new(); | 1009 | let tried_urls = HashSet::new(); |
| 1008 | 1010 | ||
| 1009 | let throttled = | 1011 | let throttled = get_throttled_domains_with_untried_urls( |
| 1010 | get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) | 1012 | &mock, |
| 1011 | .await; | 1013 | "test-repo", |
| 1014 | &tried_urls, | ||
| 1015 | &throttle_manager, | ||
| 1016 | ) | ||
| 1017 | .await; | ||
| 1012 | 1018 | ||
| 1013 | // Should include both throttled domains | 1019 | // Should include both throttled domains |
| 1014 | let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect(); | 1020 | let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect(); |
| 1015 | assert!( | 1021 | assert!(domains.contains(&"github.com"), "Should include github.com"); |
| 1016 | domains.contains(&"github.com"), | ||
| 1017 | "Should include github.com" | ||
| 1018 | ); | ||
| 1019 | assert!( | 1022 | assert!( |
| 1020 | domains.contains(&"pr-server.com"), | 1023 | domains.contains(&"pr-server.com"), |
| 1021 | "Should include pr-server.com from PR clone URLs" | 1024 | "Should include pr-server.com from PR clone URLs" |
diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs index ebca766..92e0594 100644 --- a/src/purgatory/sync/loop.rs +++ b/src/purgatory/sync/loop.rs | |||
| @@ -62,7 +62,10 @@ impl Purgatory { | |||
| 62 | ctx: Arc<dyn SyncContext>, | 62 | ctx: Arc<dyn SyncContext>, |
| 63 | throttle_manager: Arc<ThrottleManager>, | 63 | throttle_manager: Arc<ThrottleManager>, |
| 64 | ) -> JoinHandle<()> { | 64 | ) -> JoinHandle<()> { |
| 65 | info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL); | 65 | info!( |
| 66 | "Starting purgatory sync loop (interval: {:?})", | ||
| 67 | SYNC_LOOP_INTERVAL | ||
| 68 | ); | ||
| 66 | 69 | ||
| 67 | tokio::spawn(async move { | 70 | tokio::spawn(async move { |
| 68 | let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL); | 71 | let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL); |
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs index e6efe1f..ad6e8ea 100644 --- a/src/purgatory/sync/throttle.rs +++ b/src/purgatory/sync/throttle.rs | |||
| @@ -316,15 +316,13 @@ impl ThrottleManager { | |||
| 316 | } | 316 | } |
| 317 | 317 | ||
| 318 | // Create new throttle | 318 | // Create new throttle |
| 319 | self.throttles | 319 | self.throttles.entry(domain.to_string()).or_insert_with(|| { |
| 320 | .entry(domain.to_string()) | 320 | Mutex::new(DomainThrottle::new( |
| 321 | .or_insert_with(|| { | 321 | domain.to_string(), |
| 322 | Mutex::new(DomainThrottle::new( | 322 | self.max_concurrent_per_domain, |
| 323 | domain.to_string(), | 323 | self.max_per_minute_per_domain, |
| 324 | self.max_concurrent_per_domain, | 324 | )) |
| 325 | self.max_per_minute_per_domain, | 325 | }); |
| 326 | )) | ||
| 327 | }); | ||
| 328 | 326 | ||
| 329 | // Return the entry (we know it exists now) | 327 | // Return the entry (we know it exists now) |
| 330 | self.throttles.get(domain).unwrap() | 328 | self.throttles.get(domain).unwrap() |
| @@ -438,7 +436,9 @@ impl ThrottleManager { | |||
| 438 | let domain = domain.to_string(); | 436 | let domain = domain.to_string(); |
| 439 | 437 | ||
| 440 | tokio::spawn(async move { | 438 | tokio::spawn(async move { |
| 441 | manager.process_queued_identifier(&domain, &identifier).await; | 439 | manager |
| 440 | .process_queued_identifier(&domain, &identifier) | ||
| 441 | .await; | ||
| 442 | }); | 442 | }); |
| 443 | } | 443 | } |
| 444 | } | 444 | } |
| @@ -480,14 +480,9 @@ impl ThrottleManager { | |||
| 480 | }; | 480 | }; |
| 481 | 481 | ||
| 482 | // Get next URL for this identifier on this specific domain | 482 | // Get next URL for this identifier on this specific domain |
| 483 | let url = sync_identifier_next_url( | 483 | let url = |
| 484 | ctx.as_ref(), | 484 | sync_identifier_next_url(ctx.as_ref(), identifier, Some(domain), &tried_urls, self) |
| 485 | identifier, | 485 | .await; |
| 486 | Some(domain), | ||
| 487 | &tried_urls, | ||
| 488 | self, | ||
| 489 | ) | ||
| 490 | .await; | ||
| 491 | 486 | ||
| 492 | match url { | 487 | match url { |
| 493 | Some(url) => { | 488 | Some(url) => { |