From 33b716cf9eb88d95394423b77b779009056d4d5a Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 6 Jan 2026 17:15:16 +0000 Subject: docs: purgatory design improvements --- docs/explanation/purgatory-sync-redesign.md | 299 ++++++++++++++++++---------- 1 file changed, 194 insertions(+), 105 deletions(-) (limited to 'docs') diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 7501da2..382f683 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md @@ -94,19 +94,21 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ DomainThrottle (per domain) │ │ │ │ │ │ │ │ │ │ │ │ -│ │ │ │ Rate limiting: │ Waiting queue: │ │ │ │ -│ │ │ │ - in_flight: u32 │ - waiting_queue: VecDeque │ │ │ │ -│ │ │ │ - request_times │ - identifier_state: HashMap │ │ │ │ -│ │ │ │ │ - tried_urls (this domain)│ │ │ │ -│ │ │ │ │ - in_progress │ │ │ │ +│ │ │ │ Rate limiting: │ Queue (IndexMap for ordering): │ │ │ │ +│ │ │ │ - in_flight: u32 │ - queue: IndexMap │ │ │ │ +│ │ │ │ - request_times │ - State: tried_urls, │ │ │ │ +│ │ │ │ - round_robin_index │ in_progress │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ │ -│ │ │ Domain Loop (per domain, runs independently): │ │ │ -│ │ │ 1. Wait for capacity │ │ │ -│ │ │ 2. Pick next identifier (round-robin, not in_progress) │ │ │ -│ │ │ 3. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │ -│ │ │ 4. If url: sync_identifier_from_url(url), mark tried │ │ │ -│ │ │ Else: remove identifier from queue │ │ │ +│ │ │ Trigger-based processing (no polling loop): │ │ │ +│ │ │ - enqueue_identifier() triggers if capacity available │ │ │ +│ │ │ - complete_request() triggers next item if capacity available │ │ │ +│ │ │ │ │ │ +│ │ │ process_queued_identifier(): │ │ │ +│ │ │ 1. 
Pick next identifier (round-robin, not in_progress) │ │ │ +│ │ │ 2. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │ +│ │ │ 3. If url: sync_identifier_from_url(url), mark tried │ │ │ +│ │ │ Else: remove identifier from queue, try next │ │ │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ └────────────────────────────────────────────────────────────────────────────┘ │ @@ -157,12 +159,12 @@ The `domain` parameter determines behavior: - When no non-throttled URLs remain: enqueue with throttled domains, return - When task completes: apply backoff or remove from queue -3. **ThrottleManager / DomainThrottle loops** (independent): - - Each domain has its own loop checking for capacity - - When capacity available: pick next queued identifier (round-robin, not in_progress) +3. **ThrottleManager / DomainThrottle** (trigger-based, no polling): + - Processing triggered by `enqueue_identifier()` or `complete_request()` + - When triggered and capacity available: pick next queued identifier (round-robin, not in_progress) - Call `sync_identifier_next_url(domain=Some(this_domain))` - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress - - If no URL: remove identifier from this domain's queue + - If no URL: remove identifier from queue, try next identifier ## Data Structures @@ -234,12 +236,16 @@ Manages all per-domain throttles and provides the interface for checking throttl /// /// Owns a collection of DomainThrottle instances and provides: /// - Throttle status checking for sync_identifier_next_url -/// - Domain throttle loop spawning /// - Identifier queue management +/// - Trigger-based processing when capacity frees up pub struct ThrottleManager { /// Per-domain throttle state throttles: DashMap, + /// Sync context for processing queued identifiers + /// Set once at startup via set_context() + ctx: OnceLock>, + /// Configuration max_concurrent_per_domain: u32, max_per_minute_per_domain: 
u32, @@ -249,11 +255,17 @@ impl ThrottleManager { pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { Self { throttles: DashMap::new(), + ctx: OnceLock::new(), max_concurrent_per_domain: max_concurrent, max_per_minute_per_domain: max_per_minute, } } + /// Set the sync context (called once at startup) + pub fn set_context(&self, ctx: Arc) { + let _ = self.ctx.set(ctx); + } + /// Check if a domain is currently throttled (at capacity) pub fn is_throttled(&self, domain: &str) -> bool { self.throttles @@ -277,68 +289,69 @@ impl ThrottleManager { self.get_or_create(domain).start_request(); } - /// Record that a request completed for a domain - pub fn complete_request(&self, domain: &str) { - if let Some(mut throttle) = self.throttles.get_mut(domain) { - throttle.complete_request(); + /// Record that a request completed for a domain. + /// Triggers processing of next queued identifier if capacity available. + pub fn complete_request(self: &Arc, domain: &str) { + let should_trigger = { + if let Some(mut throttle) = self.throttles.get_mut(domain) { + throttle.complete_request(); + throttle.has_capacity() && throttle.has_queued_work() + } else { + false + } + }; + + if should_trigger { + self.try_process_next(domain); } } - /// Add an identifier to a domain's waiting queue + /// Add an identifier to a domain's waiting queue. + /// Triggers processing if capacity is available. 
pub fn enqueue_identifier( - &self, + self: &Arc, domain: &str, identifier: String, tried_urls_for_domain: HashSet, ) { - self.get_or_create(domain) - .enqueue_identifier(identifier, tried_urls_for_domain); + let should_trigger = { + let mut throttle = self.get_or_create(domain); + throttle.enqueue_identifier(identifier, tried_urls_for_domain); + throttle.has_capacity() + }; + + if should_trigger { + self.try_process_next(domain); + } } - /// Start the throttle loop for all domains (called once at startup) - pub fn start_domain_loops( - self: Arc, - ctx: Arc, - ) -> Vec> { - // Spawn a single coordinator loop that manages all domains - vec![tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(100)); - - loop { - interval.tick().await; - - // Check each domain for capacity and queued work - for mut entry in self.throttles.iter_mut() { - let domain = entry.key().clone(); - let throttle = entry.value_mut(); - - if throttle.has_capacity() { - if let Some(identifier) = throttle.next_ready_identifier() { - let ctx = ctx.clone(); - let manager = self.clone(); - let domain_clone = domain.clone(); - - tokio::spawn(async move { - manager.process_queued_identifier( - &ctx, - &domain_clone, - &identifier, - ).await; - }); - } - } - } + /// Try to process the next queued identifier for a domain + fn try_process_next(self: &Arc, domain: &str) { + let identifier = { + if let Some(mut throttle) = self.throttles.get_mut(domain) { + throttle.next_ready_identifier() + } else { + None } - })] + }; + + if let Some(identifier) = identifier { + let manager = self.clone(); + let domain = domain.to_string(); + + tokio::spawn(async move { + manager.process_queued_identifier(&domain, &identifier).await; + }); + } } /// Process a single identifier from a domain's queue - async fn process_queued_identifier( - &self, - ctx: &C, - domain: &str, - identifier: &str, - ) { + async fn process_queued_identifier(self: &Arc, domain: &str, identifier: &str) { + let 
ctx = match self.ctx.get() { + Some(ctx) => ctx, + None => return, + }; + // Get next URL for this identifier on this domain let url = { let throttle = match self.throttles.get(domain) { @@ -348,7 +361,7 @@ impl ThrottleManager { let tried_urls = throttle.get_tried_urls(identifier); sync_identifier_next_url( - ctx, + ctx.as_ref(), identifier, Some(domain), &tried_urls, @@ -358,10 +371,11 @@ impl ThrottleManager { match url { Some(url) => { - // Fetch from this URL - sync_identifier_from_url(ctx, identifier, &url, self).await; + // Fetch from this URL (this calls start_request/complete_request internally) + sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await; - // Record URL as tried + // Record URL as tried and mark not in_progress + // complete_request() will trigger next item if capacity available if let Some(mut throttle) = self.throttles.get_mut(domain) { throttle.mark_url_tried(identifier, url); throttle.mark_identifier_not_in_progress(identifier); @@ -372,6 +386,8 @@ impl ThrottleManager { if let Some(mut throttle) = self.throttles.get_mut(domain) { throttle.remove_identifier(identifier); } + // Try next identifier since we didn't use any capacity + self.try_process_next(domain); } } } @@ -387,9 +403,10 @@ Per-domain rate limiting and waiting queue: /// /// Handles: /// - Rate limiting (concurrent requests, requests per minute) -/// - Queue of identifiers waiting for capacity +/// - Queue of identifiers waiting for capacity (using IndexMap for round-robin order) /// - Tracking tried URLs per identifier (for this domain only) -/// - In-progress flag per identifier (prevent concurrent fetches for same identifier) +/// - In-progress flag per identifier (prevents concurrent fetches for same identifier +/// on this domain, important when queue is small and we have multiple concurrent slots) pub struct DomainThrottle { /// Domain this throttle manages domain: String, @@ -400,12 +417,12 @@ pub struct DomainThrottle { /// Request timestamps (sliding 
window for rate limiting) request_times: VecDeque, - /// Identifiers waiting for capacity on this domain - /// Stored in order for round-robin processing - waiting_queue: VecDeque, + /// Queued identifiers with their state. + /// IndexMap preserves insertion order for round-robin processing. + queue: IndexMap, - /// Per-identifier state for queued identifiers - identifier_state: HashMap, + /// Round-robin index for fair processing across identifiers + round_robin_index: usize, /// Configuration max_concurrent: u32, @@ -418,7 +435,10 @@ struct IdentifierQueueState { /// URLs from this domain that have been tried tried_urls: HashSet, - /// Whether a fetch is currently in progress for this identifier on this domain + /// Whether a fetch is currently in progress for this identifier on this domain. + /// Prevents starting multiple concurrent fetches for the same identifier, + /// which is important when the queue is small (e.g., 2 identifiers with 5 + /// concurrent slots would otherwise try to process the same identifier multiple times). 
in_progress: bool, } @@ -428,8 +448,8 @@ impl DomainThrottle { domain, in_flight: 0, request_times: VecDeque::new(), - waiting_queue: VecDeque::new(), - identifier_state: HashMap::new(), + queue: IndexMap::new(), + round_robin_index: 0, max_concurrent, max_per_minute, } @@ -451,6 +471,11 @@ impl DomainThrottle { recent_count < self.max_per_minute as usize } + /// Check if there are any identifiers in the queue + pub fn has_queued_work(&self) -> bool { + !self.queue.is_empty() + } + /// Record that a request is starting pub fn start_request(&mut self) { self.in_flight += 1; @@ -469,15 +494,12 @@ impl DomainThrottle { } } - /// Add an identifier to the waiting queue + /// Add an identifier to the queue pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet) { - if !self.identifier_state.contains_key(&identifier) { - self.waiting_queue.push_back(identifier.clone()); - } - // Update or insert state (merge tried_urls if already exists) - self.identifier_state + self.queue .entry(identifier) .and_modify(|state| { + // Merge tried_urls if already exists state.tried_urls.extend(tried_urls.iter().cloned()); }) .or_insert(IdentifierQueueState { @@ -486,28 +508,35 @@ impl DomainThrottle { }); } - /// Get next identifier ready for processing (round-robin, not in_progress) + /// Get next identifier ready for processing (round-robin, not in_progress). + /// + /// Iterates through the queue starting from round_robin_index, skipping + /// any identifiers that are already in_progress. This ensures fair + /// distribution even when some identifiers have active fetches. 
pub fn next_ready_identifier(&mut self) -> Option<String> { - // Find first identifier that's not in_progress - for _ in 0..self.waiting_queue.len() { - if let Some(identifier) = self.waiting_queue.pop_front() { - if let Some(state) = self.identifier_state.get_mut(&identifier) { - if !state.in_progress { - state.in_progress = true; - self.waiting_queue.push_back(identifier.clone()); // Re-add for round-robin - return Some(identifier); - } + let len = self.queue.len(); + if len == 0 { + return None; + } + + // Try each identifier starting from round_robin_index + for i in 0..len { + let index = (self.round_robin_index + i) % len; + if let Some((identifier, state)) = self.queue.get_index_mut(index) { + if !state.in_progress { + state.in_progress = true; + self.round_robin_index = (index + 1) % len; + return Some(identifier.clone()); } - // Put it back if in_progress - self.waiting_queue.push_back(identifier); } } - None + + None // All identifiers are in_progress } /// Get tried URLs for an identifier pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> { - self.identifier_state + self.queue .get(identifier) .map(|s| s.tried_urls.clone()) .unwrap_or_default() @@ -515,22 +544,32 @@ impl DomainThrottle { /// Mark a URL as tried for an identifier pub fn mark_url_tried(&mut self, identifier: &str, url: String) { - if let Some(state) = self.identifier_state.get_mut(identifier) { + if let Some(state) = self.queue.get_mut(identifier) { state.tried_urls.insert(url); } } /// Mark identifier as not in progress (fetch completed) pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) { - if let Some(state) = self.identifier_state.get_mut(identifier) { + if let Some(state) = self.queue.get_mut(identifier) { state.in_progress = false; } } /// Remove an identifier from the queue entirely pub fn remove_identifier(&mut self, identifier: &str) { - self.identifier_state.remove(identifier); - self.waiting_queue.retain(|id| id != identifier); + if let Some((index, _, _)) = 
self.queue.shift_remove_full(identifier) { + // Adjust round_robin_index if we removed an entry before it + if index < self.round_robin_index && self.round_robin_index > 0 { + self.round_robin_index -= 1; + } + // Clamp to valid range + if !self.queue.is_empty() { + self.round_robin_index = self.round_robin_index % self.queue.len(); + } else { + self.round_robin_index = 0; + } + } } } ``` @@ -761,7 +800,7 @@ pub async fn sync_identifier_from_url( ctx: &C, identifier: &str, url: &str, - throttle_manager: &ThrottleManager, + throttle_manager: &Arc, ) -> usize { let domain = match extract_domain(url) { Some(d) => d, @@ -847,7 +886,7 @@ pub async fn sync_identifier_from_url( pub async fn sync_identifier( ctx: &C, identifier: &str, - throttle_manager: &ThrottleManager, + throttle_manager: &Arc, ) -> bool { let mut tried_urls: HashSet = HashSet::new(); @@ -1292,6 +1331,56 @@ mod tests { // Now id1 should be available again assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); } + + #[tokio::test] + async fn test_domain_throttle_remove_adjusts_index() { + let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); + + throttle.enqueue_identifier("id1".to_string(), HashSet::new()); + throttle.enqueue_identifier("id2".to_string(), HashSet::new()); + throttle.enqueue_identifier("id3".to_string(), HashSet::new()); + + // Advance to id2 + assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); + throttle.mark_identifier_not_in_progress("id1"); + + // Remove id1 (before current index) + throttle.remove_identifier("id1"); + + // Should continue with id2 (not skip to id3) + assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); + } + + #[tokio::test] + async fn test_domain_throttle_has_queued_work() { + let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); + + assert!(!throttle.has_queued_work()); + + throttle.enqueue_identifier("id1".to_string(), HashSet::new()); + 
assert!(throttle.has_queued_work()); + + throttle.remove_identifier("id1"); + assert!(!throttle.has_queued_work()); + } + + #[tokio::test] + async fn test_domain_throttle_tried_urls_merge() { + let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); + + let mut urls1 = HashSet::new(); + urls1.insert("url1".to_string()); + throttle.enqueue_identifier("id1".to_string(), urls1); + + // Enqueue again with different tried URLs - should merge + let mut urls2 = HashSet::new(); + urls2.insert("url2".to_string()); + throttle.enqueue_identifier("id1".to_string(), urls2); + + let tried = throttle.get_tried_urls("id1"); + assert!(tried.contains("url1")); + assert!(tried.contains("url2")); + } } ``` @@ -1308,7 +1397,7 @@ mod tests { 1. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) 2. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests 3. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` -4. **Phase 4**: Implement ThrottleManager domain loops +4. **Phase 4**: Implement ThrottleManager trigger-based processing 5. **Phase 5**: Add PR event syncing 6. **Phase 6**: Remove old `start_state_sync` code -- cgit v1.2.3