diff options
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 299 |
1 files changed, 194 insertions, 105 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 7501da2..382f683 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -94,19 +94,21 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, | |||
| 94 | │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ | 94 | │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ |
| 95 | │ │ │ │ DomainThrottle (per domain) │ │ │ │ | 95 | │ │ │ │ DomainThrottle (per domain) │ │ │ │ |
| 96 | │ │ │ │ │ │ │ │ | 96 | │ │ │ │ │ │ │ │ |
| 97 | │ │ │ │ Rate limiting: │ Waiting queue: │ │ │ │ | 97 | │ │ │ │ Rate limiting: │ Queue (IndexMap for ordering): │ │ │ │ |
| 98 | │ │ │ │ - in_flight: u32 │ - waiting_queue: VecDeque │ │ │ │ | 98 | │ │ │ │ - in_flight: u32 │ - queue: IndexMap<id, State> │ │ │ │ |
| 99 | │ │ │ │ - request_times │ - identifier_state: HashMap │ │ │ │ | 99 | │ │ │ │ - request_times │ - State: tried_urls, │ │ │ │ |
| 100 | │ │ │ │ │ - tried_urls (this domain)│ │ │ │ | 100 | │ │ │ │ - round_robin_index │ in_progress │ │ │ │ |
| 101 | │ │ │ │ │ - in_progress │ │ │ │ | ||
| 102 | │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ | 101 | │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ |
| 103 | │ │ │ │ │ │ | 102 | │ │ │ │ │ │ |
| 104 | │ │ │ Domain Loop (per domain, runs independently): │ │ │ | 103 | │ │ │ Trigger-based processing (no polling loop): │ │ │ |
| 105 | │ │ │ 1. Wait for capacity │ │ │ | 104 | │ │ │ - enqueue_identifier() triggers if capacity available │ │ │ |
| 106 | │ │ │ 2. Pick next identifier (round-robin, not in_progress) │ │ │ | 105 | │ │ │ - complete_request() triggers next item if capacity available │ │ │ |
| 107 | │ │ │ 3. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │ | 106 | │ │ │ │ │ │ |
| 108 | │ │ │ 4. If url: sync_identifier_from_url(url), mark tried │ │ │ | 107 | │ │ │ process_queued_identifier(): │ │ │ |
| 109 | │ │ │ Else: remove identifier from queue │ │ │ | 108 | │ │ │ 1. Pick next identifier (round-robin, not in_progress) │ │ │ |
| 109 | │ │ │ 2. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │ | ||
| 110 | │ │ │ 3. If url: sync_identifier_from_url(url), mark tried │ │ │ | ||
| 111 | │ │ │ Else: remove identifier from queue, try next │ │ │ | ||
| 110 | │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ | 112 | │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ |
| 111 | │ │ │ │ | 113 | │ │ │ │ |
| 112 | │ └────────────────────────────────────────────────────────────────────────────┘ │ | 114 | │ └────────────────────────────────────────────────────────────────────────────┘ │ |
| @@ -157,12 +159,12 @@ The `domain` parameter determines behavior: | |||
| 157 | - When no non-throttled URLs remain: enqueue with throttled domains, return | 159 | - When no non-throttled URLs remain: enqueue with throttled domains, return |
| 158 | - When task completes: apply backoff or remove from queue | 160 | - When task completes: apply backoff or remove from queue |
| 159 | 161 | ||
| 160 | 3. **ThrottleManager / DomainThrottle loops** (independent): | 162 | 3. **ThrottleManager / DomainThrottle** (trigger-based, no polling): |
| 161 | - Each domain has its own loop checking for capacity | 163 | - Processing triggered by `enqueue_identifier()` or `complete_request()` |
| 162 | - When capacity available: pick next queued identifier (round-robin, not in_progress) | 164 | - When triggered and capacity available: pick next queued identifier (round-robin, not in_progress) |
| 163 | - Call `sync_identifier_next_url(domain=Some(this_domain))` | 165 | - Call `sync_identifier_next_url(domain=Some(this_domain))` |
| 164 | - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress | 166 | - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress |
| 165 | - If no URL: remove identifier from this domain's queue | 167 | - If no URL: remove identifier from queue, try next identifier |
| 166 | 168 | ||
| 167 | ## Data Structures | 169 | ## Data Structures |
| 168 | 170 | ||
| @@ -234,12 +236,16 @@ Manages all per-domain throttles and provides the interface for checking throttl | |||
| 234 | /// | 236 | /// |
| 235 | /// Owns a collection of DomainThrottle instances and provides: | 237 | /// Owns a collection of DomainThrottle instances and provides: |
| 236 | /// - Throttle status checking for sync_identifier_next_url | 238 | /// - Throttle status checking for sync_identifier_next_url |
| 237 | /// - Domain throttle loop spawning | ||
| 238 | /// - Identifier queue management | 239 | /// - Identifier queue management |
| 240 | /// - Trigger-based processing when capacity frees up | ||
| 239 | pub struct ThrottleManager { | 241 | pub struct ThrottleManager { |
| 240 | /// Per-domain throttle state | 242 | /// Per-domain throttle state |
| 241 | throttles: DashMap<String, DomainThrottle>, | 243 | throttles: DashMap<String, DomainThrottle>, |
| 242 | 244 | ||
| 245 | /// Sync context for processing queued identifiers | ||
| 246 | /// Set once at startup via set_context() | ||
| 247 | ctx: OnceLock<Arc<dyn SyncContext>>, | ||
| 248 | |||
| 243 | /// Configuration | 249 | /// Configuration |
| 244 | max_concurrent_per_domain: u32, | 250 | max_concurrent_per_domain: u32, |
| 245 | max_per_minute_per_domain: u32, | 251 | max_per_minute_per_domain: u32, |
| @@ -249,11 +255,17 @@ impl ThrottleManager { | |||
| 249 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { | 255 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { |
| 250 | Self { | 256 | Self { |
| 251 | throttles: DashMap::new(), | 257 | throttles: DashMap::new(), |
| 258 | ctx: OnceLock::new(), | ||
| 252 | max_concurrent_per_domain: max_concurrent, | 259 | max_concurrent_per_domain: max_concurrent, |
| 253 | max_per_minute_per_domain: max_per_minute, | 260 | max_per_minute_per_domain: max_per_minute, |
| 254 | } | 261 | } |
| 255 | } | 262 | } |
| 256 | 263 | ||
| 264 | /// Set the sync context (called once at startup) | ||
| 265 | pub fn set_context(&self, ctx: Arc<dyn SyncContext>) { | ||
| 266 | let _ = self.ctx.set(ctx); | ||
| 267 | } | ||
| 268 | |||
| 257 | /// Check if a domain is currently throttled (at capacity) | 269 | /// Check if a domain is currently throttled (at capacity) |
| 258 | pub fn is_throttled(&self, domain: &str) -> bool { | 270 | pub fn is_throttled(&self, domain: &str) -> bool { |
| 259 | self.throttles | 271 | self.throttles |
| @@ -277,68 +289,69 @@ impl ThrottleManager { | |||
| 277 | self.get_or_create(domain).start_request(); | 289 | self.get_or_create(domain).start_request(); |
| 278 | } | 290 | } |
| 279 | 291 | ||
| 280 | /// Record that a request completed for a domain | 292 | /// Record that a request completed for a domain. |
| 281 | pub fn complete_request(&self, domain: &str) { | 293 | /// Triggers processing of next queued identifier if capacity available. |
| 282 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | 294 | pub fn complete_request(self: &Arc<Self>, domain: &str) { |
| 283 | throttle.complete_request(); | 295 | let should_trigger = { |
| 296 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | ||
| 297 | throttle.complete_request(); | ||
| 298 | throttle.has_capacity() && throttle.has_queued_work() | ||
| 299 | } else { | ||
| 300 | false | ||
| 301 | } | ||
| 302 | }; | ||
| 303 | |||
| 304 | if should_trigger { | ||
| 305 | self.try_process_next(domain); | ||
| 284 | } | 306 | } |
| 285 | } | 307 | } |
| 286 | 308 | ||
| 287 | /// Add an identifier to a domain's waiting queue | 309 | /// Add an identifier to a domain's waiting queue. |
| 310 | /// Triggers processing if capacity is available. | ||
| 288 | pub fn enqueue_identifier( | 311 | pub fn enqueue_identifier( |
| 289 | &self, | 312 | self: &Arc<Self>, |
| 290 | domain: &str, | 313 | domain: &str, |
| 291 | identifier: String, | 314 | identifier: String, |
| 292 | tried_urls_for_domain: HashSet<String>, | 315 | tried_urls_for_domain: HashSet<String>, |
| 293 | ) { | 316 | ) { |
| 294 | self.get_or_create(domain) | 317 | let should_trigger = { |
| 295 | .enqueue_identifier(identifier, tried_urls_for_domain); | 318 | let mut throttle = self.get_or_create(domain); |
| 319 | throttle.enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 320 | throttle.has_capacity() | ||
| 321 | }; | ||
| 322 | |||
| 323 | if should_trigger { | ||
| 324 | self.try_process_next(domain); | ||
| 325 | } | ||
| 296 | } | 326 | } |
| 297 | 327 | ||
| 298 | /// Start the throttle loop for all domains (called once at startup) | 328 | /// Try to process the next queued identifier for a domain |
| 299 | pub fn start_domain_loops<C: SyncContext + 'static>( | 329 | fn try_process_next(self: &Arc<Self>, domain: &str) { |
| 300 | self: Arc<Self>, | 330 | let identifier = { |
| 301 | ctx: Arc<C>, | 331 | if let Some(mut throttle) = self.throttles.get_mut(domain) { |
| 302 | ) -> Vec<tokio::task::JoinHandle<()>> { | 332 | throttle.next_ready_identifier() |
| 303 | // Spawn a single coordinator loop that manages all domains | 333 | } else { |
| 304 | vec![tokio::spawn(async move { | 334 | None |
| 305 | let mut interval = tokio::time::interval(Duration::from_millis(100)); | ||
| 306 | |||
| 307 | loop { | ||
| 308 | interval.tick().await; | ||
| 309 | |||
| 310 | // Check each domain for capacity and queued work | ||
| 311 | for mut entry in self.throttles.iter_mut() { | ||
| 312 | let domain = entry.key().clone(); | ||
| 313 | let throttle = entry.value_mut(); | ||
| 314 | |||
| 315 | if throttle.has_capacity() { | ||
| 316 | if let Some(identifier) = throttle.next_ready_identifier() { | ||
| 317 | let ctx = ctx.clone(); | ||
| 318 | let manager = self.clone(); | ||
| 319 | let domain_clone = domain.clone(); | ||
| 320 | |||
| 321 | tokio::spawn(async move { | ||
| 322 | manager.process_queued_identifier( | ||
| 323 | &ctx, | ||
| 324 | &domain_clone, | ||
| 325 | &identifier, | ||
| 326 | ).await; | ||
| 327 | }); | ||
| 328 | } | ||
| 329 | } | ||
| 330 | } | ||
| 331 | } | 335 | } |
| 332 | })] | 336 | }; |
| 337 | |||
| 338 | if let Some(identifier) = identifier { | ||
| 339 | let manager = self.clone(); | ||
| 340 | let domain = domain.to_string(); | ||
| 341 | |||
| 342 | tokio::spawn(async move { | ||
| 343 | manager.process_queued_identifier(&domain, &identifier).await; | ||
| 344 | }); | ||
| 345 | } | ||
| 333 | } | 346 | } |
| 334 | 347 | ||
| 335 | /// Process a single identifier from a domain's queue | 348 | /// Process a single identifier from a domain's queue |
| 336 | async fn process_queued_identifier<C: SyncContext>( | 349 | async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) { |
| 337 | &self, | 350 | let ctx = match self.ctx.get() { |
| 338 | ctx: &C, | 351 | Some(ctx) => ctx, |
| 339 | domain: &str, | 352 | None => return, |
| 340 | identifier: &str, | 353 | }; |
| 341 | ) { | 354 | |
| 342 | // Get next URL for this identifier on this domain | 355 | // Get next URL for this identifier on this domain |
| 343 | let url = { | 356 | let url = { |
| 344 | let throttle = match self.throttles.get(domain) { | 357 | let throttle = match self.throttles.get(domain) { |
| @@ -348,7 +361,7 @@ impl ThrottleManager { | |||
| 348 | let tried_urls = throttle.get_tried_urls(identifier); | 361 | let tried_urls = throttle.get_tried_urls(identifier); |
| 349 | 362 | ||
| 350 | sync_identifier_next_url( | 363 | sync_identifier_next_url( |
| 351 | ctx, | 364 | ctx.as_ref(), |
| 352 | identifier, | 365 | identifier, |
| 353 | Some(domain), | 366 | Some(domain), |
| 354 | &tried_urls, | 367 | &tried_urls, |
| @@ -358,10 +371,11 @@ impl ThrottleManager { | |||
| 358 | 371 | ||
| 359 | match url { | 372 | match url { |
| 360 | Some(url) => { | 373 | Some(url) => { |
| 361 | // Fetch from this URL | 374 | // Fetch from this URL (this calls start_request/complete_request internally) |
| 362 | sync_identifier_from_url(ctx, identifier, &url, self).await; | 375 | sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await; |
| 363 | 376 | ||
| 364 | // Record URL as tried | 377 | // Record URL as tried and mark not in_progress |
| 378 | // complete_request() will trigger next item if capacity available | ||
| 365 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | 379 | if let Some(mut throttle) = self.throttles.get_mut(domain) { |
| 366 | throttle.mark_url_tried(identifier, url); | 380 | throttle.mark_url_tried(identifier, url); |
| 367 | throttle.mark_identifier_not_in_progress(identifier); | 381 | throttle.mark_identifier_not_in_progress(identifier); |
| @@ -372,6 +386,8 @@ impl ThrottleManager { | |||
| 372 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | 386 | if let Some(mut throttle) = self.throttles.get_mut(domain) { |
| 373 | throttle.remove_identifier(identifier); | 387 | throttle.remove_identifier(identifier); |
| 374 | } | 388 | } |
| 389 | // Try next identifier since we didn't use any capacity | ||
| 390 | self.try_process_next(domain); | ||
| 375 | } | 391 | } |
| 376 | } | 392 | } |
| 377 | } | 393 | } |
| @@ -387,9 +403,10 @@ Per-domain rate limiting and waiting queue: | |||
| 387 | /// | 403 | /// |
| 388 | /// Handles: | 404 | /// Handles: |
| 389 | /// - Rate limiting (concurrent requests, requests per minute) | 405 | /// - Rate limiting (concurrent requests, requests per minute) |
| 390 | /// - Queue of identifiers waiting for capacity | 406 | /// - Queue of identifiers waiting for capacity (using IndexMap for round-robin order) |
| 391 | /// - Tracking tried URLs per identifier (for this domain only) | 407 | /// - Tracking tried URLs per identifier (for this domain only) |
| 392 | /// - In-progress flag per identifier (prevent concurrent fetches for same identifier) | 408 | /// - In-progress flag per identifier (prevents concurrent fetches for same identifier |
| 409 | /// on this domain, important when queue is small and we have multiple concurrent slots) | ||
| 393 | pub struct DomainThrottle { | 410 | pub struct DomainThrottle { |
| 394 | /// Domain this throttle manages | 411 | /// Domain this throttle manages |
| 395 | domain: String, | 412 | domain: String, |
| @@ -400,12 +417,12 @@ pub struct DomainThrottle { | |||
| 400 | /// Request timestamps (sliding window for rate limiting) | 417 | /// Request timestamps (sliding window for rate limiting) |
| 401 | request_times: VecDeque<Instant>, | 418 | request_times: VecDeque<Instant>, |
| 402 | 419 | ||
| 403 | /// Identifiers waiting for capacity on this domain | 420 | /// Queued identifiers with their state. |
| 404 | /// Stored in order for round-robin processing | 421 | /// IndexMap preserves insertion order for round-robin processing. |
| 405 | waiting_queue: VecDeque<String>, | 422 | queue: IndexMap<String, IdentifierQueueState>, |
| 406 | 423 | ||
| 407 | /// Per-identifier state for queued identifiers | 424 | /// Round-robin index for fair processing across identifiers |
| 408 | identifier_state: HashMap<String, IdentifierQueueState>, | 425 | round_robin_index: usize, |
| 409 | 426 | ||
| 410 | /// Configuration | 427 | /// Configuration |
| 411 | max_concurrent: u32, | 428 | max_concurrent: u32, |
| @@ -418,7 +435,10 @@ struct IdentifierQueueState { | |||
| 418 | /// URLs from this domain that have been tried | 435 | /// URLs from this domain that have been tried |
| 419 | tried_urls: HashSet<String>, | 436 | tried_urls: HashSet<String>, |
| 420 | 437 | ||
| 421 | /// Whether a fetch is currently in progress for this identifier on this domain | 438 | /// Whether a fetch is currently in progress for this identifier on this domain. |
| 439 | /// Prevents starting multiple concurrent fetches for the same identifier, | ||
| 440 | /// which is important when the queue is small (e.g., 2 identifiers with 5 | ||
| 441 | /// concurrent slots would otherwise try to process the same identifier multiple times). | ||
| 422 | in_progress: bool, | 442 | in_progress: bool, |
| 423 | } | 443 | } |
| 424 | 444 | ||
| @@ -428,8 +448,8 @@ impl DomainThrottle { | |||
| 428 | domain, | 448 | domain, |
| 429 | in_flight: 0, | 449 | in_flight: 0, |
| 430 | request_times: VecDeque::new(), | 450 | request_times: VecDeque::new(), |
| 431 | waiting_queue: VecDeque::new(), | 451 | queue: IndexMap::new(), |
| 432 | identifier_state: HashMap::new(), | 452 | round_robin_index: 0, |
| 433 | max_concurrent, | 453 | max_concurrent, |
| 434 | max_per_minute, | 454 | max_per_minute, |
| 435 | } | 455 | } |
| @@ -451,6 +471,11 @@ impl DomainThrottle { | |||
| 451 | recent_count < self.max_per_minute as usize | 471 | recent_count < self.max_per_minute as usize |
| 452 | } | 472 | } |
| 453 | 473 | ||
| 474 | /// Check if there are any identifiers in the queue | ||
| 475 | pub fn has_queued_work(&self) -> bool { | ||
| 476 | !self.queue.is_empty() | ||
| 477 | } | ||
| 478 | |||
| 454 | /// Record that a request is starting | 479 | /// Record that a request is starting |
| 455 | pub fn start_request(&mut self) { | 480 | pub fn start_request(&mut self) { |
| 456 | self.in_flight += 1; | 481 | self.in_flight += 1; |
| @@ -469,15 +494,12 @@ impl DomainThrottle { | |||
| 469 | } | 494 | } |
| 470 | } | 495 | } |
| 471 | 496 | ||
| 472 | /// Add an identifier to the waiting queue | 497 | /// Add an identifier to the queue |
| 473 | pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) { | 498 | pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) { |
| 474 | if !self.identifier_state.contains_key(&identifier) { | 499 | self.queue |
| 475 | self.waiting_queue.push_back(identifier.clone()); | ||
| 476 | } | ||
| 477 | // Update or insert state (merge tried_urls if already exists) | ||
| 478 | self.identifier_state | ||
| 479 | .entry(identifier) | 500 | .entry(identifier) |
| 480 | .and_modify(|state| { | 501 | .and_modify(|state| { |
| 502 | // Merge tried_urls if already exists | ||
| 481 | state.tried_urls.extend(tried_urls.iter().cloned()); | 503 | state.tried_urls.extend(tried_urls.iter().cloned()); |
| 482 | }) | 504 | }) |
| 483 | .or_insert(IdentifierQueueState { | 505 | .or_insert(IdentifierQueueState { |
| @@ -486,28 +508,35 @@ impl DomainThrottle { | |||
| 486 | }); | 508 | }); |
| 487 | } | 509 | } |
| 488 | 510 | ||
| 489 | /// Get next identifier ready for processing (round-robin, not in_progress) | 511 | /// Get next identifier ready for processing (round-robin, not in_progress). |
| 512 | /// | ||
| 513 | /// Iterates through the queue starting from round_robin_index, skipping | ||
| 514 | /// any identifiers that are already in_progress. This ensures fair | ||
| 515 | /// distribution even when some identifiers have active fetches. | ||
| 490 | pub fn next_ready_identifier(&mut self) -> Option<String> { | 516 | pub fn next_ready_identifier(&mut self) -> Option<String> { |
| 491 | // Find first identifier that's not in_progress | 517 | let len = self.queue.len(); |
| 492 | for _ in 0..self.waiting_queue.len() { | 518 | if len == 0 { |
| 493 | if let Some(identifier) = self.waiting_queue.pop_front() { | 519 | return None; |
| 494 | if let Some(state) = self.identifier_state.get_mut(&identifier) { | 520 | } |
| 495 | if !state.in_progress { | 521 | |
| 496 | state.in_progress = true; | 522 | // Try each identifier starting from round_robin_index |
| 497 | self.waiting_queue.push_back(identifier.clone()); // Re-add for round-robin | 523 | for i in 0..len { |
| 498 | return Some(identifier); | 524 | let index = (self.round_robin_index + i) % len; |
| 499 | } | 525 | if let Some((identifier, state)) = self.queue.get_index_mut(index) { |
| 526 | if !state.in_progress { | ||
| 527 | state.in_progress = true; | ||
| 528 | self.round_robin_index = (index + 1) % len; | ||
| 529 | return Some(identifier.clone()); | ||
| 500 | } | 530 | } |
| 501 | // Put it back if in_progress | ||
| 502 | self.waiting_queue.push_back(identifier); | ||
| 503 | } | 531 | } |
| 504 | } | 532 | } |
| 505 | None | 533 | |
| 534 | None // All identifiers are in_progress | ||
| 506 | } | 535 | } |
| 507 | 536 | ||
| 508 | /// Get tried URLs for an identifier | 537 | /// Get tried URLs for an identifier |
| 509 | pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> { | 538 | pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> { |
| 510 | self.identifier_state | 539 | self.queue |
| 511 | .get(identifier) | 540 | .get(identifier) |
| 512 | .map(|s| s.tried_urls.clone()) | 541 | .map(|s| s.tried_urls.clone()) |
| 513 | .unwrap_or_default() | 542 | .unwrap_or_default() |
| @@ -515,22 +544,32 @@ impl DomainThrottle { | |||
| 515 | 544 | ||
| 516 | /// Mark a URL as tried for an identifier | 545 | /// Mark a URL as tried for an identifier |
| 517 | pub fn mark_url_tried(&mut self, identifier: &str, url: String) { | 546 | pub fn mark_url_tried(&mut self, identifier: &str, url: String) { |
| 518 | if let Some(state) = self.identifier_state.get_mut(identifier) { | 547 | if let Some(state) = self.queue.get_mut(identifier) { |
| 519 | state.tried_urls.insert(url); | 548 | state.tried_urls.insert(url); |
| 520 | } | 549 | } |
| 521 | } | 550 | } |
| 522 | 551 | ||
| 523 | /// Mark identifier as not in progress (fetch completed) | 552 | /// Mark identifier as not in progress (fetch completed) |
| 524 | pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) { | 553 | pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) { |
| 525 | if let Some(state) = self.identifier_state.get_mut(identifier) { | 554 | if let Some(state) = self.queue.get_mut(identifier) { |
| 526 | state.in_progress = false; | 555 | state.in_progress = false; |
| 527 | } | 556 | } |
| 528 | } | 557 | } |
| 529 | 558 | ||
| 530 | /// Remove an identifier from the queue entirely | 559 | /// Remove an identifier from the queue entirely |
| 531 | pub fn remove_identifier(&mut self, identifier: &str) { | 560 | pub fn remove_identifier(&mut self, identifier: &str) { |
| 532 | self.identifier_state.remove(identifier); | 561 | if let Some((index, _, _)) = self.queue.shift_remove_full(identifier) { |
| 533 | self.waiting_queue.retain(|id| id != identifier); | 562 | // Adjust round_robin_index if we removed an entry before it |
| 563 | if index < self.round_robin_index && self.round_robin_index > 0 { | ||
| 564 | self.round_robin_index -= 1; | ||
| 565 | } | ||
| 566 | // Clamp to valid range | ||
| 567 | if !self.queue.is_empty() { | ||
| 568 | self.round_robin_index = self.round_robin_index % self.queue.len(); | ||
| 569 | } else { | ||
| 570 | self.round_robin_index = 0; | ||
| 571 | } | ||
| 572 | } | ||
| 534 | } | 573 | } |
| 535 | } | 574 | } |
| 536 | ``` | 575 | ``` |
| @@ -761,7 +800,7 @@ pub async fn sync_identifier_from_url<C: SyncContext>( | |||
| 761 | ctx: &C, | 800 | ctx: &C, |
| 762 | identifier: &str, | 801 | identifier: &str, |
| 763 | url: &str, | 802 | url: &str, |
| 764 | throttle_manager: &ThrottleManager, | 803 | throttle_manager: &Arc<ThrottleManager>, |
| 765 | ) -> usize { | 804 | ) -> usize { |
| 766 | let domain = match extract_domain(url) { | 805 | let domain = match extract_domain(url) { |
| 767 | Some(d) => d, | 806 | Some(d) => d, |
| @@ -847,7 +886,7 @@ pub async fn sync_identifier_from_url<C: SyncContext>( | |||
| 847 | pub async fn sync_identifier<C: SyncContext>( | 886 | pub async fn sync_identifier<C: SyncContext>( |
| 848 | ctx: &C, | 887 | ctx: &C, |
| 849 | identifier: &str, | 888 | identifier: &str, |
| 850 | throttle_manager: &ThrottleManager, | 889 | throttle_manager: &Arc<ThrottleManager>, |
| 851 | ) -> bool { | 890 | ) -> bool { |
| 852 | let mut tried_urls: HashSet<String> = HashSet::new(); | 891 | let mut tried_urls: HashSet<String> = HashSet::new(); |
| 853 | 892 | ||
| @@ -1292,6 +1331,56 @@ mod tests { | |||
| 1292 | // Now id1 should be available again | 1331 | // Now id1 should be available again |
| 1293 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | 1332 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); |
| 1294 | } | 1333 | } |
| 1334 | |||
| 1335 | #[tokio::test] | ||
| 1336 | async fn test_domain_throttle_remove_adjusts_index() { | ||
| 1337 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | ||
| 1338 | |||
| 1339 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | ||
| 1340 | throttle.enqueue_identifier("id2".to_string(), HashSet::new()); | ||
| 1341 | throttle.enqueue_identifier("id3".to_string(), HashSet::new()); | ||
| 1342 | |||
| 1343 | // Advance to id2 | ||
| 1344 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1345 | throttle.mark_identifier_not_in_progress("id1"); | ||
| 1346 | |||
| 1347 | // Remove id1 (before current index) | ||
| 1348 | throttle.remove_identifier("id1"); | ||
| 1349 | |||
| 1350 | // Should continue with id2 (not skip to id3) | ||
| 1351 | assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); | ||
| 1352 | } | ||
| 1353 | |||
| 1354 | #[tokio::test] | ||
| 1355 | async fn test_domain_throttle_has_queued_work() { | ||
| 1356 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | ||
| 1357 | |||
| 1358 | assert!(!throttle.has_queued_work()); | ||
| 1359 | |||
| 1360 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | ||
| 1361 | assert!(throttle.has_queued_work()); | ||
| 1362 | |||
| 1363 | throttle.remove_identifier("id1"); | ||
| 1364 | assert!(!throttle.has_queued_work()); | ||
| 1365 | } | ||
| 1366 | |||
| 1367 | #[tokio::test] | ||
| 1368 | async fn test_domain_throttle_tried_urls_merge() { | ||
| 1369 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | ||
| 1370 | |||
| 1371 | let mut urls1 = HashSet::new(); | ||
| 1372 | urls1.insert("url1".to_string()); | ||
| 1373 | throttle.enqueue_identifier("id1".to_string(), urls1); | ||
| 1374 | |||
| 1375 | // Enqueue again with different tried URLs - should merge | ||
| 1376 | let mut urls2 = HashSet::new(); | ||
| 1377 | urls2.insert("url2".to_string()); | ||
| 1378 | throttle.enqueue_identifier("id1".to_string(), urls2); | ||
| 1379 | |||
| 1380 | let tried = throttle.get_tried_urls("id1"); | ||
| 1381 | assert!(tried.contains("url1")); | ||
| 1382 | assert!(tried.contains("url2")); | ||
| 1383 | } | ||
| 1295 | } | 1384 | } |
| 1296 | ``` | 1385 | ``` |
| 1297 | 1386 | ||
| @@ -1308,7 +1397,7 @@ mod tests { | |||
| 1308 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) | 1397 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) |
| 1309 | 2. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests | 1398 | 2. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests |
| 1310 | 3. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` | 1399 | 3. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` |
| 1311 | 4. **Phase 4**: Implement ThrottleManager domain loops | 1400 | 4. **Phase 4**: Implement ThrottleManager trigger-based processing |
| 1312 | 5. **Phase 5**: Add PR event syncing | 1401 | 5. **Phase 5**: Add PR event syncing |
| 1313 | 6. **Phase 6**: Remove old `start_state_sync` code | 1402 | 6. **Phase 6**: Remove old `start_state_sync` code |
| 1314 | 1403 | ||