upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-06 16:41:26 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-06 16:41:26 +0000
commit0da412a255858d35cc2eb85158856abc736a236b (patch)
treedaac8d46f7d26f195b76ed48b773e757a18ddfa3 /docs
parent534b9e23524a1f651590eccbfef3398fa7fbc495 (diff)
docs: purgatory git sync design with throttle queuing
Diffstat (limited to 'docs')
-rw-r--r--docs/explanation/purgatory-sync-redesign.md1172
1 files changed, 806 insertions, 366 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
index 76d01b8..7501da2 100644
--- a/docs/explanation/purgatory-sync-redesign.md
+++ b/docs/explanation/purgatory-sync-redesign.md
@@ -32,86 +32,114 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**,
32### Overview 32### Overview
33 33
34``` 34```
35┌──────────────────────────────────────────────────────────────────────────────┐ 35┌──────────────────────────────────────────────────────────────────────────────────┐
36│ Purgatory │ 36│ Purgatory │
37│ │ 37│ │
38│ ┌─────────────────┐ ┌─────────────────┐ │ 38│ ┌─────────────────┐ ┌─────────────────┐ │
39│ │ State Events │ │ PR Events │ │ 39│ │ State Events │ │ PR Events │ │
40│ │ (by identifier)│ │ (by event_id) │ │ 40│ │ (by identifier)│ │ (by event_id) │ │
41│ └────────┬────────┘ └────────┬────────┘ │ 41│ └────────┬────────┘ └────────┬────────┘ │
42│ │ │ │ 42│ │ │ │
43│ └──────────┬─────────┘ │ 43│ └──────────┬─────────┘ │
44│ │ add_state() / add_pr() / trigger_immediate_sync() │ 44│ │ add_state() / add_pr() / trigger_immediate_sync() │
45│ ▼ │ 45│ ▼ │
46│ ┌──────────────────────────┐ │ 46│ ┌──────────────────────────┐ │
47│ │ Sync Queue │ │ 47│ │ Sync Queue │ │
48│ │ DashMap<id, Entry> │ │ 48│ │ DashMap<id, Entry> │ │
49│ │ │ │ 49│ │ │ │
50│ │ Entry { │ │ 50│ │ Entry { │ │
51│ │ next_attempt, │ ← delay/backoff timer │ 51│ │ next_attempt, │ ← delay/backoff timer │
52│ │ attempt_count, │ ← for backoff calculation │ 52│ │ attempt_count, │ ← for backoff calculation │
53│ │ in_progress, │ ← prevents concurrent runs │ 53│ │ in_progress, │ ← prevents concurrent runs │
54│ │ } │ │ 54│ │ } │ │
55│ └────────────┬─────────────┘ │ 55│ └────────────┬─────────────┘ │
56│ │ │ 56│ │ │
57│ ┌─────────────────────┼─────────────────────────────────────────────────┐ │ 57│ ┌─────────────────────┼──────────────────────────────────────────────────────┐ │
58│ │ ▼ │ │ 58│ │ ▼ │ │
59│ │ ┌─────────────────────┐ │ │ 59│ │ ┌─────────────────────┐ │ │
60│ │ │ Main Sync Loop │ (every 1s) │ │ 60│ │ │ Main Sync Loop │ (every 1s) │ │
61│ │ │ │ │ │ 61│ │ │ │ │ │
62│ │ │ 1. Find ALL ready │ │ │ 62│ │ │ 1. Find ALL ready │ │ │
63│ │ │ identifiers │ │ │ 63│ │ │ identifiers │ │ │
64│ │ │ 2. Spawn parallel │ │ │ 64│ │ │ 2. Spawn parallel │───────┐ │ │
65│ │ │ tasks for each │───────┐ │ │ 65│ │ │ tasks for each │ │ (parallel tasks) │ │
66│ │ │ 3. Apply backoff │ │ (parallel tasks) │ │ 66│ │ │ 3. Apply backoff │ │ │ │
67│ │ │ when done │ │ │ │ 67│ │ │ when done │ │ │ │
68│ │ └─────────────────────┘ │ │ │ 68│ │ └─────────────────────┘ │ │ │
69│ │ ▼ │ │ 69│ │ ▼ │ │
70│ │ ┌─────────────────────────────────────┐ │ │ 70│ │ ┌──────────────────────────────────────────┐ │ │
71│ │ │ sync_identifier() │ │ │ 71│ │ │ sync_identifier() │ │ │
72│ │ │ │ │ │ 72│ │ │ │ │ │
73│ │ │ Owns its own tried_urls: HashSet │ │ │ 73│ │ │ Owns its own tried_urls: HashSet │ │ │
74│ │ │ │ │ │ 74│ │ │ │ │ │
75│ │ │ loop: │ │ │ 75│ │ │ loop: │ │ │
76│ │ │ sync_identifier_step() │ │ │ 76│ │ │ url = sync_identifier_next_url( │ │ │
77│ │ │ → (url_tried, complete) │ │ │ 77│ │ │ domain=None) │ │ │
78│ │ │ if complete: break │ │ │ 78│ │ │ if url is Some: │ │ │
79│ │ │ tried_urls.insert(url_tried) │ │ │ 79│ │ │ sync_identifier_from_url(url) │ │ │
80│ │ └─────────────────────────────────────┘ │ │ 80│ │ │ tried_urls.insert(url) │ │ │
81│ │ │ │ │ 81│ │ │ else: │ │ │
82│ │ ▼ │ │ 82│ │ │ break (no non-throttled URLs left) │ │ │
83│ │ ┌─────────────────────────────────────┐ │ │ 83│ │ │ │ │ │
84│ │ │ Domain Throttle │ │ │ 84│ │ │ Enqueue throttled domains then return │ │ │
85│ │ │ (rate limiting only) │ │ │ 85│ │ └──────────────────────────────────────────┘ │ │
86│ │ │ │ │ │ 86│ │ │ │ │
87│ │ │ Per-domain state: │ │ │ 87│ │ │ enqueue_identifier() │ │
88│ │ │ - in_flight: u32 │ │ │ 88│ │ ▼ │ │
89│ │ │ - request_times: VecDeque │ │ │ 89│ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │
90│ │ │ │ │ │ 90│ │ │ ThrottleManager │ │ │
91│ │ │ Round-robin via: │ │ │ 91│ │ │ │ │ │
92│ │ │ - last_used_index per domain │ │ │ 92│ │ │ DashMap<domain, DomainThrottle> │ │ │
93│ │ │ - Caller passes tried_urls │ │ │ 93│ │ │ │ │ │
94│ │ └─────────────────────────────────────┘ │ │ 94│ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │
95│ │ │ │ 95│ │ │ │ DomainThrottle (per domain) │ │ │ │
96│ └───────────────────────────────────────────────────────────────────────┘ │ 96│ │ │ │ │ │ │ │
97└──────────────────────────────────────────────────────────────────────────────┘ 97│ │ │ │ Rate limiting: │ Waiting queue: │ │ │ │
98│ │ │ │ - in_flight: u32 │ - waiting_queue: VecDeque │ │ │ │
99│ │ │ │ - request_times │ - identifier_state: HashMap │ │ │ │
100│ │ │ │ │ - tried_urls (this domain)│ │ │ │
101│ │ │ │ │ - in_progress │ │ │ │
102│ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │
103│ │ │ │ │ │
104│ │ │ Domain Loop (per domain, runs independently): │ │ │
105│ │ │ 1. Wait for capacity │ │ │
106│ │ │ 2. Pick next identifier (round-robin, not in_progress) │ │ │
107│ │ │ 3. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │
108│ │ │ 4. If url: sync_identifier_from_url(url), mark tried │ │ │
109│ │ │ Else: remove identifier from queue │ │ │
110│ │ └─────────────────────────────────────────────────────────────────────┘ │ │
111│ │ │ │
112│ └────────────────────────────────────────────────────────────────────────────┘ │
113└───────────────────────────────────────────────────────────────────────────────────┘
98``` 114```
99 115
100### Key Design Principle: Separation of Concerns 116### Key Design Principles
101 117
102The previous design conflated two concerns in `DomainThrottle`: 118**1. Two Independent Execution Paths**
1031. **Rate limiting** (per-domain): How many requests can we make to a domain?
1042. **URL tracking** (per-identifier): Which URLs have we tried for this sync?
105 119
106The new design cleanly separates these: 120The main sync loop and DomainThrottle loops run independently:
121- **Main sync**: Tries non-throttled URLs, completes quickly, applies backoff, retries later
122- **DomainThrottle**: Processes queued identifiers when capacity frees, doesn't block main sync
107 123
108- **`DomainThrottle`**: Only handles rate limiting. Tracks in-flight requests and request timestamps per domain. Uses round-robin internally to distribute load across URLs. 124**2. Two Separate tried_urls Tracking**
109- **`sync_identifier`**: Owns its `tried_urls: HashSet<String>`. Passes this to the throttle when requesting a URL to try.
110 125
111This separation enables: 126Each path tracks its own tried URLs:
112- **Unit testing** of sync logic with a mock throttle 127- **sync_identifier**: Local `HashSet<String>` for current attempt (all domains)
113- **Simpler state management** - throttle doesn't need cleanup when identifiers complete 128- **DomainThrottle**: Per-identifier `HashSet<String>` for URLs tried via throttle (this domain only)
114- **Clearer reasoning** about each component's responsibility 129
130These don't need to merge because:
131- Main sync skips throttled domains anyway
132- DomainThrottle only processes its own domain's URLs
133
134**3. Shared Functions**
135
136Both paths use the same core functions:
137- **`sync_identifier_next_url`**: Pure URL selection logic
138- **`sync_identifier_from_url`**: Pure fetch logic
139
140The `domain` parameter determines behavior:
141- `None`: Return any non-throttled URL
142- `Some(domain)`: Return URL from that specific domain only
115 143
116### Flow Summary 144### Flow Summary
117 145
@@ -124,16 +152,17 @@ This separation enables:
124 - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) 152 - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`)
125 - Spawns parallel tasks for each (marks `in_progress = true`) 153 - Spawns parallel tasks for each (marks `in_progress = true`)
126 - Each `sync_identifier()` task: 154 - Each `sync_identifier()` task:
127 - Creates fresh `tried_urls: HashSet<String>` 155 - Creates fresh `tried_urls: HashSet<String>`
128 - Loops calling `sync_identifier_step()` until complete 156 - Loops calling `sync_identifier_next_url(domain=None)` + `sync_identifier_from_url`
129 - Step returns `(Option<url_tried>, complete)` - clean testable interface 157 - When no non-throttled URLs remain: enqueue with throttled domains, return
130 - When task completes: apply backoff or remove from queue 158 - When task completes: apply backoff or remove from queue
131 159
1323. **Domain throttle**: 1603. **ThrottleManager / DomainThrottle loops** (independent):
133 - Pure rate limiting: tracks in_flight count and request timestamps per domain 161 - Each domain has its own loop checking for capacity
134 - `get_next_url()` takes `available_urls` and `tried_urls`, returns next URL to try 162 - When capacity available: pick next queued identifier (round-robin, not in_progress)
135 - Uses round-robin internally to distribute load 163 - Call `sync_identifier_next_url(domain=Some(this_domain))`
136 - No per-identifier state needed 164 - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress
165 - If no URL: remove identifier from this domain's queue
137 166
138## Data Structures 167## Data Structures
139 168
@@ -196,142 +225,312 @@ impl SyncQueueEntry {
196} 225}
197``` 226```
198 227
199### DomainThrottle (Rate Limiting Only) 228### ThrottleManager
229
230Manages all per-domain throttles and provides the interface for checking throttle status:
200 231
201```rust 232```rust
202/// Domain-level rate limiting with round-robin URL selection. 233/// Manages rate limiting across all domains.
203/// 234///
204/// This struct ONLY handles rate limiting. It does not track which URLs 235/// Owns a collection of DomainThrottle instances and provides:
205/// have been tried - that's the caller's responsibility. 236/// - Throttle status checking for sync_identifier_next_url
237/// - Domain throttle loop spawning
238/// - Identifier queue management
239pub struct ThrottleManager {
240 /// Per-domain throttle state
241 throttles: DashMap<String, DomainThrottle>,
242
243 /// Configuration
244 max_concurrent_per_domain: u32,
245 max_per_minute_per_domain: u32,
246}
247
248impl ThrottleManager {
249 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
250 Self {
251 throttles: DashMap::new(),
252 max_concurrent_per_domain: max_concurrent,
253 max_per_minute_per_domain: max_per_minute,
254 }
255 }
256
257 /// Check if a domain is currently throttled (at capacity)
258 pub fn is_throttled(&self, domain: &str) -> bool {
259 self.throttles
260 .get(domain)
261 .map_or(false, |t| !t.has_capacity())
262 }
263
264 /// Get or create throttle for a domain
265 fn get_or_create(&self, domain: &str) -> dashmap::mapref::one::RefMut<String, DomainThrottle> {
266 self.throttles
267 .entry(domain.to_string())
268 .or_insert_with(|| DomainThrottle::new(
269 domain.to_string(),
270 self.max_concurrent_per_domain,
271 self.max_per_minute_per_domain,
272 ))
273 }
274
275 /// Record that a request is starting for a domain
276 pub fn start_request(&self, domain: &str) {
277 self.get_or_create(domain).start_request();
278 }
279
280 /// Record that a request completed for a domain
281 pub fn complete_request(&self, domain: &str) {
282 if let Some(mut throttle) = self.throttles.get_mut(domain) {
283 throttle.complete_request();
284 }
285 }
286
287 /// Add an identifier to a domain's waiting queue
288 pub fn enqueue_identifier(
289 &self,
290 domain: &str,
291 identifier: String,
292 tried_urls_for_domain: HashSet<String>,
293 ) {
294 self.get_or_create(domain)
295 .enqueue_identifier(identifier, tried_urls_for_domain);
296 }
297
298 /// Start the throttle loop for all domains (called once at startup)
299 pub fn start_domain_loops<C: SyncContext + 'static>(
300 self: Arc<Self>,
301 ctx: Arc<C>,
302 ) -> Vec<tokio::task::JoinHandle<()>> {
303 // Spawn a single coordinator loop that manages all domains
304 vec![tokio::spawn(async move {
305 let mut interval = tokio::time::interval(Duration::from_millis(100));
306
307 loop {
308 interval.tick().await;
309
310 // Check each domain for capacity and queued work
311 for mut entry in self.throttles.iter_mut() {
312 let domain = entry.key().clone();
313 let throttle = entry.value_mut();
314
315 if throttle.has_capacity() {
316 if let Some(identifier) = throttle.next_ready_identifier() {
317 let ctx = ctx.clone();
318 let manager = self.clone();
319 let domain_clone = domain.clone();
320
321 tokio::spawn(async move {
322 manager.process_queued_identifier(
323 &ctx,
324 &domain_clone,
325 &identifier,
326 ).await;
327 });
328 }
329 }
330 }
331 }
332 })]
333 }
334
335 /// Process a single identifier from a domain's queue
336 async fn process_queued_identifier<C: SyncContext>(
337 &self,
338 ctx: &C,
339 domain: &str,
340 identifier: &str,
341 ) {
342 // Get next URL for this identifier on this domain
343 let url = {
344 let throttle = match self.throttles.get(domain) {
345 Some(t) => t,
346 None => return,
347 };
348 let tried_urls = throttle.get_tried_urls(identifier);
349
350 sync_identifier_next_url(
351 ctx,
352 identifier,
353 Some(domain),
354 &tried_urls,
355 self,
356 ).await
357 };
358
359 match url {
360 Some(url) => {
361 // Fetch from this URL
362 sync_identifier_from_url(ctx, identifier, &url, self).await;
363
364 // Record URL as tried
365 if let Some(mut throttle) = self.throttles.get_mut(domain) {
366 throttle.mark_url_tried(identifier, url);
367 throttle.mark_identifier_not_in_progress(identifier);
368 }
369 }
370 None => {
371 // No more URLs for this identifier on this domain - remove from queue
372 if let Some(mut throttle) = self.throttles.get_mut(domain) {
373 throttle.remove_identifier(identifier);
374 }
375 }
376 }
377 }
378}
379```
380
381### DomainThrottle
382
383Per-domain rate limiting and waiting queue:
384
385```rust
386/// Per-domain rate limiting and identifier queue.
206/// 387///
207/// Rate limits: 5 concurrent requests, 30 requests/minute per domain 388/// Handles:
389/// - Rate limiting (concurrent requests, requests per minute)
390/// - Queue of identifiers waiting for capacity
391/// - Tracking tried URLs per identifier (for this domain only)
392/// - In-progress flag per identifier (prevent concurrent fetches for same identifier)
208pub struct DomainThrottle { 393pub struct DomainThrottle {
209 /// In-flight request count per domain 394 /// Domain this throttle manages
210 in_flight: DashMap<String, u32>, 395 domain: String,
211 396
212 /// Request timestamps per domain (sliding window) 397 /// Current in-flight request count
213 request_times: DashMap<String, VecDeque<Instant>>, 398 in_flight: u32,
214 399
215 /// Round-robin index per domain (for fair URL distribution) 400 /// Request timestamps (sliding window for rate limiting)
216 round_robin_index: DashMap<String, usize>, 401 request_times: VecDeque<Instant>,
217 402
403 /// Identifiers waiting for capacity on this domain
404 /// Stored in order for round-robin processing
405 waiting_queue: VecDeque<String>,
406
407 /// Per-identifier state for queued identifiers
408 identifier_state: HashMap<String, IdentifierQueueState>,
409
410 /// Configuration
218 max_concurrent: u32, 411 max_concurrent: u32,
219 max_per_minute: u32, 412 max_per_minute: u32,
220} 413}
221 414
415/// State for an identifier waiting in a domain's queue
416#[derive(Debug, Clone)]
417struct IdentifierQueueState {
418 /// URLs from this domain that have been tried
419 tried_urls: HashSet<String>,
420
421 /// Whether a fetch is currently in progress for this identifier on this domain
422 in_progress: bool,
423}
424
222impl DomainThrottle { 425impl DomainThrottle {
223 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { 426 pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self {
224 Self { 427 Self {
225 in_flight: DashMap::new(), 428 domain,
226 request_times: DashMap::new(), 429 in_flight: 0,
227 round_robin_index: DashMap::new(), 430 request_times: VecDeque::new(),
431 waiting_queue: VecDeque::new(),
432 identifier_state: HashMap::new(),
228 max_concurrent, 433 max_concurrent,
229 max_per_minute, 434 max_per_minute,
230 } 435 }
231 } 436 }
232 437
233 /// Check if domain has capacity for another request 438 /// Check if domain has capacity for another request
234 pub fn has_capacity(&self, domain: &str) -> bool { 439 pub fn has_capacity(&self) -> bool {
235 let in_flight = self.in_flight.get(domain).map_or(0, |v| *v); 440 if self.in_flight >= self.max_concurrent {
236 if in_flight >= self.max_concurrent {
237 return false; 441 return false;
238 } 442 }
239 443
240 let now = Instant::now(); 444 let now = Instant::now();
241 let window = Duration::from_secs(60); 445 let window = Duration::from_secs(60);
242 self.request_times 446 let recent_count = self.request_times
243 .get(domain)
244 .map_or(true, |times| {
245 times.iter().filter(|t| now.duration_since(**t) < window).count()
246 < self.max_per_minute as usize
247 })
248 }
249
250 /// Get next URL to try from available URLs, excluding already-tried URLs.
251 /// Uses round-robin to distribute load across URLs for a domain.
252 ///
253 /// Returns None if:
254 /// - Domain is at capacity (rate limited)
255 /// - All available URLs have been tried
256 pub fn get_next_url(
257 &self,
258 domain: &str,
259 available_urls: &[String],
260 tried_urls: &HashSet<String>,
261 ) -> Option<String> {
262 if !self.has_capacity(domain) {
263 return None;
264 }
265
266 // Filter to untried URLs
267 let untried: Vec<_> = available_urls
268 .iter() 447 .iter()
269 .filter(|url| !tried_urls.contains(*url)) 448 .filter(|t| now.duration_since(**t) < window)
270 .collect(); 449 .count();
271
272 if untried.is_empty() {
273 return None;
274 }
275 450
276 // Round-robin selection 451 recent_count < self.max_per_minute as usize
277 let mut index = self.round_robin_index.entry(domain.to_string()).or_insert(0);
278 let selected_index = *index % untried.len();
279 *index = (*index + 1) % untried.len();
280
281 Some(untried[selected_index].clone())
282 } 452 }
283 453
284 /// Record that a request is starting 454 /// Record that a request is starting
285 pub fn start_request(&self, domain: &str) { 455 pub fn start_request(&mut self) {
286 *self.in_flight.entry(domain.to_string()).or_insert(0) += 1; 456 self.in_flight += 1;
287 self.request_times 457 self.request_times.push_back(Instant::now());
288 .entry(domain.to_string())
289 .or_default()
290 .push_back(Instant::now());
291 } 458 }
292 459
293 /// Record that a request completed 460 /// Record that a request completed
294 pub fn complete_request(&self, domain: &str) { 461 pub fn complete_request(&mut self) {
295 if let Some(mut count) = self.in_flight.get_mut(domain) { 462 self.in_flight = self.in_flight.saturating_sub(1);
296 *count = count.saturating_sub(1);
297 }
298 463
299 // Clean old timestamps 464 // Clean old timestamps
300 let now = Instant::now(); 465 let now = Instant::now();
301 let window = Duration::from_secs(60); 466 let window = Duration::from_secs(60);
302 if let Some(mut times) = self.request_times.get_mut(domain) { 467 while self.request_times.front().map_or(false, |t| now.duration_since(*t) >= window) {
303 while times.front().map_or(false, |t| now.duration_since(*t) >= window) { 468 self.request_times.pop_front();
304 times.pop_front();
305 }
306 } 469 }
307 } 470 }
308 471
309 /// Get time until domain has capacity (for scheduling retries) 472 /// Add an identifier to the waiting queue
310 pub fn time_until_capacity(&self, domain: &str) -> Option<Duration> { 473 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) {
311 // Check concurrent limit first 474 if !self.identifier_state.contains_key(&identifier) {
312 let in_flight = self.in_flight.get(domain).map_or(0, |v| *v); 475 self.waiting_queue.push_back(identifier.clone());
313 if in_flight >= self.max_concurrent {
314 // Can't predict when a request will complete
315 return Some(Duration::from_millis(100));
316 } 476 }
317 477 // Update or insert state (merge tried_urls if already exists)
318 // Check rate limit 478 self.identifier_state
319 let now = Instant::now(); 479 .entry(identifier)
320 let window = Duration::from_secs(60); 480 .and_modify(|state| {
321 if let Some(times) = self.request_times.get(domain) { 481 state.tried_urls.extend(tried_urls.iter().cloned());
322 let recent_count = times.iter().filter(|t| now.duration_since(**t) < window).count(); 482 })
323 if recent_count >= self.max_per_minute as usize { 483 .or_insert(IdentifierQueueState {
324 // Find oldest request in window, wait until it expires 484 tried_urls,
325 if let Some(oldest) = times.front() { 485 in_progress: false,
326 let age = now.duration_since(*oldest); 486 });
327 if age < window { 487 }
328 return Some(window - age); 488
489 /// Get next identifier ready for processing (round-robin, not in_progress)
490 pub fn next_ready_identifier(&mut self) -> Option<String> {
491 // Find first identifier that's not in_progress
492 for _ in 0..self.waiting_queue.len() {
493 if let Some(identifier) = self.waiting_queue.pop_front() {
494 if let Some(state) = self.identifier_state.get_mut(&identifier) {
495 if !state.in_progress {
496 state.in_progress = true;
497 self.waiting_queue.push_back(identifier.clone()); // Re-add for round-robin
498 return Some(identifier);
329 } 499 }
330 } 500 }
501 // Put it back if in_progress
502 self.waiting_queue.push_back(identifier);
331 } 503 }
332 } 504 }
333 505 None
334 None // Has capacity now 506 }
507
508 /// Get tried URLs for an identifier
509 pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> {
510 self.identifier_state
511 .get(identifier)
512 .map(|s| s.tried_urls.clone())
513 .unwrap_or_default()
514 }
515
516 /// Mark a URL as tried for an identifier
517 pub fn mark_url_tried(&mut self, identifier: &str, url: String) {
518 if let Some(state) = self.identifier_state.get_mut(identifier) {
519 state.tried_urls.insert(url);
520 }
521 }
522
523 /// Mark identifier as not in progress (fetch completed)
524 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) {
525 if let Some(state) = self.identifier_state.get_mut(identifier) {
526 state.in_progress = false;
527 }
528 }
529
530 /// Remove an identifier from the queue entirely
531 pub fn remove_identifier(&mut self, identifier: &str) {
532 self.identifier_state.remove(identifier);
533 self.waiting_queue.retain(|id| id != identifier);
335 } 534 }
336} 535}
337``` 536```
@@ -378,58 +577,58 @@ pub trait SyncContext: Send + Sync {
378 577
379## Core Sync Logic 578## Core Sync Logic
380 579
381### The Sync Step Function 580### Two-Function Design
382 581
383This is the key abstraction that enables clean testing: 582The sync logic is split into two functions that can be called by either the main sync loop or by DomainThrottle:
384 583
385```rust 5841. **`sync_identifier_next_url`**: Pure selection logic - finds next URL to try
386/// Result of a single sync step 5852. **`sync_identifier_from_url`**: Pure fetch logic - fetches from a specific URL
387#[derive(Debug, Clone, PartialEq)] 586
388pub enum SyncStepResult { 587This separation enables:
389 /// Successfully tried a URL, may or may not have fetched OIDs 588- Main sync loop to try non-throttled URLs immediately
390 TriedUrl { url: String, oids_fetched: usize }, 589- DomainThrottle to process queued identifiers when capacity frees
391 590- Clean testability with mocked SyncContext
392 /// All available URLs have been tried, need to wait for throttle
393 AllUrlsThrottled { wait_duration: Duration },
394
395 /// No more URLs to try (all exhausted)
396 NoMoreUrls,
397
398 /// All OIDs are now available, sync complete
399 Complete,
400
401 /// No pending events remain
402 NoPendingEvents,
403}
404 591
405/// Execute one step of the sync process. 592### sync_identifier_next_url
593
594```rust
595/// Find the next URL to try for an identifier.
406/// 596///
407/// This function is pure logic - all I/O goes through the SyncContext trait. 597/// When `domain` is None: returns any non-throttled URL not in tried_urls
408/// This makes it trivially unit testable. 598/// When `domain` is Some: returns a URL from that specific domain not in tried_urls
409pub async fn sync_identifier_step<C: SyncContext>( 599///
600/// Returns None if:
601/// - No pending events for this identifier
602/// - No OIDs needed (sync complete)
603/// - No untried URLs available (for the specified domain or all domains)
604/// - All available domains are throttled (when domain is None)
605pub async fn sync_identifier_next_url<C: SyncContext>(
410 ctx: &C, 606 ctx: &C,
411 identifier: &str, 607 identifier: &str,
608 domain: Option<&str>,
412 tried_urls: &HashSet<String>, 609 tried_urls: &HashSet<String>,
413 throttle: &DomainThrottle, 610 throttle_manager: &ThrottleManager,
414) -> Result<SyncStepResult> { 611) -> Option<String> {
415 // 1. Check if we still have pending events 612 // 1. Check if we still have pending events
416 if !ctx.has_pending_events(identifier) { 613 if !ctx.has_pending_events(identifier) {
417 return Ok(SyncStepResult::NoPendingEvents); 614 return None;
418 } 615 }
419 616
420 // 2. Collect needed OIDs (fresh each step - may have changed) 617 // 2. Collect needed OIDs
421 let needed_oids = ctx.collect_needed_oids(identifier); 618 let needed_oids = ctx.collect_needed_oids(identifier);
422 if needed_oids.is_empty() { 619 if needed_oids.is_empty() {
423 // No OIDs needed - try to process events 620 // No OIDs needed - sync is complete
424 ctx.process_satisfiable_events(identifier).await?; 621 return None;
425 return Ok(SyncStepResult::Complete);
426 } 622 }
427 623
428 // 3. Get repository data (fresh each step - announcements may have arrived) 624 // 3. Get repository data
429 let db_repo_data = ctx.fetch_repository_data(identifier).await?; 625 let repo_data = match ctx.fetch_repository_data(identifier).await {
626 Ok(data) => data,
627 Err(_) => return None,
628 };
430 629
431 // 4. Collect clone URLs, excluding our domain 630 // 4. Collect clone URLs, excluding our domain
432 let all_urls: Vec<String> = db_repo_data 631 let all_urls: Vec<String> = repo_data
433 .announcements 632 .announcements
434 .iter() 633 .iter()
435 .flat_map(|a| a.clone_urls.iter().cloned()) 634 .flat_map(|a| a.clone_urls.iter().cloned())
@@ -438,132 +637,292 @@ pub async fn sync_identifier_step<C: SyncContext>(
438 .into_iter() 637 .into_iter()
439 .collect(); 638 .collect();
440 639
441 if all_urls.is_empty() { 640 // 5. Group by domain
442 return Ok(SyncStepResult::NoMoreUrls); 641 let urls_by_domain: HashMap<String, Vec<String>> = all_urls
642 .iter()
643 .fold(HashMap::new(), |mut acc, url| {
644 if let Some(d) = extract_domain(url) {
645 acc.entry(d).or_default().push(url.clone());
646 }
647 acc
648 });
649
650 // 6. Find an available URL
651 match domain {
652 Some(specific_domain) => {
653 // Only look at URLs from this specific domain
654 urls_by_domain
655 .get(specific_domain)
656 .and_then(|urls| {
657 urls.iter()
658 .find(|url| !tried_urls.contains(*url))
659 .cloned()
660 })
661 }
662 None => {
663 // Try any non-throttled domain
664 for (d, domain_urls) in &urls_by_domain {
665 if throttle_manager.is_throttled(d) {
666 continue;
667 }
668 if let Some(url) = domain_urls.iter().find(|url| !tried_urls.contains(*url)) {
669 return Some(url.clone());
670 }
671 }
672 None
673 }
443 } 674 }
675}
676
677/// Information about throttled domains with untried URLs
678#[derive(Debug, Clone)]
679pub struct ThrottledDomainInfo {
680 pub domain: String,
681 pub tried_urls_for_domain: HashSet<String>,
682}
683
684/// Get information about throttled domains that have untried URLs.
685///
686/// Called by main sync loop to know which DomainThrottle queues to add the identifier to.
687pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>(
688 ctx: &C,
689 identifier: &str,
690 tried_urls: &HashSet<String>,
691 throttle_manager: &ThrottleManager,
692) -> Vec<ThrottledDomainInfo> {
693 let repo_data = match ctx.fetch_repository_data(identifier).await {
694 Ok(data) => data,
695 Err(_) => return vec![],
696 };
697
698 let all_urls: Vec<String> = repo_data
699 .announcements
700 .iter()
701 .flat_map(|a| a.clone_urls.iter().cloned())
702 .filter(|url| ctx.our_domain().map_or(true, |d| !url.contains(d)))
703 .collect::<HashSet<_>>()
704 .into_iter()
705 .collect();
444 706
445 // 5. Group by domain and find an available URL
446 let urls_by_domain: HashMap<String, Vec<String>> = all_urls 707 let urls_by_domain: HashMap<String, Vec<String>> = all_urls
447 .iter() 708 .iter()
448 .fold(HashMap::new(), |mut acc, url| { 709 .fold(HashMap::new(), |mut acc, url| {
449 acc.entry(extract_domain(url)).or_default().push(url.clone()); 710 if let Some(d) = extract_domain(url) {
711 acc.entry(d).or_default().push(url.clone());
712 }
450 acc 713 acc
451 }); 714 });
452 715
453 // 6. Try to get a URL from any domain that has capacity 716 urls_by_domain
454 let mut min_wait: Option<Duration> = None; 717 .into_iter()
455 718 .filter_map(|(domain, domain_urls)| {
456 for (domain, domain_urls) in &urls_by_domain { 719 if !throttle_manager.is_throttled(&domain) {
457 if let Some(url) = throttle.get_next_url(domain, domain_urls, tried_urls) { 720 return None; // Not throttled, skip
458 // Found a URL to try! 721 }
459 let target_repo = match ctx.find_target_repo(&db_repo_data) {
460 Some(path) => path,
461 None => return Ok(SyncStepResult::NoMoreUrls),
462 };
463
464 // Start the fetch
465 throttle.start_request(domain);
466 let oids_to_fetch: Vec<String> = needed_oids.iter().cloned().collect();
467 let fetch_result = ctx.fetch_oids(&target_repo, &url, &oids_to_fetch).await;
468 throttle.complete_request(domain);
469 722
470 let oids_fetched = match fetch_result { 723 let untried: Vec<_> = domain_urls
471 Ok(fetched) => fetched.len(), 724 .iter()
472 Err(e) => { 725 .filter(|url| !tried_urls.contains(*url))
473 tracing::debug!(url = %url, error = %e, "Fetch failed"); 726 .collect();
474 0
475 }
476 };
477 727
478 // Try to process any events that can now be satisfied 728 if untried.is_empty() {
479 if oids_fetched > 0 { 729 return None; // All URLs tried for this domain
480 let _ = ctx.process_satisfiable_events(identifier).await;
481 } 730 }
482 731
483 return Ok(SyncStepResult::TriedUrl { url, oids_fetched }); 732 // Collect tried URLs that belong to this domain
484 } else { 733 let tried_urls_for_domain: HashSet<String> = tried_urls
485 // Domain throttled or all URLs tried 734 .iter()
486 let untried_exist = domain_urls.iter().any(|u| !tried_urls.contains(u)); 735 .filter(|url| extract_domain(url).as_deref() == Some(&domain))
487 if untried_exist { 736 .cloned()
488 // URLs exist but domain is throttled 737 .collect();
489 if let Some(wait) = throttle.time_until_capacity(domain) { 738
490 min_wait = Some(min_wait.map_or(wait, |m| m.min(wait))); 739 Some(ThrottledDomainInfo {
491 } 740 domain,
492 } 741 tried_urls_for_domain,
742 })
743 })
744 .collect()
745}
746```
747
748### sync_identifier_from_url
749
750```rust
751/// Fetch git data from a specific URL for an identifier.
752///
753/// This function:
754/// 1. Records the request with the throttle manager
755/// 2. Performs the actual git fetch
756/// 3. Processes any events that can now be satisfied
757/// 4. Records request completion
758///
759/// Returns the number of OIDs successfully fetched.
760pub async fn sync_identifier_from_url<C: SyncContext>(
761 ctx: &C,
762 identifier: &str,
763 url: &str,
764 throttle_manager: &ThrottleManager,
765) -> usize {
766 let domain = match extract_domain(url) {
767 Some(d) => d,
768 None => return 0,
769 };
770
771 // Get repository data for target repo path
772 let repo_data = match ctx.fetch_repository_data(identifier).await {
773 Ok(data) => data,
774 Err(e) => {
775 tracing::debug!(identifier = %identifier, error = %e, "Failed to fetch repo data");
776 return 0;
493 } 777 }
778 };
779
780 let target_repo = match ctx.find_target_repo(&repo_data) {
781 Some(path) => path,
782 None => {
783 tracing::debug!(identifier = %identifier, "No target repo found");
784 return 0;
785 }
786 };
787
788 // Collect needed OIDs
789 let needed_oids: Vec<String> = ctx.collect_needed_oids(identifier).into_iter().collect();
790 if needed_oids.is_empty() {
791 return 0;
494 } 792 }
495 793
496 // Check if all URLs have been tried 794 // Perform the fetch
497 let all_tried = all_urls.iter().all(|url| tried_urls.contains(url)); 795 throttle_manager.start_request(&domain);
498 if all_tried { 796 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await;
499 return Ok(SyncStepResult::NoMoreUrls); 797 throttle_manager.complete_request(&domain);
798
799 let oids_fetched = match fetch_result {
800 Ok(fetched) => {
801 tracing::debug!(
802 identifier = %identifier,
803 url = %url,
804 oids_fetched = fetched.len(),
805 "Fetch succeeded"
806 );
807 fetched.len()
808 }
809 Err(e) => {
810 tracing::debug!(
811 identifier = %identifier,
812 url = %url,
813 error = %e,
814 "Fetch failed"
815 );
816 0
817 }
818 };
819
820 // Try to process any events that can now be satisfied
821 if oids_fetched > 0 {
822 if let Err(e) = ctx.process_satisfiable_events(identifier).await {
823 tracing::warn!(
824 identifier = %identifier,
825 error = %e,
826 "Failed to process satisfiable events"
827 );
828 }
500 } 829 }
501 830
502 // Some URLs exist but all domains are throttled 831 oids_fetched
503 Ok(SyncStepResult::AllUrlsThrottled {
504 wait_duration: min_wait.unwrap_or(Duration::from_millis(100)),
505 })
506} 832}
507``` 833```
508 834
509### The Sync Identifier Loop 835### The Sync Identifier Loop (Main Sync)
510 836
511```rust 837```rust
512/// Sync git data for an identifier. 838/// Sync git data for an identifier.
513/// 839///
514/// Returns true if sync completed successfully (no more pending events), 840/// This is called by the main sync loop. It:
515/// false if we exhausted all options but events remain. 841/// 1. Tries all non-throttled URLs
842/// 2. Enqueues with throttled domains for later processing
843/// 3. Returns without waiting for throttled domains
844///
845/// Returns true if sync completed (no pending events or no OIDs needed),
846/// false if events remain (will be retried after backoff).
516pub async fn sync_identifier<C: SyncContext>( 847pub async fn sync_identifier<C: SyncContext>(
517 ctx: &C, 848 ctx: &C,
518 identifier: &str, 849 identifier: &str,
519 throttle: &DomainThrottle, 850 throttle_manager: &ThrottleManager,
520) -> bool { 851) -> bool {
521 let mut tried_urls: HashSet<String> = HashSet::new(); 852 let mut tried_urls: HashSet<String> = HashSet::new();
522 853
854 // Try all non-throttled URLs
523 loop { 855 loop {
524 match sync_identifier_step(ctx, identifier, &tried_urls, throttle).await { 856 match sync_identifier_next_url(
525 Ok(SyncStepResult::TriedUrl { url, oids_fetched }) => { 857 ctx,
526 tried_urls.insert(url.clone()); 858 identifier,
527 tracing::debug!( 859 None, // Any domain
528 identifier = %identifier, 860 &tried_urls,
529 url = %url, 861 throttle_manager,
530 oids_fetched = oids_fetched, 862 ).await {
531 "Tried URL" 863 Some(url) => {
532 ); 864 // Found a non-throttled URL to try
533 // Continue looping 865 sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await;
534 } 866 tried_urls.insert(url);
535 867
536 Ok(SyncStepResult::AllUrlsThrottled { wait_duration }) => { 868 // Check if sync is now complete
537 tracing::debug!( 869 if !ctx.has_pending_events(identifier) {
538 identifier = %identifier, 870 tracing::info!(identifier = %identifier, "Sync complete - no pending events");
539 wait_ms = wait_duration.as_millis(), 871 return true;
540 "All domains throttled, waiting" 872 }
541 ); 873
542 tokio::time::sleep(wait_duration).await; 874 let needed_oids = ctx.collect_needed_oids(identifier);
543 // Continue looping 875 if needed_oids.is_empty() {
544 } 876 // Process any remaining satisfiable events
545 877 let _ = ctx.process_satisfiable_events(identifier).await;
546 Ok(SyncStepResult::NoMoreUrls) => { 878 tracing::info!(identifier = %identifier, "Sync complete - all OIDs available");
547 tracing::debug!(identifier = %identifier, "No more URLs to try"); 879 return true;
548 return false; // Events remain but no URLs left 880 }
549 } 881
550 882 // Continue trying more URLs
551 Ok(SyncStepResult::Complete) => {
552 tracing::info!(identifier = %identifier, "Sync complete");
553 return true;
554 }
555
556 Ok(SyncStepResult::NoPendingEvents) => {
557 tracing::debug!(identifier = %identifier, "No pending events");
558 return true;
559 } 883 }
560 884 None => {
561 Err(e) => { 885 // No more non-throttled URLs available
562 tracing::warn!(identifier = %identifier, error = %e, "Sync step error"); 886 break;
563 return false;
564 } 887 }
565 } 888 }
566 } 889 }
890
891 // Check if we're done (no pending events or no needed OIDs)
892 if !ctx.has_pending_events(identifier) {
893 return true;
894 }
895
896 let needed_oids = ctx.collect_needed_oids(identifier);
897 if needed_oids.is_empty() {
898 let _ = ctx.process_satisfiable_events(identifier).await;
899 return true;
900 }
901
902 // Enqueue with any throttled domains that have untried URLs
903 let throttled_domains = get_throttled_domains_with_untried_urls(
904 ctx,
905 identifier,
906 &tried_urls,
907 throttle_manager,
908 ).await;
909
910 for info in throttled_domains {
911 tracing::debug!(
912 identifier = %identifier,
913 domain = %info.domain,
914 "Enqueueing with throttled domain"
915 );
916 throttle_manager.enqueue_identifier(
917 &info.domain,
918 identifier.to_string(),
919 info.tried_urls_for_domain,
920 );
921 }
922
923 // Return false - events remain, will retry after backoff
924 // (throttled domains will process independently)
925 false
567} 926}
568``` 927```
569 928
@@ -576,7 +935,7 @@ impl Purgatory {
576 database: SharedDatabase, 935 database: SharedDatabase,
577 our_domain: Option<String>, 936 our_domain: Option<String>,
578 local_relay: Option<nostr_relay_builder::LocalRelay>, 937 local_relay: Option<nostr_relay_builder::LocalRelay>,
579 throttle: Arc<DomainThrottle>, 938 throttle_manager: Arc<ThrottleManager>,
580 ) -> tokio::task::JoinHandle<()> { 939 ) -> tokio::task::JoinHandle<()> {
581 tokio::spawn(async move { 940 tokio::spawn(async move {
582 let mut interval = tokio::time::interval(Duration::from_secs(1)); 941 let mut interval = tokio::time::interval(Duration::from_secs(1));
@@ -613,7 +972,7 @@ impl Purgatory {
613 let db = database.clone(); 972 let db = database.clone();
614 let domain = our_domain.clone(); 973 let domain = our_domain.clone();
615 let relay = local_relay.clone(); 974 let relay = local_relay.clone();
616 let throttle = throttle.clone(); 975 let throttle_manager = throttle_manager.clone();
617 let id = identifier.clone(); 976 let id = identifier.clone();
618 977
619 tokio::spawn(async move { 978 tokio::spawn(async move {
@@ -625,13 +984,14 @@ impl Purgatory {
625 relay, 984 relay,
626 ); 985 );
627 986
628 let complete = sync_identifier(&ctx, &id, &throttle).await; 987 let complete = sync_identifier(&ctx, &id, &throttle_manager).await;
629 988
630 if complete || !purgatory.has_pending_events(&id) { 989 if complete || !purgatory.has_pending_events(&id) {
631 purgatory.sync_queue.remove(&id); 990 purgatory.sync_queue.remove(&id);
632 tracing::info!(identifier = %id, "Removed from sync queue"); 991 tracing::info!(identifier = %id, "Removed from sync queue");
633 } else { 992 } else {
634 // Apply backoff 993 // Apply backoff - will retry later
994 // (throttled domains are being processed independently)
635 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { 995 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) {
636 entry.on_sync_complete(); 996 entry.on_sync_complete();
637 } 997 }
@@ -667,7 +1027,6 @@ mod tests {
667 #[async_trait] 1027 #[async_trait]
668 impl SyncContext for MockSyncContext { 1028 impl SyncContext for MockSyncContext {
669 async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> { 1029 async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> {
670 // Return mock data with our available_urls
671 Ok(RepositoryData { 1030 Ok(RepositoryData {
672 announcements: vec![MockAnnouncement { 1031 announcements: vec![MockAnnouncement {
673 clone_urls: self.available_urls.clone(), 1032 clone_urls: self.available_urls.clone(),
@@ -686,7 +1045,6 @@ mod tests {
686 } 1045 }
687 1046
688 async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> { 1047 async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> {
689 // Return pre-configured fetch result for this URL
690 Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) 1048 Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default())
691 } 1049 }
692 1050
@@ -709,67 +1067,102 @@ mod tests {
709 } 1067 }
710 1068
711 #[tokio::test] 1069 #[tokio::test]
712 async fn test_sync_step_no_pending_events() { 1070 async fn test_next_url_no_pending_events() {
713 let ctx = MockSyncContext { 1071 let ctx = MockSyncContext {
714 pending_events: RefCell::new(false), 1072 pending_events: RefCell::new(false),
1073 needed_oids: RefCell::new(HashSet::new()),
1074 available_urls: vec!["https://example.com/repo.git".to_string()],
715 ..Default::default() 1075 ..Default::default()
716 }; 1076 };
717 let throttle = DomainThrottle::new(5, 30); 1077 let throttle_manager = ThrottleManager::new(5, 30);
718 let tried = HashSet::new(); 1078 let tried = HashSet::new();
719 1079
720 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); 1080 let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await;
721 assert_eq!(result, SyncStepResult::NoPendingEvents); 1081 assert!(result.is_none());
722 } 1082 }
723 1083
724 #[tokio::test] 1084 #[tokio::test]
725 async fn test_sync_step_no_oids_needed() { 1085 async fn test_next_url_no_oids_needed() {
726 let ctx = MockSyncContext { 1086 let ctx = MockSyncContext {
727 pending_events: RefCell::new(true), 1087 pending_events: RefCell::new(true),
728 needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed 1088 needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed
1089 available_urls: vec!["https://example.com/repo.git".to_string()],
729 ..Default::default() 1090 ..Default::default()
730 }; 1091 };
731 let throttle = DomainThrottle::new(5, 30); 1092 let throttle_manager = ThrottleManager::new(5, 30);
732 let tried = HashSet::new(); 1093 let tried = HashSet::new();
733 1094
734 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); 1095 let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await;
735 assert_eq!(result, SyncStepResult::Complete); 1096 assert!(result.is_none()); // No URL needed, sync is complete
736 assert_eq!(*ctx.processed_count.borrow(), 1);
737 } 1097 }
738 1098
739 #[tokio::test] 1099 #[tokio::test]
740 async fn test_sync_step_tries_url() { 1100 async fn test_next_url_returns_non_throttled() {
741 let mut needed = HashSet::new(); 1101 let mut needed = HashSet::new();
742 needed.insert("abc123".to_string()); 1102 needed.insert("abc123".to_string());
743 1103
744 let mut fetch_results = HashMap::new();
745 fetch_results.insert(
746 "https://example.com/repo.git".to_string(),
747 vec!["abc123".to_string()],
748 );
749
750 let ctx = MockSyncContext { 1104 let ctx = MockSyncContext {
751 pending_events: RefCell::new(true), 1105 pending_events: RefCell::new(true),
752 needed_oids: RefCell::new(needed), 1106 needed_oids: RefCell::new(needed),
753 available_urls: vec!["https://example.com/repo.git".to_string()], 1107 available_urls: vec!["https://example.com/repo.git".to_string()],
754 fetch_results: RefCell::new(fetch_results), 1108 ..Default::default()
755 processed_count: RefCell::new(0),
756 }; 1109 };
757 let throttle = DomainThrottle::new(5, 30); 1110 let throttle_manager = ThrottleManager::new(5, 30);
758 let tried = HashSet::new(); 1111 let tried = HashSet::new();
759 1112
760 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); 1113 let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await;
1114 assert_eq!(result, Some("https://example.com/repo.git".to_string()));
1115 }
1116
1117 #[tokio::test]
1118 async fn test_next_url_skips_tried() {
1119 let mut needed = HashSet::new();
1120 needed.insert("abc123".to_string());
761 1121
762 match result { 1122 let ctx = MockSyncContext {
763 SyncStepResult::TriedUrl { url, oids_fetched } => { 1123 pending_events: RefCell::new(true),
764 assert_eq!(url, "https://example.com/repo.git"); 1124 needed_oids: RefCell::new(needed),
765 assert_eq!(oids_fetched, 1); 1125 available_urls: vec![
766 } 1126 "https://example.com/repo.git".to_string(),
767 _ => panic!("Expected TriedUrl, got {:?}", result), 1127 "https://other.com/repo.git".to_string(),
768 } 1128 ],
1129 ..Default::default()
1130 };
1131 let throttle_manager = ThrottleManager::new(5, 30);
1132
1133 let mut tried = HashSet::new();
1134 tried.insert("https://example.com/repo.git".to_string());
1135
1136 let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await;
1137 assert_eq!(result, Some("https://other.com/repo.git".to_string()));
1138 }
1139
1140 #[tokio::test]
1141 async fn test_next_url_specific_domain() {
1142 let mut needed = HashSet::new();
1143 needed.insert("abc123".to_string());
1144
1145 let ctx = MockSyncContext {
1146 pending_events: RefCell::new(true),
1147 needed_oids: RefCell::new(needed),
1148 available_urls: vec![
1149 "https://example.com/repo.git".to_string(),
1150 "https://other.com/repo.git".to_string(),
1151 ],
1152 ..Default::default()
1153 };
1154 let throttle_manager = ThrottleManager::new(5, 30);
1155 let tried = HashSet::new();
1156
1157 // Request specific domain
1158 let result = sync_identifier_next_url(
1159 &ctx, "test", Some("other.com"), &tried, &throttle_manager
1160 ).await;
1161 assert_eq!(result, Some("https://other.com/repo.git".to_string()));
769 } 1162 }
770 1163
771 #[tokio::test] 1164 #[tokio::test]
772 async fn test_sync_step_all_urls_tried() { 1165 async fn test_next_url_none_when_all_tried() {
773 let mut needed = HashSet::new(); 1166 let mut needed = HashSet::new();
774 needed.insert("abc123".to_string()); 1167 needed.insert("abc123".to_string());
775 1168
@@ -777,60 +1170,58 @@ mod tests {
777 pending_events: RefCell::new(true), 1170 pending_events: RefCell::new(true),
778 needed_oids: RefCell::new(needed), 1171 needed_oids: RefCell::new(needed),
779 available_urls: vec!["https://example.com/repo.git".to_string()], 1172 available_urls: vec!["https://example.com/repo.git".to_string()],
780 fetch_results: RefCell::new(HashMap::new()), 1173 ..Default::default()
781 processed_count: RefCell::new(0),
782 }; 1174 };
783 let throttle = DomainThrottle::new(5, 30); 1175 let throttle_manager = ThrottleManager::new(5, 30);
784 1176
785 // Mark the only URL as tried
786 let mut tried = HashSet::new(); 1177 let mut tried = HashSet::new();
787 tried.insert("https://example.com/repo.git".to_string()); 1178 tried.insert("https://example.com/repo.git".to_string());
788 1179
789 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); 1180 let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await;
790 assert_eq!(result, SyncStepResult::NoMoreUrls); 1181 assert!(result.is_none());
791 } 1182 }
792 1183
793 #[tokio::test] 1184 #[tokio::test]
794 async fn test_sync_step_domain_throttled() { 1185 async fn test_from_url_fetches_and_processes() {
795 let mut needed = HashSet::new(); 1186 let mut needed = HashSet::new();
796 needed.insert("abc123".to_string()); 1187 needed.insert("abc123".to_string());
797 1188
1189 let mut fetch_results = HashMap::new();
1190 fetch_results.insert(
1191 "https://example.com/repo.git".to_string(),
1192 vec!["abc123".to_string()],
1193 );
1194
798 let ctx = MockSyncContext { 1195 let ctx = MockSyncContext {
799 pending_events: RefCell::new(true), 1196 pending_events: RefCell::new(true),
800 needed_oids: RefCell::new(needed), 1197 needed_oids: RefCell::new(needed),
801 available_urls: vec!["https://example.com/repo.git".to_string()], 1198 available_urls: vec!["https://example.com/repo.git".to_string()],
802 fetch_results: RefCell::new(HashMap::new()), 1199 fetch_results: RefCell::new(fetch_results),
803 processed_count: RefCell::new(0), 1200 processed_count: RefCell::new(0),
804 }; 1201 };
1202 let throttle_manager = Arc::new(ThrottleManager::new(5, 30));
805 1203
806 // Create throttle with 0 concurrent limit 1204 let oids_fetched = sync_identifier_from_url(
807 let throttle = DomainThrottle::new(0, 30); 1205 &ctx, "test", "https://example.com/repo.git", &throttle_manager
808 let tried = HashSet::new(); 1206 ).await;
809
810 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap();
811 1207
812 match result { 1208 assert_eq!(oids_fetched, 1);
813 SyncStepResult::AllUrlsThrottled { .. } => {} 1209 assert_eq!(*ctx.processed_count.borrow(), 1);
814 _ => panic!("Expected AllUrlsThrottled, got {:?}", result),
815 }
816 } 1210 }
817 1211
818 #[tokio::test] 1212 #[tokio::test]
819 async fn test_full_sync_loop() { 1213 async fn test_full_sync_with_throttled_domains() {
820 let mut needed = HashSet::new(); 1214 let mut needed = HashSet::new();
821 needed.insert("abc123".to_string()); 1215 needed.insert("abc123".to_string());
822 needed.insert("def456".to_string());
823 1216
824 let mut fetch_results = HashMap::new(); 1217 let mut fetch_results = HashMap::new();
825 // First URL returns one OID
826 fetch_results.insert( 1218 fetch_results.insert(
827 "https://server1.com/repo.git".to_string(), 1219 "https://server1.com/repo.git".to_string(),
828 vec!["abc123".to_string()], 1220 vec![], // First server doesn't have the OID
829 ); 1221 );
830 // Second URL returns the other
831 fetch_results.insert( 1222 fetch_results.insert(
832 "https://server2.com/repo.git".to_string(), 1223 "https://server2.com/repo.git".to_string(),
833 vec!["def456".to_string()], 1224 vec!["abc123".to_string()], // Second server has it
834 ); 1225 );
835 1226
836 let ctx = MockSyncContext { 1227 let ctx = MockSyncContext {
@@ -844,15 +1235,63 @@ mod tests {
844 processed_count: RefCell::new(0), 1235 processed_count: RefCell::new(0),
845 }; 1236 };
846 1237
847 // Simulate OIDs being removed as they're fetched 1238 let throttle_manager = Arc::new(ThrottleManager::new(5, 30));
848 // (In real code, collect_needed_oids would return fewer OIDs) 1239
1240 // Manually throttle server2.com to test enqueueing
1241 // (In real code, this would happen due to rate limits)
1242 // For this test, we just verify the sync tries available URLs
849 1243
850 let throttle = DomainThrottle::new(5, 30); 1244 let complete = sync_identifier(&ctx, "test", &throttle_manager).await;
851 let complete = sync_identifier(&ctx, "test", &throttle).await;
852 1245
853 // Should have tried both URLs 1246 // Should have processed events (found OID from server2)
854 assert!(*ctx.processed_count.borrow() >= 1); 1247 assert!(*ctx.processed_count.borrow() >= 1);
855 } 1248 }
1249
1250 #[tokio::test]
1251 async fn test_domain_throttle_queue_round_robin() {
1252 let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30);
1253
1254 // Enqueue three identifiers
1255 throttle.enqueue_identifier("id1".to_string(), HashSet::new());
1256 throttle.enqueue_identifier("id2".to_string(), HashSet::new());
1257 throttle.enqueue_identifier("id3".to_string(), HashSet::new());
1258
1259 // Should get them in round-robin order
1260 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string()));
1261 throttle.mark_identifier_not_in_progress("id1");
1262
1263 assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string()));
1264 throttle.mark_identifier_not_in_progress("id2");
1265
1266 assert_eq!(throttle.next_ready_identifier(), Some("id3".to_string()));
1267 throttle.mark_identifier_not_in_progress("id3");
1268
1269 // Back to id1
1270 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string()));
1271 }
1272
1273 #[tokio::test]
1274 async fn test_domain_throttle_skips_in_progress() {
1275 let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30);
1276
1277 throttle.enqueue_identifier("id1".to_string(), HashSet::new());
1278 throttle.enqueue_identifier("id2".to_string(), HashSet::new());
1279
1280 // Get id1 (marks it in_progress)
1281 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string()));
1282
1283 // Next should skip id1 and return id2
1284 assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string()));
1285
1286 // Both in progress, should return None
1287 assert_eq!(throttle.next_ready_identifier(), None);
1288
1289 // Mark id1 not in progress
1290 throttle.mark_identifier_not_in_progress("id1");
1291
1292 // Now id1 should be available again
1293 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string()));
1294 }
856} 1295}
857``` 1296```
858 1297
@@ -866,11 +1305,12 @@ mod tests {
866 1305
867## Migration Path 1306## Migration Path
868 1307
8691. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle, SyncContext trait) 13081. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait)
8702. **Phase 2**: Implement `sync_identifier_step` with unit tests 13092. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests
8713. **Phase 3**: Implement main sync loop alongside existing `start_state_sync` 13103. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync`
8724. **Phase 4**: Add PR event syncing 13114. **Phase 4**: Implement ThrottleManager domain loops
8735. **Phase 5**: Remove old `start_state_sync` code 13125. **Phase 5**: Add PR event syncing
13136. **Phase 6**: Remove old `start_state_sync` code
874 1314
875## Configuration 1315## Configuration
876 1316