upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- Cargo.lock 1
-rw-r--r-- Cargo.toml 3
-rw-r--r-- src/purgatory/sync/mod.rs 2
-rw-r--r-- src/purgatory/sync/throttle.rs 391
4 files changed, 397 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index fbcc015..03afcbb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1435,6 +1435,7 @@ dependencies = [
1435 "http-body-util", 1435 "http-body-util",
1436 "hyper 1.8.1", 1436 "hyper 1.8.1",
1437 "hyper-util", 1437 "hyper-util",
1438 "indexmap",
1438 "lazy_static", 1439 "lazy_static",
1439 "nostr-lmdb", 1440 "nostr-lmdb",
1440 "nostr-relay-builder", 1441 "nostr-relay-builder",
diff --git a/Cargo.toml b/Cargo.toml
index e1e1627..eceebcb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,9 @@ prometheus = "0.13"
33dashmap = "5" 33dashmap = "5"
34lazy_static = "1.4" 34lazy_static = "1.4"
35 35
36# Data structures
37indexmap = "2"
38
36# Random (for startup jitter) 39# Random (for startup jitter)
37rand = "0.8" 40rand = "0.8"
38 41
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
index 7b6d64a..3c0d875 100644
--- a/src/purgatory/sync/mod.rs
+++ b/src/purgatory/sync/mod.rs
@@ -7,5 +7,7 @@
7//! - Debouncing for burst event arrivals 7//! - Debouncing for burst event arrivals
8 8
9mod queue; 9mod queue;
10mod throttle;
10 11
11pub use queue::SyncQueueEntry; 12pub use queue::SyncQueueEntry;
13pub use throttle::DomainThrottle;
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
new file mode 100644
index 0000000..6f01c86
--- /dev/null
+++ b/src/purgatory/sync/throttle.rs
@@ -0,0 +1,391 @@
1//! Domain-based rate limiting and identifier queue management.
2//!
3//! This module provides per-domain throttling to prevent overwhelming remote
4//! git servers during purgatory sync operations. Each domain has:
5//! - Concurrent request limit (max in-flight requests)
6//! - Rate limit (max requests per minute)
7//! - Queue of identifiers waiting for capacity (with round-robin processing)
8
9use indexmap::IndexMap;
10use std::collections::{HashSet, VecDeque};
11use std::time::{Duration, Instant};
12
/// Bookkeeping for one identifier parked in a domain's queue.
///
/// Remembers which URLs on this domain were already attempted for the
/// identifier and whether a fetch for it is running at the moment.
#[derive(Debug, Clone)]
struct IdentifierQueueState {
    /// URLs on this domain that have already been attempted.
    tried_urls: HashSet<String>,

    /// `true` while a fetch for this identifier is running on this domain.
    ///
    /// Stops the same identifier from being handed out more than once at a
    /// time. This matters for short queues: with 2 identifiers and 5
    /// concurrent slots, the spare slots would otherwise pick up an
    /// identifier that is already being fetched.
    in_progress: bool,
}

impl IdentifierQueueState {
    /// Creates the state for a newly queued identifier.
    ///
    /// `tried_urls` is stored as given; the identifier starts out with no
    /// fetch in progress.
    fn new(tried_urls: HashSet<String>) -> Self {
        let in_progress = false;
        Self {
            tried_urls,
            in_progress,
        }
    }
}
38
39/// Per-domain rate limiting and identifier queue.
40///
41/// Handles:
42/// - Rate limiting (concurrent requests, requests per minute)
43/// - Queue of identifiers waiting for capacity (using IndexMap for round-robin order)
44/// - Tracking tried URLs per identifier (for this domain only)
45/// - In-progress flag per identifier (prevents concurrent fetches for same identifier
46/// on this domain, important when queue is small and we have multiple concurrent slots)
47#[derive(Debug)]
48pub struct DomainThrottle {
49 /// Domain this throttle manages (for debugging/logging).
50 #[allow(dead_code)]
51 domain: String,
52
53 /// Current in-flight request count.
54 in_flight: u32,
55
56 /// Request timestamps (sliding window for rate limiting).
57 request_times: VecDeque<Instant>,
58
59 /// Queued identifiers with their state.
60 /// IndexMap preserves insertion order for round-robin processing.
61 queue: IndexMap<String, IdentifierQueueState>,
62
63 /// Round-robin index for fair processing across identifiers.
64 round_robin_index: usize,
65
66 /// Maximum concurrent requests for this domain.
67 max_concurrent: u32,
68
69 /// Maximum requests per minute for this domain.
70 max_per_minute: u32,
71}
72
73impl DomainThrottle {
74 /// Create a new domain throttle with the specified limits.
75 ///
76 /// # Arguments
77 /// * `domain` - The domain name (for logging)
78 /// * `max_concurrent` - Maximum concurrent in-flight requests
79 /// * `max_per_minute` - Maximum requests per 60-second window
80 pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self {
81 Self {
82 domain,
83 in_flight: 0,
84 request_times: VecDeque::new(),
85 queue: IndexMap::new(),
86 round_robin_index: 0,
87 max_concurrent,
88 max_per_minute,
89 }
90 }
91
92 /// Check if domain has capacity for another request.
93 ///
94 /// Returns false if:
95 /// - Already at max concurrent requests
96 /// - Already at max requests per minute (sliding window)
97 pub fn has_capacity(&self) -> bool {
98 // Check concurrent limit
99 if self.in_flight >= self.max_concurrent {
100 return false;
101 }
102
103 // Check rate limit (sliding window of 60 seconds)
104 let now = Instant::now();
105 let window = Duration::from_secs(60);
106 let recent_count = self
107 .request_times
108 .iter()
109 .filter(|t| now.duration_since(**t) < window)
110 .count();
111
112 recent_count < self.max_per_minute as usize
113 }
114
115 /// Check if there are any identifiers in the queue.
116 pub fn has_queued_work(&self) -> bool {
117 !self.queue.is_empty()
118 }
119
120 /// Record that a request is starting.
121 ///
122 /// Increments in-flight count and records timestamp for rate limiting.
123 pub fn start_request(&mut self) {
124 self.in_flight += 1;
125 self.request_times.push_back(Instant::now());
126 }
127
128 /// Record that a request completed.
129 ///
130 /// Decrements in-flight count and cleans up old timestamps.
131 pub fn complete_request(&mut self) {
132 self.in_flight = self.in_flight.saturating_sub(1);
133
134 // Clean old timestamps outside the 60-second window
135 let now = Instant::now();
136 let window = Duration::from_secs(60);
137 while self
138 .request_times
139 .front()
140 .map_or(false, |t| now.duration_since(*t) >= window)
141 {
142 self.request_times.pop_front();
143 }
144 }
145
146 /// Add an identifier to the queue.
147 ///
148 /// If the identifier is already queued, merges the tried_urls sets.
149 ///
150 /// # Arguments
151 /// * `identifier` - The repository identifier
152 /// * `tried_urls` - URLs from this domain that have already been tried
153 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) {
154 self.queue
155 .entry(identifier)
156 .and_modify(|state| {
157 // Merge tried_urls if already exists
158 state.tried_urls.extend(tried_urls.iter().cloned());
159 })
160 .or_insert(IdentifierQueueState::new(tried_urls));
161 }
162
163 /// Get next identifier ready for processing (round-robin, not in_progress).
164 ///
165 /// Iterates through the queue starting from round_robin_index, skipping
166 /// any identifiers that are already in_progress. This ensures fair
167 /// distribution even when some identifiers have active fetches.
168 ///
169 /// Returns the identifier and marks it as in_progress.
170 pub fn next_ready_identifier(&mut self) -> Option<String> {
171 let len = self.queue.len();
172 if len == 0 {
173 return None;
174 }
175
176 // Try each identifier starting from round_robin_index
177 for i in 0..len {
178 let index = (self.round_robin_index + i) % len;
179 if let Some((identifier, state)) = self.queue.get_index_mut(index) {
180 if !state.in_progress {
181 state.in_progress = true;
182 self.round_robin_index = (index + 1) % len;
183 return Some(identifier.clone());
184 }
185 }
186 }
187
188 None // All identifiers are in_progress
189 }
190
191 /// Get tried URLs for an identifier.
192 pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> {
193 self.queue
194 .get(identifier)
195 .map(|s| s.tried_urls.clone())
196 .unwrap_or_default()
197 }
198
199 /// Mark a URL as tried for an identifier.
200 pub fn mark_url_tried(&mut self, identifier: &str, url: String) {
201 if let Some(state) = self.queue.get_mut(identifier) {
202 state.tried_urls.insert(url);
203 }
204 }
205
206 /// Mark identifier as not in progress (fetch completed).
207 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) {
208 if let Some(state) = self.queue.get_mut(identifier) {
209 state.in_progress = false;
210 }
211 }
212
213 /// Remove an identifier from the queue entirely.
214 ///
215 /// Adjusts round_robin_index if needed to maintain fair processing.
216 pub fn remove_identifier(&mut self, identifier: &str) {
217 if let Some((index, _, _)) = self.queue.shift_remove_full(identifier) {
218 // Adjust round_robin_index if we removed an entry before it
219 if index < self.round_robin_index && self.round_robin_index > 0 {
220 self.round_robin_index -= 1;
221 }
222 // Clamp to valid range
223 if !self.queue.is_empty() {
224 self.round_robin_index %= self.queue.len();
225 } else {
226 self.round_robin_index = 0;
227 }
228 }
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty throttle for "example.com" with the given limits.
    fn throttle(max_concurrent: u32, max_per_minute: u32) -> DomainThrottle {
        DomainThrottle::new("example.com".to_string(), max_concurrent, max_per_minute)
    }

    /// Builds a throttle (5 concurrent, 100 per minute) with `ids` queued in
    /// the given order, each with an empty tried-URL set.
    fn throttle_with_queue(ids: &[&str]) -> DomainThrottle {
        let mut throttle = throttle(5, 100);
        for id in ids {
            throttle.enqueue_identifier(id.to_string(), HashSet::new());
        }
        throttle
    }

    /// Pulls the next ready identifier and hands it back as an owned option.
    fn next(throttle: &mut DomainThrottle) -> Option<String> {
        throttle.next_ready_identifier()
    }

    /// Shorthand for the `Some("X".to_string())` values the tests compare with.
    fn id(name: &str) -> Option<String> {
        Some(name.to_string())
    }

    #[test]
    fn concurrent_limit_blocks_when_saturated() {
        let mut throttle = throttle(3, 100);

        // Nothing in flight yet.
        assert!(throttle.has_capacity());

        // Fill all three concurrent slots.
        for _ in 0..3 {
            throttle.start_request();
        }
        assert!(!throttle.has_capacity());

        // Freeing one slot restores capacity.
        throttle.complete_request();
        assert!(throttle.has_capacity());
    }

    #[test]
    fn rate_limit_blocks_when_window_full() {
        // Tiny per-minute limit so the window fills immediately.
        let mut throttle = throttle(100, 2);

        assert!(throttle.has_capacity());

        // Two full request cycles use up the per-minute budget.
        for _ in 0..2 {
            throttle.start_request();
            throttle.complete_request();
        }

        // Both requests are still inside the 60s window.
        assert!(!throttle.has_capacity());

        // Verifying recovery would need a 60 second wait or a mocked clock;
        // this test only covers the blocking side.
    }

    #[test]
    fn round_robin_processes_identifiers_fairly() {
        let mut throttle = throttle_with_queue(&["A", "B", "C"]);

        // First pass walks the queue in insertion order.
        for name in ["A", "B", "C"] {
            assert_eq!(next(&mut throttle), id(name));
            throttle.mark_identifier_not_in_progress(name);
        }

        // Second pass wraps around to the start.
        assert_eq!(next(&mut throttle), id("A"));
        throttle.mark_identifier_not_in_progress("A");

        assert_eq!(next(&mut throttle), id("B"));
    }

    #[test]
    fn skips_in_progress_identifiers() {
        let mut throttle = throttle_with_queue(&["A", "B", "C"]);

        // Each call marks its identifier in progress, so the next call has to
        // move on to the following one.
        assert_eq!(next(&mut throttle), id("A"));
        assert_eq!(next(&mut throttle), id("B"));
        assert_eq!(next(&mut throttle), id("C"));

        // Everything is busy now.
        assert_eq!(next(&mut throttle), None);

        // Releasing A makes it the only candidate.
        throttle.mark_identifier_not_in_progress("A");
        assert_eq!(next(&mut throttle), id("A"));
    }

    #[test]
    fn remove_identifier_adjusts_round_robin_index() {
        let mut throttle = throttle_with_queue(&["A", "B", "C", "D"]);

        // Take A, then B; afterwards the cursor points at C.
        assert_eq!(next(&mut throttle), id("A"));
        throttle.mark_identifier_not_in_progress("A");

        assert_eq!(next(&mut throttle), id("B"));
        throttle.mark_identifier_not_in_progress("B");

        // A sits before the cursor, so removing it shifts the queue down.
        throttle.remove_identifier("A");

        // The cursor must have followed the shift: C is next, not B again.
        assert_eq!(next(&mut throttle), id("C"));
    }

    #[test]
    fn enqueue_merges_tried_urls() {
        let mut throttle = throttle(5, 100);

        // Queue A twice, each time with a different already-tried URL.
        for url in ["url1", "url2"] {
            let mut tried = HashSet::new();
            tried.insert(url.to_string());
            throttle.enqueue_identifier("A".to_string(), tried);
        }

        // The two sets must have been merged.
        let merged = throttle.get_tried_urls("A");
        assert!(merged.contains("url1"));
        assert!(merged.contains("url2"));
        assert_eq!(merged.len(), 2);
    }
}