diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:26:48 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:26:48 +0000 |
| commit | 08eba888fe510896f1863be6f2b5a72a91616975 (patch) | |
| tree | c5c947246dd34fbffbc1cb15b48102bb472d3d4f /src | |
| parent | 2665811f54f62f147b7d773c76bd26d032b8f9cb (diff) | |
Add DomainThrottle for per-domain rate limiting
Implement per-domain throttling for purgatory sync operations:
- Concurrent request limit (max in-flight requests per domain)
- Rate limit (max requests per minute via sliding window)
- Fair round-robin queue processing across identifiers
- In-progress tracking to prevent duplicate fetches
- Tried URL tracking per identifier
Add indexmap dependency for ordered iteration in round-robin queue.
Includes 6 unit tests covering:
- Concurrent limit enforcement
- Rate limit enforcement (sliding window)
- Round-robin fair processing
- In-progress identifier skipping
- Round-robin index adjustment on removal
- Tried URL merging on re-enqueue
Diffstat (limited to 'src')
| -rw-r--r-- | src/purgatory/sync/mod.rs | 2 | ||||
| -rw-r--r-- | src/purgatory/sync/throttle.rs | 391 |
2 files changed, 393 insertions, 0 deletions
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index 7b6d64a..3c0d875 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs | |||
| @@ -7,5 +7,7 @@ | |||
| 7 | //! - Debouncing for burst event arrivals | 7 | //! - Debouncing for burst event arrivals |
| 8 | 8 | ||
| 9 | mod queue; | 9 | mod queue; |
| 10 | mod throttle; | ||
| 10 | 11 | ||
| 11 | pub use queue::SyncQueueEntry; | 12 | pub use queue::SyncQueueEntry; |
| 13 | pub use throttle::DomainThrottle; | ||
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs new file mode 100644 index 0000000..6f01c86 --- /dev/null +++ b/src/purgatory/sync/throttle.rs | |||
| @@ -0,0 +1,391 @@ | |||
| 1 | //! Domain-based rate limiting and identifier queue management. | ||
| 2 | //! | ||
| 3 | //! This module provides per-domain throttling to prevent overwhelming remote | ||
| 4 | //! git servers during purgatory sync operations. Each domain has: | ||
| 5 | //! - Concurrent request limit (max in-flight requests) | ||
| 6 | //! - Rate limit (max requests per minute) | ||
| 7 | //! - Queue of identifiers waiting for capacity (with round-robin processing) | ||
| 8 | |||
| 9 | use indexmap::IndexMap; | ||
| 10 | use std::collections::{HashSet, VecDeque}; | ||
| 11 | use std::time::{Duration, Instant}; | ||
| 12 | |||
| 13 | /// State for an identifier waiting in a domain's queue. | ||
| 14 | /// | ||
| 15 | /// Tracks which URLs from this domain have been tried and whether | ||
| 16 | /// a fetch is currently in progress for this identifier. | ||
| 17 | #[derive(Debug, Clone)] | ||
| 18 | struct IdentifierQueueState { | ||
| 19 | /// URLs from this domain that have been tried. | ||
| 20 | tried_urls: HashSet<String>, | ||
| 21 | |||
| 22 | /// Whether a fetch is currently in progress for this identifier on this domain. | ||
| 23 | /// | ||
| 24 | /// Prevents starting multiple concurrent fetches for the same identifier, | ||
| 25 | /// which is important when the queue is small (e.g., 2 identifiers with 5 | ||
| 26 | /// concurrent slots would otherwise try to process the same identifier multiple times). | ||
| 27 | in_progress: bool, | ||
| 28 | } | ||
| 29 | |||
| 30 | impl IdentifierQueueState { | ||
| 31 | fn new(tried_urls: HashSet<String>) -> Self { | ||
| 32 | Self { | ||
| 33 | tried_urls, | ||
| 34 | in_progress: false, | ||
| 35 | } | ||
| 36 | } | ||
| 37 | } | ||
| 38 | |||
| 39 | /// Per-domain rate limiting and identifier queue. | ||
| 40 | /// | ||
| 41 | /// Handles: | ||
| 42 | /// - Rate limiting (concurrent requests, requests per minute) | ||
| 43 | /// - Queue of identifiers waiting for capacity (using IndexMap for round-robin order) | ||
| 44 | /// - Tracking tried URLs per identifier (for this domain only) | ||
| 45 | /// - In-progress flag per identifier (prevents concurrent fetches for same identifier | ||
| 46 | /// on this domain, important when queue is small and we have multiple concurrent slots) | ||
| 47 | #[derive(Debug)] | ||
| 48 | pub struct DomainThrottle { | ||
| 49 | /// Domain this throttle manages (for debugging/logging). | ||
| 50 | #[allow(dead_code)] | ||
| 51 | domain: String, | ||
| 52 | |||
| 53 | /// Current in-flight request count. | ||
| 54 | in_flight: u32, | ||
| 55 | |||
| 56 | /// Request timestamps (sliding window for rate limiting). | ||
| 57 | request_times: VecDeque<Instant>, | ||
| 58 | |||
| 59 | /// Queued identifiers with their state. | ||
| 60 | /// IndexMap preserves insertion order for round-robin processing. | ||
| 61 | queue: IndexMap<String, IdentifierQueueState>, | ||
| 62 | |||
| 63 | /// Round-robin index for fair processing across identifiers. | ||
| 64 | round_robin_index: usize, | ||
| 65 | |||
| 66 | /// Maximum concurrent requests for this domain. | ||
| 67 | max_concurrent: u32, | ||
| 68 | |||
| 69 | /// Maximum requests per minute for this domain. | ||
| 70 | max_per_minute: u32, | ||
| 71 | } | ||
| 72 | |||
| 73 | impl DomainThrottle { | ||
| 74 | /// Create a new domain throttle with the specified limits. | ||
| 75 | /// | ||
| 76 | /// # Arguments | ||
| 77 | /// * `domain` - The domain name (for logging) | ||
| 78 | /// * `max_concurrent` - Maximum concurrent in-flight requests | ||
| 79 | /// * `max_per_minute` - Maximum requests per 60-second window | ||
| 80 | pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self { | ||
| 81 | Self { | ||
| 82 | domain, | ||
| 83 | in_flight: 0, | ||
| 84 | request_times: VecDeque::new(), | ||
| 85 | queue: IndexMap::new(), | ||
| 86 | round_robin_index: 0, | ||
| 87 | max_concurrent, | ||
| 88 | max_per_minute, | ||
| 89 | } | ||
| 90 | } | ||
| 91 | |||
| 92 | /// Check if domain has capacity for another request. | ||
| 93 | /// | ||
| 94 | /// Returns false if: | ||
| 95 | /// - Already at max concurrent requests | ||
| 96 | /// - Already at max requests per minute (sliding window) | ||
| 97 | pub fn has_capacity(&self) -> bool { | ||
| 98 | // Check concurrent limit | ||
| 99 | if self.in_flight >= self.max_concurrent { | ||
| 100 | return false; | ||
| 101 | } | ||
| 102 | |||
| 103 | // Check rate limit (sliding window of 60 seconds) | ||
| 104 | let now = Instant::now(); | ||
| 105 | let window = Duration::from_secs(60); | ||
| 106 | let recent_count = self | ||
| 107 | .request_times | ||
| 108 | .iter() | ||
| 109 | .filter(|t| now.duration_since(**t) < window) | ||
| 110 | .count(); | ||
| 111 | |||
| 112 | recent_count < self.max_per_minute as usize | ||
| 113 | } | ||
| 114 | |||
| 115 | /// Check if there are any identifiers in the queue. | ||
| 116 | pub fn has_queued_work(&self) -> bool { | ||
| 117 | !self.queue.is_empty() | ||
| 118 | } | ||
| 119 | |||
| 120 | /// Record that a request is starting. | ||
| 121 | /// | ||
| 122 | /// Increments in-flight count and records timestamp for rate limiting. | ||
| 123 | pub fn start_request(&mut self) { | ||
| 124 | self.in_flight += 1; | ||
| 125 | self.request_times.push_back(Instant::now()); | ||
| 126 | } | ||
| 127 | |||
| 128 | /// Record that a request completed. | ||
| 129 | /// | ||
| 130 | /// Decrements in-flight count and cleans up old timestamps. | ||
| 131 | pub fn complete_request(&mut self) { | ||
| 132 | self.in_flight = self.in_flight.saturating_sub(1); | ||
| 133 | |||
| 134 | // Clean old timestamps outside the 60-second window | ||
| 135 | let now = Instant::now(); | ||
| 136 | let window = Duration::from_secs(60); | ||
| 137 | while self | ||
| 138 | .request_times | ||
| 139 | .front() | ||
| 140 | .map_or(false, |t| now.duration_since(*t) >= window) | ||
| 141 | { | ||
| 142 | self.request_times.pop_front(); | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | /// Add an identifier to the queue. | ||
| 147 | /// | ||
| 148 | /// If the identifier is already queued, merges the tried_urls sets. | ||
| 149 | /// | ||
| 150 | /// # Arguments | ||
| 151 | /// * `identifier` - The repository identifier | ||
| 152 | /// * `tried_urls` - URLs from this domain that have already been tried | ||
| 153 | pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) { | ||
| 154 | self.queue | ||
| 155 | .entry(identifier) | ||
| 156 | .and_modify(|state| { | ||
| 157 | // Merge tried_urls if already exists | ||
| 158 | state.tried_urls.extend(tried_urls.iter().cloned()); | ||
| 159 | }) | ||
| 160 | .or_insert(IdentifierQueueState::new(tried_urls)); | ||
| 161 | } | ||
| 162 | |||
| 163 | /// Get next identifier ready for processing (round-robin, not in_progress). | ||
| 164 | /// | ||
| 165 | /// Iterates through the queue starting from round_robin_index, skipping | ||
| 166 | /// any identifiers that are already in_progress. This ensures fair | ||
| 167 | /// distribution even when some identifiers have active fetches. | ||
| 168 | /// | ||
| 169 | /// Returns the identifier and marks it as in_progress. | ||
| 170 | pub fn next_ready_identifier(&mut self) -> Option<String> { | ||
| 171 | let len = self.queue.len(); | ||
| 172 | if len == 0 { | ||
| 173 | return None; | ||
| 174 | } | ||
| 175 | |||
| 176 | // Try each identifier starting from round_robin_index | ||
| 177 | for i in 0..len { | ||
| 178 | let index = (self.round_robin_index + i) % len; | ||
| 179 | if let Some((identifier, state)) = self.queue.get_index_mut(index) { | ||
| 180 | if !state.in_progress { | ||
| 181 | state.in_progress = true; | ||
| 182 | self.round_robin_index = (index + 1) % len; | ||
| 183 | return Some(identifier.clone()); | ||
| 184 | } | ||
| 185 | } | ||
| 186 | } | ||
| 187 | |||
| 188 | None // All identifiers are in_progress | ||
| 189 | } | ||
| 190 | |||
| 191 | /// Get tried URLs for an identifier. | ||
| 192 | pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> { | ||
| 193 | self.queue | ||
| 194 | .get(identifier) | ||
| 195 | .map(|s| s.tried_urls.clone()) | ||
| 196 | .unwrap_or_default() | ||
| 197 | } | ||
| 198 | |||
| 199 | /// Mark a URL as tried for an identifier. | ||
| 200 | pub fn mark_url_tried(&mut self, identifier: &str, url: String) { | ||
| 201 | if let Some(state) = self.queue.get_mut(identifier) { | ||
| 202 | state.tried_urls.insert(url); | ||
| 203 | } | ||
| 204 | } | ||
| 205 | |||
| 206 | /// Mark identifier as not in progress (fetch completed). | ||
| 207 | pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) { | ||
| 208 | if let Some(state) = self.queue.get_mut(identifier) { | ||
| 209 | state.in_progress = false; | ||
| 210 | } | ||
| 211 | } | ||
| 212 | |||
| 213 | /// Remove an identifier from the queue entirely. | ||
| 214 | /// | ||
| 215 | /// Adjusts round_robin_index if needed to maintain fair processing. | ||
| 216 | pub fn remove_identifier(&mut self, identifier: &str) { | ||
| 217 | if let Some((index, _, _)) = self.queue.shift_remove_full(identifier) { | ||
| 218 | // Adjust round_robin_index if we removed an entry before it | ||
| 219 | if index < self.round_robin_index && self.round_robin_index > 0 { | ||
| 220 | self.round_robin_index -= 1; | ||
| 221 | } | ||
| 222 | // Clamp to valid range | ||
| 223 | if !self.queue.is_empty() { | ||
| 224 | self.round_robin_index %= self.queue.len(); | ||
| 225 | } else { | ||
| 226 | self.round_robin_index = 0; | ||
| 227 | } | ||
| 228 | } | ||
| 229 | } | ||
| 230 | } | ||
| 231 | |||
| 232 | #[cfg(test)] | ||
| 233 | mod tests { | ||
| 234 | use super::*; | ||
| 235 | |||
| 236 | #[test] | ||
| 237 | fn concurrent_limit_blocks_when_saturated() { | ||
| 238 | let mut throttle = DomainThrottle::new("example.com".to_string(), 3, 100); | ||
| 239 | |||
| 240 | // Initially has capacity | ||
| 241 | assert!(throttle.has_capacity()); | ||
| 242 | |||
| 243 | // Start 3 requests (at limit) | ||
| 244 | throttle.start_request(); | ||
| 245 | throttle.start_request(); | ||
| 246 | throttle.start_request(); | ||
| 247 | |||
| 248 | // Should be at capacity now | ||
| 249 | assert!(!throttle.has_capacity()); | ||
| 250 | |||
| 251 | // Complete one request | ||
| 252 | throttle.complete_request(); | ||
| 253 | |||
| 254 | // Should have capacity again | ||
| 255 | assert!(throttle.has_capacity()); | ||
| 256 | } | ||
| 257 | |||
| 258 | #[test] | ||
| 259 | fn rate_limit_blocks_when_window_full() { | ||
| 260 | // Use a very small rate limit for testing | ||
| 261 | let mut throttle = DomainThrottle::new("example.com".to_string(), 100, 2); | ||
| 262 | |||
| 263 | // Initially has capacity | ||
| 264 | assert!(throttle.has_capacity()); | ||
| 265 | |||
| 266 | // Make 2 requests (at rate limit) | ||
| 267 | throttle.start_request(); | ||
| 268 | throttle.complete_request(); | ||
| 269 | throttle.start_request(); | ||
| 270 | throttle.complete_request(); | ||
| 271 | |||
| 272 | // Should be at rate limit now (2 requests in last 60s) | ||
| 273 | assert!(!throttle.has_capacity()); | ||
| 274 | |||
| 275 | // Note: In a real test we'd need to wait 60 seconds or mock time | ||
| 276 | // For this test, we just verify the blocking behavior | ||
| 277 | } | ||
| 278 | |||
| 279 | #[test] | ||
| 280 | fn round_robin_processes_identifiers_fairly() { | ||
| 281 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 100); | ||
| 282 | |||
| 283 | // Enqueue A, B, C | ||
| 284 | throttle.enqueue_identifier("A".to_string(), HashSet::new()); | ||
| 285 | throttle.enqueue_identifier("B".to_string(), HashSet::new()); | ||
| 286 | throttle.enqueue_identifier("C".to_string(), HashSet::new()); | ||
| 287 | |||
| 288 | // First round: should get A, B, C in order | ||
| 289 | let first = throttle.next_ready_identifier(); | ||
| 290 | assert_eq!(first, Some("A".to_string())); | ||
| 291 | throttle.mark_identifier_not_in_progress("A"); | ||
| 292 | |||
| 293 | let second = throttle.next_ready_identifier(); | ||
| 294 | assert_eq!(second, Some("B".to_string())); | ||
| 295 | throttle.mark_identifier_not_in_progress("B"); | ||
| 296 | |||
| 297 | let third = throttle.next_ready_identifier(); | ||
| 298 | assert_eq!(third, Some("C".to_string())); | ||
| 299 | throttle.mark_identifier_not_in_progress("C"); | ||
| 300 | |||
| 301 | // Second round: should cycle back to A, B, C | ||
| 302 | let fourth = throttle.next_ready_identifier(); | ||
| 303 | assert_eq!(fourth, Some("A".to_string())); | ||
| 304 | throttle.mark_identifier_not_in_progress("A"); | ||
| 305 | |||
| 306 | let fifth = throttle.next_ready_identifier(); | ||
| 307 | assert_eq!(fifth, Some("B".to_string())); | ||
| 308 | } | ||
| 309 | |||
| 310 | #[test] | ||
| 311 | fn skips_in_progress_identifiers() { | ||
| 312 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 100); | ||
| 313 | |||
| 314 | // Enqueue A, B, C | ||
| 315 | throttle.enqueue_identifier("A".to_string(), HashSet::new()); | ||
| 316 | throttle.enqueue_identifier("B".to_string(), HashSet::new()); | ||
| 317 | throttle.enqueue_identifier("C".to_string(), HashSet::new()); | ||
| 318 | |||
| 319 | // Get A (marks it in_progress) | ||
| 320 | let first = throttle.next_ready_identifier(); | ||
| 321 | assert_eq!(first, Some("A".to_string())); | ||
| 322 | |||
| 323 | // Get B (A is still in_progress) | ||
| 324 | let second = throttle.next_ready_identifier(); | ||
| 325 | assert_eq!(second, Some("B".to_string())); | ||
| 326 | |||
| 327 | // Get C (A and B are in_progress) | ||
| 328 | let third = throttle.next_ready_identifier(); | ||
| 329 | assert_eq!(third, Some("C".to_string())); | ||
| 330 | |||
| 331 | // All are in_progress now, should return None | ||
| 332 | let fourth = throttle.next_ready_identifier(); | ||
| 333 | assert_eq!(fourth, None); | ||
| 334 | |||
| 335 | // Mark A as not in_progress | ||
| 336 | throttle.mark_identifier_not_in_progress("A"); | ||
| 337 | |||
| 338 | // Should get A again (it's the only one not in_progress) | ||
| 339 | let fifth = throttle.next_ready_identifier(); | ||
| 340 | assert_eq!(fifth, Some("A".to_string())); | ||
| 341 | } | ||
| 342 | |||
| 343 | #[test] | ||
| 344 | fn remove_identifier_adjusts_round_robin_index() { | ||
| 345 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 100); | ||
| 346 | |||
| 347 | // Enqueue A, B, C, D | ||
| 348 | throttle.enqueue_identifier("A".to_string(), HashSet::new()); | ||
| 349 | throttle.enqueue_identifier("B".to_string(), HashSet::new()); | ||
| 350 | throttle.enqueue_identifier("C".to_string(), HashSet::new()); | ||
| 351 | throttle.enqueue_identifier("D".to_string(), HashSet::new()); | ||
| 352 | |||
| 353 | // Get A (round_robin_index now points to B) | ||
| 354 | let first = throttle.next_ready_identifier(); | ||
| 355 | assert_eq!(first, Some("A".to_string())); | ||
| 356 | throttle.mark_identifier_not_in_progress("A"); | ||
| 357 | |||
| 358 | // Get B (round_robin_index now points to C) | ||
| 359 | let second = throttle.next_ready_identifier(); | ||
| 360 | assert_eq!(second, Some("B".to_string())); | ||
| 361 | throttle.mark_identifier_not_in_progress("B"); | ||
| 362 | |||
| 363 | // Remove A (before current index) | ||
| 364 | throttle.remove_identifier("A"); | ||
| 365 | |||
| 366 | // Next should be C (not B again, index was adjusted) | ||
| 367 | let third = throttle.next_ready_identifier(); | ||
| 368 | assert_eq!(third, Some("C".to_string())); | ||
| 369 | } | ||
| 370 | |||
| 371 | #[test] | ||
| 372 | fn enqueue_merges_tried_urls() { | ||
| 373 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 100); | ||
| 374 | |||
| 375 | // First enqueue with some tried URLs | ||
| 376 | let mut tried1 = HashSet::new(); | ||
| 377 | tried1.insert("url1".to_string()); | ||
| 378 | throttle.enqueue_identifier("A".to_string(), tried1); | ||
| 379 | |||
| 380 | // Second enqueue with different tried URLs | ||
| 381 | let mut tried2 = HashSet::new(); | ||
| 382 | tried2.insert("url2".to_string()); | ||
| 383 | throttle.enqueue_identifier("A".to_string(), tried2); | ||
| 384 | |||
| 385 | // Should have both URLs | ||
| 386 | let tried = throttle.get_tried_urls("A"); | ||
| 387 | assert!(tried.contains("url1")); | ||
| 388 | assert!(tried.contains("url2")); | ||
| 389 | assert_eq!(tried.len(), 2); | ||
| 390 | } | ||
| 391 | } | ||