upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-08 00:50:54 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-08 00:50:54 +0000
commitf75e1c59aacf5ce668fd327e4e3d827511661c2a (patch)
tree867926c7503e7c587e86c67896a9e7347600447b /src/purgatory/sync
parent3f14f998d64b5fa15bdddd7570b4f72874eb9f29 (diff)
chore: cargo fmt
Diffstat (limited to 'src/purgatory/sync')
-rw-r--r--src/purgatory/sync/context.rs19
-rw-r--r--src/purgatory/sync/functions.rs69
-rw-r--r--src/purgatory/sync/loop.rs5
-rw-r--r--src/purgatory/sync/throttle.rs31
4 files changed, 57 insertions, 67 deletions
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 2922f10..9e195c7 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -119,12 +119,8 @@ pub trait SyncContext: Send + Sync {
119 /// 119 ///
120 /// # Returns 120 /// # Returns
121 /// List of OIDs that were successfully fetched 121 /// List of OIDs that were successfully fetched
122 async fn fetch_oids( 122 async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String])
123 &self, 123 -> Result<Vec<String>>;
124 repo_path: &Path,
125 url: &str,
126 oids: &[String],
127 ) -> Result<Vec<String>>;
128 124
129 /// Process newly available git data. 125 /// Process newly available git data.
130 /// 126 ///
@@ -368,10 +364,7 @@ impl SyncContext for RealSyncContext {
368 .cloned() 364 .cloned()
369 .collect(); 365 .collect();
370 366
371 debug!( 367 debug!(fetched_count = fetched.len(), "Successfully fetched OIDs");
372 fetched_count = fetched.len(),
373 "Successfully fetched OIDs"
374 );
375 368
376 fetched 369 fetched
377 } 370 }
@@ -702,11 +695,7 @@ pub mod mock {
702 } 695 }
703 696
704 // Get OIDs this URL can provide 697 // Get OIDs this URL can provide
705 let provides = self 698 let provides = self.url_provides_oids.get(url).cloned().unwrap_or_default();
706 .url_provides_oids
707 .get(url)
708 .cloned()
709 .unwrap_or_default();
710 699
711 // Find which requested OIDs this URL can provide 700 // Find which requested OIDs this URL can provide
712 let fetched: Vec<String> = oids 701 let fetched: Vec<String> = oids
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index bb7c0b9..370990e 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -32,15 +32,17 @@ use super::throttle::ThrottleManager;
32fn extract_domain(url: &str) -> Option<String> { 32fn extract_domain(url: &str) -> Option<String> {
33 // Simple URL parsing for HTTP(S) URLs 33 // Simple URL parsing for HTTP(S) URLs
34 // Format: scheme://[user@]host[:port]/path 34 // Format: scheme://[user@]host[:port]/path
35 let url = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://"))?; 35 let url = url
36 36 .strip_prefix("https://")
37 .or_else(|| url.strip_prefix("http://"))?;
38
37 // Remove user info if present (e.g., "user@host" -> "host") 39 // Remove user info if present (e.g., "user@host" -> "host")
38 let url = url.split('@').next_back()?; 40 let url = url.split('@').next_back()?;
39 41
40 // Extract host (before first '/' or ':') 42 // Extract host (before first '/' or ':')
41 let host = url.split('/').next()?; 43 let host = url.split('/').next()?;
42 let host = host.split(':').next()?; 44 let host = host.split(':').next()?;
43 45
44 if host.is_empty() { 46 if host.is_empty() {
45 None 47 None
46 } else { 48 } else {
@@ -112,17 +114,17 @@ pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
112 114
113 // 4. Collect clone URLs from announcements AND PR events in purgatory 115 // 4. Collect clone URLs from announcements AND PR events in purgatory
114 let our_domain = ctx.our_domain(); 116 let our_domain = ctx.our_domain();
115 117
116 // Get clone URLs from repository announcements 118 // Get clone URLs from repository announcements
117 let announcement_urls: HashSet<String> = repo_data 119 let announcement_urls: HashSet<String> = repo_data
118 .announcements 120 .announcements
119 .iter() 121 .iter()
120 .flat_map(|a| a.clone_urls.iter().cloned()) 122 .flat_map(|a| a.clone_urls.iter().cloned())
121 .collect(); 123 .collect();
122 124
123 // Get clone URLs from PR events in purgatory 125 // Get clone URLs from PR events in purgatory
124 let pr_urls = ctx.collect_pr_clone_urls(identifier); 126 let pr_urls = ctx.collect_pr_clone_urls(identifier);
125 127
126 // Merge and filter out our domain 128 // Merge and filter out our domain
127 let all_urls: HashSet<String> = announcement_urls 129 let all_urls: HashSet<String> = announcement_urls
128 .union(&pr_urls) 130 .union(&pr_urls)
@@ -151,11 +153,9 @@ pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
151 match domain { 153 match domain {
152 Some(specific_domain) => { 154 Some(specific_domain) => {
153 // Only look at URLs from this specific domain 155 // Only look at URLs from this specific domain
154 urls_by_domain.get(specific_domain).and_then(|urls| { 156 urls_by_domain
155 urls.iter() 157 .get(specific_domain)
156 .find(|url| !tried_urls.contains(*url)) 158 .and_then(|urls| urls.iter().find(|url| !tried_urls.contains(*url)).cloned())
157 .cloned()
158 })
159 } 159 }
160 None => { 160 None => {
161 // Try any non-throttled domain 161 // Try any non-throttled domain
@@ -217,17 +217,17 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>(
217 }; 217 };
218 218
219 let our_domain = ctx.our_domain(); 219 let our_domain = ctx.our_domain();
220 220
221 // Get clone URLs from repository announcements 221 // Get clone URLs from repository announcements
222 let announcement_urls: HashSet<String> = repo_data 222 let announcement_urls: HashSet<String> = repo_data
223 .announcements 223 .announcements
224 .iter() 224 .iter()
225 .flat_map(|a| a.clone_urls.iter().cloned()) 225 .flat_map(|a| a.clone_urls.iter().cloned())
226 .collect(); 226 .collect();
227 227
228 // Get clone URLs from PR events in purgatory 228 // Get clone URLs from PR events in purgatory
229 let pr_urls = ctx.collect_pr_clone_urls(identifier); 229 let pr_urls = ctx.collect_pr_clone_urls(identifier);
230 230
231 // Merge and filter out our domain 231 // Merge and filter out our domain
232 let all_urls: HashSet<String> = announcement_urls 232 let all_urls: HashSet<String> = announcement_urls
233 .union(&pr_urls) 233 .union(&pr_urls)
@@ -766,9 +766,13 @@ mod tests {
766 let mut tried_urls = HashSet::new(); 766 let mut tried_urls = HashSet::new();
767 tried_urls.insert("https://github.com/foo/bar.git".to_string()); 767 tried_urls.insert("https://github.com/foo/bar.git".to_string());
768 768
769 let throttled = 769 let throttled = get_throttled_domains_with_untried_urls(
770 get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) 770 &mock,
771 .await; 771 "test-repo",
772 &tried_urls,
773 &throttle_manager,
774 )
775 .await;
772 776
773 // Should only include gitlab.com (throttled with untried URLs) 777 // Should only include gitlab.com (throttled with untried URLs)
774 // github.com is throttled but URL was tried 778 // github.com is throttled but URL was tried
@@ -885,11 +889,10 @@ mod tests {
885 #[tokio::test] 889 #[tokio::test]
886 async fn test_collect_pr_clone_urls_returns_configured_urls() { 890 async fn test_collect_pr_clone_urls_returns_configured_urls() {
887 // Test that MockSyncContext returns configured PR clone URLs 891 // Test that MockSyncContext returns configured PR clone URLs
888 let mock = MockSyncContext::new() 892 let mock = MockSyncContext::new().with_pr_clone_urls(&[
889 .with_pr_clone_urls(&[ 893 "https://pr-server.com/fork.git",
890 "https://pr-server.com/fork.git", 894 "https://another-server.com/fork.git",
891 "https://another-server.com/fork.git", 895 ]);
892 ]);
893 896
894 let pr_urls = mock.collect_pr_clone_urls("test-repo"); 897 let pr_urls = mock.collect_pr_clone_urls("test-repo");
895 898
@@ -945,7 +948,7 @@ mod tests {
945 .with_urls(&["https://github.com/owner/repo.git"]) 948 .with_urls(&["https://github.com/owner/repo.git"])
946 .with_pr_clone_urls(&[ 949 .with_pr_clone_urls(&[
947 "https://our-relay.com/fork.git", // Should be filtered 950 "https://our-relay.com/fork.git", // Should be filtered
948 "https://external.com/fork.git", // Should be included 951 "https://external.com/fork.git", // Should be included
949 ]) 952 ])
950 .with_our_domain("our-relay.com") 953 .with_our_domain("our-relay.com")
951 .with_needed_oids(&["abc123"]) 954 .with_needed_oids(&["abc123"])
@@ -957,8 +960,7 @@ mod tests {
957 // Collect all available URLs 960 // Collect all available URLs
958 let mut available_urls = Vec::new(); 961 let mut available_urls = Vec::new();
959 while let Some(url) = 962 while let Some(url) =
960 sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager) 963 sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager).await
961 .await
962 { 964 {
963 available_urls.push(url.clone()); 965 available_urls.push(url.clone());
964 tried_urls.insert(url); 966 tried_urls.insert(url);
@@ -1006,16 +1008,17 @@ mod tests {
1006 1008
1007 let tried_urls = HashSet::new(); 1009 let tried_urls = HashSet::new();
1008 1010
1009 let throttled = 1011 let throttled = get_throttled_domains_with_untried_urls(
1010 get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) 1012 &mock,
1011 .await; 1013 "test-repo",
1014 &tried_urls,
1015 &throttle_manager,
1016 )
1017 .await;
1012 1018
1013 // Should include both throttled domains 1019 // Should include both throttled domains
1014 let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect(); 1020 let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect();
1015 assert!( 1021 assert!(domains.contains(&"github.com"), "Should include github.com");
1016 domains.contains(&"github.com"),
1017 "Should include github.com"
1018 );
1019 assert!( 1022 assert!(
1020 domains.contains(&"pr-server.com"), 1023 domains.contains(&"pr-server.com"),
1021 "Should include pr-server.com from PR clone URLs" 1024 "Should include pr-server.com from PR clone URLs"
diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs
index ebca766..92e0594 100644
--- a/src/purgatory/sync/loop.rs
+++ b/src/purgatory/sync/loop.rs
@@ -62,7 +62,10 @@ impl Purgatory {
62 ctx: Arc<dyn SyncContext>, 62 ctx: Arc<dyn SyncContext>,
63 throttle_manager: Arc<ThrottleManager>, 63 throttle_manager: Arc<ThrottleManager>,
64 ) -> JoinHandle<()> { 64 ) -> JoinHandle<()> {
65 info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL); 65 info!(
66 "Starting purgatory sync loop (interval: {:?})",
67 SYNC_LOOP_INTERVAL
68 );
66 69
67 tokio::spawn(async move { 70 tokio::spawn(async move {
68 let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL); 71 let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL);
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index e6efe1f..ad6e8ea 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -316,15 +316,13 @@ impl ThrottleManager {
316 } 316 }
317 317
318 // Create new throttle 318 // Create new throttle
319 self.throttles 319 self.throttles.entry(domain.to_string()).or_insert_with(|| {
320 .entry(domain.to_string()) 320 Mutex::new(DomainThrottle::new(
321 .or_insert_with(|| { 321 domain.to_string(),
322 Mutex::new(DomainThrottle::new( 322 self.max_concurrent_per_domain,
323 domain.to_string(), 323 self.max_per_minute_per_domain,
324 self.max_concurrent_per_domain, 324 ))
325 self.max_per_minute_per_domain, 325 });
326 ))
327 });
328 326
329 // Return the entry (we know it exists now) 327 // Return the entry (we know it exists now)
330 self.throttles.get(domain).unwrap() 328 self.throttles.get(domain).unwrap()
@@ -438,7 +436,9 @@ impl ThrottleManager {
438 let domain = domain.to_string(); 436 let domain = domain.to_string();
439 437
440 tokio::spawn(async move { 438 tokio::spawn(async move {
441 manager.process_queued_identifier(&domain, &identifier).await; 439 manager
440 .process_queued_identifier(&domain, &identifier)
441 .await;
442 }); 442 });
443 } 443 }
444 } 444 }
@@ -480,14 +480,9 @@ impl ThrottleManager {
480 }; 480 };
481 481
482 // Get next URL for this identifier on this specific domain 482 // Get next URL for this identifier on this specific domain
483 let url = sync_identifier_next_url( 483 let url =
484 ctx.as_ref(), 484 sync_identifier_next_url(ctx.as_ref(), identifier, Some(domain), &tried_urls, self)
485 identifier, 485 .await;
486 Some(domain),
487 &tried_urls,
488 self,
489 )
490 .await;
491 486
492 match url { 487 match url {
493 Some(url) => { 488 Some(url) => {