upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory/sync/functions.rs
diff options
context:
space:
mode:
author DanConwayDev <DanConwayDev@protonmail.com> 2026-01-07 11:44:27 +0000
committer DanConwayDev <DanConwayDev@protonmail.com> 2026-01-07 11:44:27 +0000
commit 1c5dac680b5a446e26b161208a17030f5fbd8a88 (patch)
tree 76a7113edcdd7436872665473727c1f535d7ab83 /src/purgatory/sync/functions.rs
parent 5bd6b9b93cd52da2075bc00a08cf7feca4b85d5c (diff)
Add core sync functions for identifier-based purgatory synchronization
Implement sync_identifier_next_url and sync_identifier_from_url functions that provide the core URL selection and fetch logic for purgatory sync.

sync_identifier_next_url:
- Pure URL selection logic with no side effects
- Filters out our own domain and already-tried URLs
- Respects domain throttling when domain parameter is None
- Can target a specific domain when domain parameter is Some

sync_identifier_from_url:
- Fetches OIDs from a specific URL via the SyncContext
- Tracks request start/completion with ThrottleManager for rate limiting
- Calls process_newly_available_git_data on successful fetch

Also adds get_throttled_domains_with_untried_urls helper for the main sync loop to know which DomainThrottle queues to enqueue identifiers to.

These functions are designed to be called by both:
- Main sync loop (tries non-throttled URLs immediately)
- DomainThrottle queue processing (when capacity frees up)

