upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 11:33:03 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 11:33:03 +0000
commit09d9771acaffdc6e798cc32d2a68e6d46a323d3a (patch)
treec580f49311e81a9348e7b08d761708b93226b388 /src/purgatory/sync
parent08eba888fe510896f1863be6f2b5a72a91616975 (diff)
Add ThrottleManager for cross-domain rate limiting
Implements ThrottleManager which manages all per-domain DomainThrottle instances and provides: - Throttle status checking via is_throttled() for sync URL selection - Request tracking via start_request()/complete_request() - Identifier queue management via enqueue_identifier() - Automatic domain throttle creation on first access - Thread-safe access via DashMap with Mutex-wrapped throttles The manager uses the configured max_concurrent and max_per_minute limits for all domains. Trigger-based queue processing (set_context, process_queued_identifier) will be added after SyncContext is available. Tests verify: - is_throttled reflects domain capacity correctly - enqueue_identifier creates domain throttle if needed - start_request creates domain throttle if needed
Diffstat (limited to 'src/purgatory/sync')
-rw-r--r--src/purgatory/sync/mod.rs2
-rw-r--r--src/purgatory/sync/throttle.rs172
2 files changed, 173 insertions, 1 deletions
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
index 3c0d875..b29f10e 100644
--- a/src/purgatory/sync/mod.rs
+++ b/src/purgatory/sync/mod.rs
@@ -10,4 +10,4 @@ mod queue;
10mod throttle; 10mod throttle;
11 11
12pub use queue::SyncQueueEntry; 12pub use queue::SyncQueueEntry;
13pub use throttle::DomainThrottle; 13pub use throttle::{DomainThrottle, ThrottleManager};
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index 6f01c86..94056b5 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -5,9 +5,14 @@
5//! - Concurrent request limit (max in-flight requests) 5//! - Concurrent request limit (max in-flight requests)
6//! - Rate limit (max requests per minute) 6//! - Rate limit (max requests per minute)
7//! - Queue of identifiers waiting for capacity (with round-robin processing) 7//! - Queue of identifiers waiting for capacity (with round-robin processing)
8//!
9//! The `ThrottleManager` owns all `DomainThrottle` instances and provides the
10//! interface for checking throttle status and managing identifier queues.
8 11
12use dashmap::DashMap;
9use indexmap::IndexMap; 13use indexmap::IndexMap;
10use std::collections::{HashSet, VecDeque}; 14use std::collections::{HashSet, VecDeque};
15use std::sync::Mutex;
11use std::time::{Duration, Instant}; 16use std::time::{Duration, Instant};
12 17
13/// State for an identifier waiting in a domain's queue. 18/// State for an identifier waiting in a domain's queue.
@@ -229,6 +234,119 @@ impl DomainThrottle {
229 } 234 }
230} 235}
231 236
237/// Manages rate limiting across all domains.
238///
239/// Owns a collection of `DomainThrottle` instances and provides:
240/// - Throttle status checking for `sync_identifier_next_url`
241/// - Identifier queue management
242/// - Request tracking (start/complete)
243///
244/// Note: Trigger-based queue processing (`set_context`, `process_queued_identifier`)
245/// will be added in Phase 6 after `SyncContext` is available.
246pub struct ThrottleManager {
247 /// Per-domain throttle state.
248 /// Uses DashMap for concurrent access from multiple sync tasks.
249 throttles: DashMap<String, Mutex<DomainThrottle>>,
250
251 /// Maximum concurrent requests per domain.
252 max_concurrent_per_domain: u32,
253
254 /// Maximum requests per minute per domain.
255 max_per_minute_per_domain: u32,
256}
257
258impl ThrottleManager {
259 /// Create a new throttle manager with the specified limits.
260 ///
261 /// # Arguments
262 /// * `max_concurrent` - Maximum concurrent in-flight requests per domain
263 /// * `max_per_minute` - Maximum requests per 60-second window per domain
264 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
265 Self {
266 throttles: DashMap::new(),
267 max_concurrent_per_domain: max_concurrent,
268 max_per_minute_per_domain: max_per_minute,
269 }
270 }
271
272 /// Check if a domain is currently throttled (at capacity).
273 ///
274 /// Returns true if the domain has no capacity for another request,
275 /// either due to concurrent limit or rate limit.
276 pub fn is_throttled(&self, domain: &str) -> bool {
277 self.throttles
278 .get(domain)
279 .map_or(false, |entry| {
280 let throttle = entry.lock().unwrap();
281 !throttle.has_capacity()
282 })
283 }
284
285 /// Get or create a throttle for a domain.
286 fn get_or_create_throttle(&self, domain: &str) -> dashmap::mapref::one::Ref<'_, String, Mutex<DomainThrottle>> {
287 // First, try to get existing
288 if let Some(entry) = self.throttles.get(domain) {
289 return entry;
290 }
291
292 // Create new throttle
293 self.throttles.entry(domain.to_string()).or_insert_with(|| {
294 Mutex::new(DomainThrottle::new(
295 domain.to_string(),
296 self.max_concurrent_per_domain,
297 self.max_per_minute_per_domain,
298 ))
299 });
300
301 // Return the entry (we know it exists now)
302 self.throttles.get(domain).unwrap()
303 }
304
305 /// Record that a request is starting for a domain.
306 ///
307 /// Increments in-flight count and records timestamp for rate limiting.
308 pub fn start_request(&self, domain: &str) {
309 let entry = self.get_or_create_throttle(domain);
310 let mut throttle = entry.lock().unwrap();
311 throttle.start_request();
312 }
313
314 /// Record that a request completed for a domain.
315 ///
316 /// Decrements in-flight count and cleans up old timestamps.
317 ///
318 /// Note: Trigger-based processing (spawning tasks when capacity frees)
319 /// will be added in Phase 6 after `SyncContext` is available.
320 pub fn complete_request(&self, domain: &str) {
321 if let Some(entry) = self.throttles.get(domain) {
322 let mut throttle = entry.lock().unwrap();
323 throttle.complete_request();
324 }
325 }
326
327 /// Add an identifier to a domain's waiting queue.
328 ///
329 /// If the identifier is already queued for this domain, merges the tried_urls sets.
330 ///
331 /// Note: Trigger-based processing (spawning tasks when capacity available)
332 /// will be added in Phase 6 after `SyncContext` is available.
333 ///
334 /// # Arguments
335 /// * `domain` - The domain to queue for
336 /// * `identifier` - The repository identifier
337 /// * `tried_urls_for_domain` - URLs from this domain that have already been tried
338 pub fn enqueue_identifier(
339 &self,
340 domain: &str,
341 identifier: String,
342 tried_urls_for_domain: HashSet<String>,
343 ) {
344 let entry = self.get_or_create_throttle(domain);
345 let mut throttle = entry.lock().unwrap();
346 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
347 }
348}
349
232#[cfg(test)] 350#[cfg(test)]
233mod tests { 351mod tests {
234 use super::*; 352 use super::*;
@@ -388,4 +506,58 @@ mod tests {
388 assert!(tried.contains("url2")); 506 assert!(tried.contains("url2"));
389 assert_eq!(tried.len(), 2); 507 assert_eq!(tried.len(), 2);
390 } 508 }
509
510 // ThrottleManager tests
511
512 #[test]
513 fn is_throttled_reflects_domain_capacity() {
514 let manager = ThrottleManager::new(2, 100);
515
516 // New domain should not be throttled (has capacity)
517 assert!(!manager.is_throttled("example.com"));
518
519 // Start 2 requests (at concurrent limit)
520 manager.start_request("example.com");
521 manager.start_request("example.com");
522
523 // Should now be throttled
524 assert!(manager.is_throttled("example.com"));
525
526 // Complete one request
527 manager.complete_request("example.com");
528
529 // Should have capacity again
530 assert!(!manager.is_throttled("example.com"));
531
532 // Different domain should be independent
533 assert!(!manager.is_throttled("other.com"));
534 }
535
536 #[test]
537 fn enqueue_identifier_creates_domain_throttle() {
538 let manager = ThrottleManager::new(5, 100);
539
540 // Domain doesn't exist yet
541 assert!(!manager.throttles.contains_key("example.com"));
542
543 // Enqueue an identifier
544 manager.enqueue_identifier("example.com", "repo1".to_string(), HashSet::new());
545
546 // Domain throttle should now exist
547 assert!(manager.throttles.contains_key("example.com"));
548 }
549
550 #[test]
551 fn start_request_creates_domain_throttle() {
552 let manager = ThrottleManager::new(5, 100);
553
554 // Domain doesn't exist yet
555 assert!(!manager.throttles.contains_key("example.com"));
556
557 // Start a request
558 manager.start_request("example.com");
559
560 // Domain throttle should now exist
561 assert!(manager.throttles.contains_key("example.com"));
562 }
391} 563}