upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory/sync/throttle.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/purgatory/sync/throttle.rs')
-rw-r--r--src/purgatory/sync/throttle.rs278
1 files changed, 242 insertions, 36 deletions
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index 94056b5..a310a91 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -8,12 +8,22 @@
8//! 8//!
9//! The `ThrottleManager` owns all `DomainThrottle` instances and provides the 9//! The `ThrottleManager` owns all `DomainThrottle` instances and provides the
10//! interface for checking throttle status and managing identifier queues. 10//! interface for checking throttle status and managing identifier queues.
11//!
12//! ## Trigger-based Processing
13//!
14//! When capacity frees up (via `complete_request`) or a new identifier is enqueued
15//! (via `enqueue_identifier`), the manager automatically spawns tasks to process
16//! queued identifiers. This is trigger-based, not polling-based.
11 17
12use dashmap::DashMap; 18use dashmap::DashMap;
13use indexmap::IndexMap; 19use indexmap::IndexMap;
14use std::collections::{HashSet, VecDeque}; 20use std::collections::{HashSet, VecDeque};
15use std::sync::Mutex; 21use std::sync::{Arc, Mutex, OnceLock};
16use std::time::{Duration, Instant}; 22use std::time::{Duration, Instant};
23use tracing::debug;
24
25use super::context::SyncContext;
26use super::functions::{sync_identifier_from_url, sync_identifier_next_url};
17 27
18/// State for an identifier waiting in a domain's queue. 28/// State for an identifier waiting in a domain's queue.
19/// 29///
@@ -240,9 +250,7 @@ impl DomainThrottle {
240/// - Throttle status checking for `sync_identifier_next_url` 250/// - Throttle status checking for `sync_identifier_next_url`
241/// - Identifier queue management 251/// - Identifier queue management
242/// - Request tracking (start/complete) 252/// - Request tracking (start/complete)
243/// 253/// - Trigger-based queue processing when capacity frees up
244/// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`)
245/// will be added in Phase 6 after `SyncContext` is available.
246pub struct ThrottleManager { 254pub struct ThrottleManager {
247 /// Per-domain throttle state. 255 /// Per-domain throttle state.
248 /// Uses DashMap for concurrent access from multiple sync tasks. 256 /// Uses DashMap for concurrent access from multiple sync tasks.
@@ -253,6 +261,10 @@ pub struct ThrottleManager {
253 261
254 /// Maximum requests per minute per domain. 262 /// Maximum requests per minute per domain.
255 max_per_minute_per_domain: u32, 263 max_per_minute_per_domain: u32,
264
265 /// Sync context for processing queued identifiers.
266 /// Set once at startup via `set_context()`.
267 ctx: OnceLock<Arc<dyn SyncContext>>,
256} 268}
257 269
258impl ThrottleManager { 270impl ThrottleManager {
@@ -266,38 +278,54 @@ impl ThrottleManager {
266 throttles: DashMap::new(), 278 throttles: DashMap::new(),
267 max_concurrent_per_domain: max_concurrent, 279 max_concurrent_per_domain: max_concurrent,
268 max_per_minute_per_domain: max_per_minute, 280 max_per_minute_per_domain: max_per_minute,
281 ctx: OnceLock::new(),
269 } 282 }
270 } 283 }
271 284
285 /// Set the sync context (called once at startup).
286 ///
287 /// The context is used for processing queued identifiers when capacity
288 /// becomes available. Must be called before any trigger-based processing
289 /// can occur.
290 ///
291 /// # Arguments
292 /// * `ctx` - The sync context implementation
293 pub fn set_context(&self, ctx: Arc<dyn SyncContext>) {
294 let _ = self.ctx.set(ctx);
295 }
296
272 /// Check if a domain is currently throttled (at capacity). 297 /// Check if a domain is currently throttled (at capacity).
273 /// 298 ///
274 /// Returns true if the domain has no capacity for another request, 299 /// Returns true if the domain has no capacity for another request,
275 /// either due to concurrent limit or rate limit. 300 /// either due to concurrent limit or rate limit.
276 pub fn is_throttled(&self, domain: &str) -> bool { 301 pub fn is_throttled(&self, domain: &str) -> bool {
277 self.throttles 302 self.throttles.get(domain).map_or(false, |entry| {
278 .get(domain) 303 let throttle = entry.lock().unwrap();
279 .map_or(false, |entry| { 304 !throttle.has_capacity()
280 let throttle = entry.lock().unwrap(); 305 })
281 !throttle.has_capacity()
282 })
283 } 306 }
284 307
285 /// Get or create a throttle for a domain. 308 /// Get or create a throttle for a domain.
286 fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> { 309 fn get_or_create_throttle(
310 &self,
311 domain: &str,
312 ) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> {
287 // First, try to get existing 313 // First, try to get existing
288 if let Some(entry) = self.throttles.get(domain) { 314 if let Some(entry) = self.throttles.get(domain) {
289 return entry; 315 return entry;
290 } 316 }
291 317
292 // Create new throttle 318 // Create new throttle
293 self.throttles.entry(domain.to_string()).or_insert_with(|| { 319 self.throttles
294 Mutex::new(DomainThrottle::new( 320 .entry(domain.to_string())
295 domain.to_string(), 321 .or_insert_with(|| {
296 self.max_concurrent_per_domain, 322 Mutex::new(DomainThrottle::new(
297 self.max_per_minute_per_domain, 323 domain.to_string(),
298 )) 324 self.max_concurrent_per_domain,
299 }); 325 self.max_per_minute_per_domain,
300 326 ))
327 });
328
301 // Return the entry (we know it exists now) 329 // Return the entry (we know it exists now)
302 self.throttles.get(domain).unwrap() 330 self.throttles.get(domain).unwrap()
303 } 331 }
@@ -311,39 +339,193 @@ impl ThrottleManager {
311 throttle.start_request(); 339 throttle.start_request();
312 } 340 }
313 341
314 /// Record that a request completed for a domain. 342 /// Record that a request completed for a domain (internal, no trigger).
315 /// 343 ///
316 /// Decrements in-flight count and cleans up old timestamps. 344 /// Decrements in-flight count and cleans up old timestamps.
317 /// 345 /// Does not trigger processing of queued identifiers.
318 /// Note: Trigger-based processing (spawning tasks when capacity frees) 346 fn complete_request_internal(&self, domain: &str) {
319 /// will be added in Phase 6 after `SyncContext` is available.
320 pub fn complete_request(&self, domain: &str) {
321 if let Some(entry) = self.throttles.get(domain) { 347 if let Some(entry) = self.throttles.get(domain) {
322 let mut throttle = entry.lock().unwrap(); 348 let mut throttle = entry.lock().unwrap();
323 throttle.complete_request(); 349 throttle.complete_request();
324 } 350 }
325 } 351 }
326 352
327 /// Add an identifier to a domain's waiting queue. 353 /// Record that a request completed for a domain.
354 ///
355 /// Decrements in-flight count, cleans up old timestamps, and triggers
356 /// processing of queued identifiers if capacity is available.
357 ///
358 /// # Arguments
359 /// * `domain` - The domain that completed a request
360 pub fn complete_request(self: &Arc<Self>, domain: &str) {
361 let should_trigger = {
362 if let Some(entry) = self.throttles.get(domain) {
363 let mut throttle = entry.lock().unwrap();
364 throttle.complete_request();
365 throttle.has_capacity() && throttle.has_queued_work()
366 } else {
367 false
368 }
369 };
370
371 if should_trigger {
372 self.try_process_next(domain);
373 }
374 }
375
376 /// Add an identifier to a domain's waiting queue (internal, no trigger).
328 /// 377 ///
329 /// If the identifier is already queued for this domain, merges the tried_urls sets. 378 /// If the identifier is already queued for this domain, merges the tried_urls sets.
379 /// Does not trigger processing.
380 fn enqueue_identifier_internal(
381 &self,
382 domain: &str,
383 identifier: String,
384 tried_urls_for_domain: HashSet<String>,
385 ) {
386 let entry = self.get_or_create_throttle(domain);
387 let mut throttle = entry.lock().unwrap();
388 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
389 }
390
391 /// Add an identifier to a domain's waiting queue.
330 /// 392 ///
331 /// Note: Trigger-based processing (spawning tasks when capacity available) 393 /// If the identifier is already queued for this domain, merges the tried_urls sets.
332 /// will be added in Phase 6 after `SyncContext` is available. 394 /// Triggers processing if capacity is available.
333 /// 395 ///
334 /// # Arguments 396 /// # Arguments
335 /// * `domain` - The domain to queue for 397 /// * `domain` - The domain to queue for
336 /// * `identifier` - The repository identifier 398 /// * `identifier` - The repository identifier
337 /// * `tried_urls_for_domain` - URLs from this domain that have already been tried 399 /// * `tried_urls_for_domain` - URLs from this domain that have already been tried
338 pub fn enqueue_identifier( 400 pub fn enqueue_identifier(
339 &self, 401 self: &Arc<Self>,
340 domain: &str, 402 domain: &str,
341 identifier: String, 403 identifier: String,
342 tried_urls_for_domain: HashSet<String>, 404 tried_urls_for_domain: HashSet<String>,
343 ) { 405 ) {
344 let entry = self.get_or_create_throttle(domain); 406 let should_trigger = {
345 let mut throttle = entry.lock().unwrap(); 407 let entry = self.get_or_create_throttle(domain);
346 throttle.enqueue_identifier(identifier, tried_urls_for_domain); 408 let mut throttle = entry.lock().unwrap();
409 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
410 throttle.has_capacity()
411 };
412
413 if should_trigger {
414 self.try_process_next(domain);
415 }
416 }
417
418 /// Try to process the next queued identifier for a domain.
419 ///
420 /// This is called when capacity becomes available (either via `complete_request`
421 /// or when a new identifier is enqueued). Spawns a task to process the next
422 /// ready identifier if one exists.
423 fn try_process_next(self: &Arc<Self>, domain: &str) {
424 // Get next ready identifier (not in_progress)
425 let identifier = {
426 if let Some(entry) = self.throttles.get(domain) {
427 let mut throttle = entry.lock().unwrap();
428 throttle.next_ready_identifier()
429 } else {
430 None
431 }
432 };
433
434 if let Some(identifier) = identifier {
435 let manager = self.clone();
436 let domain = domain.to_string();
437
438 tokio::spawn(async move {
439 manager.process_queued_identifier(&domain, &identifier).await;
440 });
441 }
442 }
443
444 /// Process a single identifier from a domain's queue.
445 ///
446 /// This function:
447 /// 1. Gets the next URL to try for this identifier on this domain
448 /// 2. If a URL is found, fetches from it and marks it as tried
449 /// 3. If no URL is found, removes the identifier from this domain's queue
450 /// 4. Triggers processing of the next identifier if capacity is available
451 async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) {
452 let ctx = match self.ctx.get() {
453 Some(ctx) => ctx,
454 None => {
455 debug!(
456 domain = %domain,
457 identifier = %identifier,
458 "No sync context set - cannot process queued identifier"
459 );
460 // Mark not in progress so it can be retried
461 if let Some(entry) = self.throttles.get(domain) {
462 let mut throttle = entry.lock().unwrap();
463 throttle.mark_identifier_not_in_progress(identifier);
464 }
465 return;
466 }
467 };
468
469 // Get tried URLs for this identifier on this domain
470 let tried_urls = {
471 self.throttles
472 .get(domain)
473 .map(|entry| {
474 let throttle = entry.lock().unwrap();
475 throttle.get_tried_urls(identifier)
476 })
477 .unwrap_or_default()
478 };
479
480 // Get next URL for this identifier on this specific domain
481 let url = sync_identifier_next_url(
482 ctx.as_ref(),
483 identifier,
484 Some(domain),
485 &tried_urls,
486 self,
487 )
488 .await;
489
490 match url {
491 Some(url) => {
492 debug!(
493 domain = %domain,
494 identifier = %identifier,
495 url = %url,
496 "Processing queued identifier - fetching from URL"
497 );
498
499 // Fetch from this URL
500 sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await;
501
502 // Record URL as tried and mark not in_progress
503 if let Some(entry) = self.throttles.get(domain) {
504 let mut throttle = entry.lock().unwrap();
505 throttle.mark_url_tried(identifier, url);
506 throttle.mark_identifier_not_in_progress(identifier);
507 }
508
509 // complete_request was already called by sync_identifier_from_url,
510 // which will trigger try_process_next if capacity is available
511 }
512 None => {
513 debug!(
514 domain = %domain,
515 identifier = %identifier,
516 "No more URLs for identifier on this domain - removing from queue"
517 );
518
519 // No more URLs for this identifier on this domain - remove from queue
520 if let Some(entry) = self.throttles.get(domain) {
521 let mut throttle = entry.lock().unwrap();
522 throttle.remove_identifier(identifier);
523 }
524
525 // Try next identifier since we didn't use any capacity
526 self.try_process_next(domain);
527 }
528 }
347 } 529 }
348} 530}
349 531
@@ -523,8 +705,8 @@ mod tests {
523 // Should now be throttled 705 // Should now be throttled
524 assert!(manager.is_throttled("example.com")); 706 assert!(manager.is_throttled("example.com"));
525 707
526 // Complete one request 708 // Complete one request (using internal method for non-Arc test)
527 manager.complete_request("example.com"); 709 manager.complete_request_internal("example.com");
528 710
529 // Should have capacity again 711 // Should have capacity again
530 assert!(!manager.is_throttled("example.com")); 712 assert!(!manager.is_throttled("example.com"));
@@ -540,8 +722,8 @@ mod tests {
540 // Domain doesn't exist yet 722 // Domain doesn't exist yet
541 assert!(!manager.throttles.contains_key("example.com")); 723 assert!(!manager.throttles.contains_key("example.com"));
542 724
543 // Enqueue an identifier 725 // Enqueue an identifier (using internal method for non-Arc test)
544 manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new()); 726 manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new());
545 727
546 // Domain throttle should now exist 728 // Domain throttle should now exist
547 assert!(manager.throttles.contains_key("example.com")); 729 assert!(manager.throttles.contains_key("example.com"));
@@ -560,4 +742,28 @@ mod tests {
560 // Domain throttle should now exist 742 // Domain throttle should now exist
561 assert!(manager.throttles.contains_key("example.com")); 743 assert!(manager.throttles.contains_key("example.com"));
562 } 744 }
745
746 #[test]
747 fn has_queued_work_reflects_queue_state() {
748 let manager = ThrottleManager::new(5, 100);
749
750 // Initially no queued work
751 let has_work = manager
752 .throttles
753 .get("example.com")
754 .map(|e| e.lock().unwrap().has_queued_work())
755 .unwrap_or(false);
756 assert!(!has_work);
757
758 // Enqueue an identifier
759 manager.enqueue_identifier_internal("example.com", "repo1".to_string(), HashSet::new());
760
761 // Now should have queued work
762 let has_work = manager
763 .throttles
764 .get("example.com")
765 .map(|e| e.lock().unwrap().has_queued_work())
766 .unwrap_or(false);
767 assert!(has_work);
768 }
563} 769}