Includes 10 unit tests covering:
- Throttled domain skipping
- Tried URL skipping
- Our domain filtering
- Specific domain targeting
- Fetch success/failure handling
- Throttle request tracking
Diffstat (limited to 'src/purgatory/sync/functions.rs')
-rw-r--r-- src/purgatory/sync/functions.rs 620
1 files changed, 620 insertions, 0 deletions
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
new file mode 100644
index 0000000..de846f3
--- /dev/null
+++ b/src/purgatory/sync/functions.rs
@@ -0,0 +1,620 @@
1//! Core sync functions for identifier-based purgatory synchronization.
2//!
3//! This module provides the two main functions that both the main sync loop
4//! and `DomainThrottle` queue processing use:
5//!
6//! - [`sync_identifier_next_url`]: Pure URL selection logic - finds next URL to try
7//! - [`sync_identifier_from_url`]: Pure fetch logic - fetches from a specific URL
8//!
9//! The separation enables:
10//! - Main sync loop to try non-throttled URLs immediately
11//! - DomainThrottle to process queued identifiers when capacity frees
12//! - Clean testability with mocked SyncContext
13
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tracing::debug;
17
18use super::context::SyncContext;
19use super::throttle::ThrottleManager;
20
21/// Extract domain from a URL.
22///
23/// # Examples
24///
25/// ```ignore
26/// assert_eq!(extract_domain("https://github.com/foo/bar.git"), Some("github.com".to_string()));
27/// assert_eq!(extract_domain("git@github.com:foo/bar.git"), None); // SSH URLs not supported
28/// ```
29fn extract_domain(url: &str) -> Option<String> {
30 url::Url::parse(url)
31 .ok()
32 .and_then(|u| u.host_str().map(|s| s.to_string()))
33}
34
35/// Find the next URL to try for an identifier.
36///
37/// This is pure URL selection logic with no side effects. It:
38/// 1. Checks if there are pending events for the identifier
39/// 2. Checks if there are OIDs still needed
40/// 3. Gets repository data and extracts clone URLs
41/// 4. Filters out our own domain and already-tried URLs
42/// 5. Returns the first non-throttled URL (when `domain` is None)
43/// or a URL from the specified domain (when `domain` is Some)
44///
45/// # Arguments
46///
47/// * `ctx` - The sync context providing repository data and OID information
48/// * `identifier` - The repository identifier (d-tag value)
49/// * `domain` - If Some, only return URLs from this specific domain.
50/// If None, return any non-throttled URL.
51/// * `tried_urls` - URLs that have already been tried (will be skipped)
52/// * `throttle_manager` - Used to check if domains are throttled (when domain is None)
53///
54/// # Returns
55///
56/// * `Some(url)` - The next URL to try
57/// * `None` - No suitable URL found (all tried, all throttled, or no URLs available)
58pub async fn sync_identifier_next_url<C: SyncContext>(
59 ctx: &C,
60 identifier: &str,
61 domain: Option<&str>,
62 tried_urls: &HashSet<String>,
63 throttle_manager: &ThrottleManager,
64) -> Option<String> {
65 // 1. Check if we still have pending events
66 if !ctx.has_pending_events(identifier) {
67 debug!(
68 identifier = %identifier,
69 "No pending events - skipping URL selection"
70 );
71 return None;
72 }
73
74 // 2. Collect needed OIDs
75 let needed_oids = ctx.collect_needed_oids(identifier);
76 if needed_oids.is_empty() {
77 debug!(
78 identifier = %identifier,
79 "No OIDs needed - sync is complete"
80 );
81 return None;
82 }
83
84 // 3. Get repository data
85 let repo_data = match ctx.fetch_repository_data(identifier).await {
86 Ok(data) => data,
87 Err(e) => {
88 debug!(
89 identifier = %identifier,
90 error = %e,
91 "Failed to fetch repository data"
92 );
93 return None;
94 }
95 };
96
97 // 4. Collect clone URLs, excluding our domain
98 let our_domain = ctx.our_domain();
99 let all_urls: HashSet<String> = repo_data
100 .announcements
101 .iter()
102 .flat_map(|a| a.clone_urls.iter().cloned())
103 .filter(|url| our_domain.map_or(true, |d| !url.contains(d)))
104 .collect();
105
106 if all_urls.is_empty() {
107 debug!(
108 identifier = %identifier,
109 "No clone URLs available (after filtering our domain)"
110 );
111 return None;
112 }
113
114 // 5. Group by domain
115 let urls_by_domain: HashMap<String, Vec<String>> =
116 all_urls.iter().fold(HashMap::new(), |mut acc, url| {
117 if let Some(d) = extract_domain(url) {
118 acc.entry(d).or_default().push(url.clone());
119 }
120 acc
121 });
122
123 // 6. Find an available URL
124 match domain {
125 Some(specific_domain) => {
126 // Only look at URLs from this specific domain
127 urls_by_domain.get(specific_domain).and_then(|urls| {
128 urls.iter()
129 .find(|url| !tried_urls.contains(*url))
130 .cloned()
131 })
132 }
133 None => {
134 // Try any non-throttled domain
135 for (d, domain_urls) in &urls_by_domain {
136 if throttle_manager.is_throttled(d) {
137 debug!(
138 identifier = %identifier,
139 domain = %d,
140 "Domain is throttled - skipping"
141 );
142 continue;
143 }
144 if let Some(url) = domain_urls.iter().find(|url| !tried_urls.contains(*url)) {
145 return Some(url.clone());
146 }
147 }
148 None
149 }
150 }
151}
152
/// A throttled domain that still has untried URLs for some identifier.
///
/// Used by the main sync loop to know which `DomainThrottle` queues
/// to add the identifier to when it can't complete immediately.
///
/// Equality compares both the domain and the set of tried URLs, which makes
/// values directly comparable in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThrottledDomainInfo {
    /// The throttled domain name
    pub domain: String,
    /// URLs from this domain that have already been tried
    pub tried_urls_for_domain: HashSet<String>,
}
164
165/// Get information about throttled domains that have untried URLs.
166///
167/// Called by main sync loop to know which `DomainThrottle` queues to add
168/// the identifier to when non-throttled URLs are exhausted.
169///
170/// # Arguments
171///
172/// * `ctx` - The sync context providing repository data
173/// * `identifier` - The repository identifier
174/// * `tried_urls` - All URLs that have been tried (across all domains)
175/// * `throttle_manager` - Used to check which domains are throttled
176///
177/// # Returns
178///
179/// A list of throttled domains that still have untried URLs, along with
180/// the tried URLs for each domain (for proper queue state).
181pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>(
182 ctx: &C,
183 identifier: &str,
184 tried_urls: &HashSet<String>,
185 throttle_manager: &ThrottleManager,
186) -> Vec<ThrottledDomainInfo> {
187 let repo_data = match ctx.fetch_repository_data(identifier).await {
188 Ok(data) => data,
189 Err(_) => return vec![],
190 };
191
192 let our_domain = ctx.our_domain();
193 let all_urls: HashSet<String> = repo_data
194 .announcements
195 .iter()
196 .flat_map(|a| a.clone_urls.iter().cloned())
197 .filter(|url| our_domain.map_or(true, |d| !url.contains(d)))
198 .collect();
199
200 let urls_by_domain: HashMap<String, Vec<String>> =
201 all_urls.iter().fold(HashMap::new(), |mut acc, url| {
202 if let Some(d) = extract_domain(url) {
203 acc.entry(d).or_default().push(url.clone());
204 }
205 acc
206 });
207
208 urls_by_domain
209 .into_iter()
210 .filter_map(|(domain, domain_urls)| {
211 if !throttle_manager.is_throttled(&domain) {
212 return None; // Not throttled, skip
213 }
214
215 let untried: Vec<_> = domain_urls
216 .iter()
217 .filter(|url| !tried_urls.contains(*url))
218 .collect();
219
220 if untried.is_empty() {
221 return None; // All URLs tried for this domain
222 }
223
224 // Collect tried URLs that belong to this domain
225 let tried_urls_for_domain: HashSet<String> = tried_urls
226 .iter()
227 .filter(|url| extract_domain(url).as_deref() == Some(domain.as_str()))
228 .cloned()
229 .collect();
230
231 Some(ThrottledDomainInfo {
232 domain,
233 tried_urls_for_domain,
234 })
235 })
236 .collect()
237}
238
239/// Fetch git data from a specific URL for an identifier.
240///
241/// This function:
242/// 1. Records the request with the throttle manager (for rate limiting)
243/// 2. Performs the actual git fetch via the context
244/// 3. Processes any events that can now be satisfied
245/// 4. Records request completion
246///
247/// # Arguments
248///
249/// * `ctx` - The sync context providing fetch and processing capabilities
250/// * `identifier` - The repository identifier
251/// * `url` - The remote URL to fetch from
252/// * `throttle_manager` - Used to track request start/completion for rate limiting
253///
254/// # Returns
255///
256/// The number of OIDs successfully fetched (0 on failure)
257pub async fn sync_identifier_from_url<C: SyncContext>(
258 ctx: &C,
259 identifier: &str,
260 url: &str,
261 throttle_manager: &Arc<ThrottleManager>,
262) -> usize {
263 let domain = match extract_domain(url) {
264 Some(d) => d,
265 None => {
266 debug!(
267 identifier = %identifier,
268 url = %url,
269 "Could not extract domain from URL"
270 );
271 return 0;
272 }
273 };
274
275 // Get repository data for target repo path
276 let repo_data = match ctx.fetch_repository_data(identifier).await {
277 Ok(data) => data,
278 Err(e) => {
279 debug!(
280 identifier = %identifier,
281 error = %e,
282 "Failed to fetch repo data"
283 );
284 return 0;
285 }
286 };
287
288 let target_repo = match ctx.find_target_repo(&repo_data) {
289 Some(path) => path,
290 None => {
291 debug!(identifier = %identifier, "No target repo found");
292 return 0;
293 }
294 };
295
296 // Collect needed OIDs
297 let needed_oids: Vec<String> = ctx.collect_needed_oids(identifier).into_iter().collect();
298 if needed_oids.is_empty() {
299 debug!(
300 identifier = %identifier,
301 "No OIDs needed - nothing to fetch"
302 );
303 return 0;
304 }
305
306 // Perform the fetch with throttle tracking
307 throttle_manager.start_request(&domain);
308 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await;
309 throttle_manager.complete_request(&domain);
310
311 let oids_fetched = match fetch_result {
312 Ok(fetched) => {
313 debug!(
314 identifier = %identifier,
315 url = %url,
316 oids_fetched = fetched.len(),
317 "Fetch succeeded"
318 );
319 fetched.len()
320 }
321 Err(e) => {
322 debug!(
323 identifier = %identifier,
324 url = %url,
325 error = %e,
326 "Fetch failed"
327 );
328 0
329 }
330 };
331
332 // Try to process any events that can now be satisfied
333 if oids_fetched > 0 {
334 let new_oids: HashSet<String> = needed_oids.into_iter().collect();
335 if let Err(e) = ctx
336 .process_newly_available_git_data(&target_repo, &new_oids)
337 .await
338 {
339 debug!(
340 identifier = %identifier,
341 error = %e,
342 "Failed to process newly available git data"
343 );
344 }
345 }
346
347 oids_fetched
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use crate::purgatory::sync::MockSyncContext;

    // Clone URLs shared by several tests.
    const GITHUB_URL: &str = "https://github.com/foo/bar.git";
    const GITLAB_URL: &str = "https://gitlab.com/foo/bar.git";
    const BAD_SERVER_URL: &str = "https://bad-server.com/repo.git";

    /// With github.com saturated, URL selection falls through to gitlab.com.
    #[tokio::test]
    async fn next_url_skips_throttled_domains() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL, GITLAB_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true);

        // Saturate github.com by starting a request
        let throttle = ThrottleManager::new(1, 100);
        throttle.start_request("github.com");

        let nothing_tried = HashSet::new();
        let picked =
            sync_identifier_next_url(&ctx, "test-repo", None, &nothing_tried, &throttle).await;

        let url = picked.unwrap();
        assert!(
            url.contains("gitlab.com"),
            "Expected gitlab.com URL, got: {}",
            url
        );
    }

    /// A URL that was already tried is passed over for its untried sibling.
    #[tokio::test]
    async fn next_url_skips_tried_urls() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL, "https://github.com/foo/bar2.git"])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true);

        let throttle = ThrottleManager::new(5, 100);
        let already_tried = HashSet::from([GITHUB_URL.to_string()]);

        let picked =
            sync_identifier_next_url(&ctx, "test-repo", None, &already_tried, &throttle).await;

        assert_eq!(picked.as_deref(), Some("https://github.com/foo/bar2.git"));
    }

    /// Without pending events there is nothing to select a URL for.
    #[tokio::test]
    async fn next_url_returns_none_when_no_pending_events() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(false);

        let throttle = ThrottleManager::new(5, 100);
        let nothing_tried = HashSet::new();

        let picked =
            sync_identifier_next_url(&ctx, "test-repo", None, &nothing_tried, &throttle).await;

        assert!(picked.is_none());
    }

    /// Pending events with no outstanding OIDs means sync is already complete.
    #[tokio::test]
    async fn next_url_returns_none_when_no_oids_needed() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL])
            .with_needed_oids(&[])
            .with_pending_events(true);

        let throttle = ThrottleManager::new(5, 100);
        let nothing_tried = HashSet::new();

        let picked =
            sync_identifier_next_url(&ctx, "test-repo", None, &nothing_tried, &throttle).await;

        assert!(picked.is_none());
    }

    /// URLs pointing at our own domain are never offered.
    #[tokio::test]
    async fn next_url_filters_our_domain() {
        let ctx = MockSyncContext::new()
            .with_urls(&["https://our-relay.com/foo/bar.git", GITHUB_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true)
            .with_our_domain("our-relay.com");

        let throttle = ThrottleManager::new(5, 100);
        let nothing_tried = HashSet::new();

        let picked =
            sync_identifier_next_url(&ctx, "test-repo", None, &nothing_tried, &throttle).await;

        let url = picked.unwrap();
        assert!(
            url.contains("github.com"),
            "Expected github.com URL (our domain filtered), got: {}",
            url
        );
    }

    /// Passing `Some(domain)` restricts selection to that host.
    #[tokio::test]
    async fn next_url_with_specific_domain() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL, GITLAB_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true);

        let throttle = ThrottleManager::new(5, 100);
        let nothing_tried = HashSet::new();

        let picked = sync_identifier_next_url(
            &ctx,
            "test-repo",
            Some("gitlab.com"),
            &nothing_tried,
            &throttle,
        )
        .await;

        let url = picked.unwrap();
        assert!(
            url.contains("gitlab.com"),
            "Expected gitlab.com URL, got: {}",
            url
        );
    }

    /// A successful fetch reports the OID count, hits the URL once and clears the need.
    #[tokio::test]
    async fn from_url_fetches_and_processes_on_success() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true)
            .url_provides(GITHUB_URL, &["abc123"]);

        let throttle = Arc::new(ThrottleManager::new(5, 100));

        let count = sync_identifier_from_url(&ctx, "test-repo", GITHUB_URL, &throttle).await;
        assert_eq!(count, 1);

        // Exactly one fetch attempt, against the requested URL
        let attempts = ctx.fetch_log();
        assert_eq!(attempts.len(), 1);
        assert_eq!(attempts[0], "https://github.com/foo/bar.git");

        // OID should no longer be needed
        assert!(ctx.current_needed_oids().is_empty());
    }

    /// A failing remote yields zero and leaves the OID outstanding.
    #[tokio::test]
    async fn from_url_returns_zero_on_failure() {
        let ctx = MockSyncContext::new()
            .with_urls(&[BAD_SERVER_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true)
            .url_should_fail(BAD_SERVER_URL);

        let throttle = Arc::new(ThrottleManager::new(5, 100));

        let count = sync_identifier_from_url(&ctx, "test-repo", BAD_SERVER_URL, &throttle).await;
        assert_eq!(count, 0);

        // OID should still be needed
        assert!(ctx.current_needed_oids().contains("abc123"));
    }

    /// Once the fetch has finished, the domain is not left throttled.
    #[tokio::test]
    async fn from_url_tracks_throttle_requests() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true)
            .url_provides(GITHUB_URL, &["abc123"]);

        let throttle = Arc::new(ThrottleManager::new(1, 100));

        let count = sync_identifier_from_url(&ctx, "test-repo", GITHUB_URL, &throttle).await;
        assert_eq!(count, 1);

        // After completion, domain should not be throttled
        assert!(!throttle.is_throttled("github.com"));
    }

    /// Only domains that are throttled and still have untried URLs are reported.
    #[tokio::test]
    async fn get_throttled_domains_returns_only_throttled_with_untried() {
        let ctx = MockSyncContext::new()
            .with_urls(&[GITHUB_URL, GITLAB_URL, "https://bitbucket.org/foo/bar.git"])
            .with_needed_oids(&["abc123"])
            .with_pending_events(true);

        // Throttle github.com and gitlab.com; bitbucket.org stays free
        let throttle = ThrottleManager::new(1, 100);
        for host in ["github.com", "gitlab.com"] {
            throttle.start_request(host);
        }

        // Mark github.com URL as already tried
        let already_tried = HashSet::from([GITHUB_URL.to_string()]);

        let reported =
            get_throttled_domains_with_untried_urls(&ctx, "test-repo", &already_tried, &throttle)
                .await;

        // Should only include gitlab.com (throttled with untried URLs)
        // github.com is throttled but URL was tried
        // bitbucket.org is not throttled
        assert_eq!(reported.len(), 1);
        assert_eq!(reported[0].domain, "gitlab.com");
        assert!(reported[0].tried_urls_for_domain.is_empty());
    }
}