upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-06 17:15:16 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-06 17:15:16 +0000
commit33b716cf9eb88d95394423b77b779009056d4d5a (patch)
tree815bd0c3157802074db5778540e3330afe4f58cd /docs
parent0da412a255858d35cc2eb85158856abc736a236b (diff)
docs: purgatory design improvements
Diffstat (limited to 'docs')
-rw-r--r--docs/explanation/purgatory-sync-redesign.md299
1 files changed, 194 insertions, 105 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
index 7501da2..382f683 100644
--- a/docs/explanation/purgatory-sync-redesign.md
+++ b/docs/explanation/purgatory-sync-redesign.md
@@ -94,19 +94,21 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**,
94│ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 94│ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │
95│ │ │ │ DomainThrottle (per domain) │ │ │ │ 95│ │ │ │ DomainThrottle (per domain) │ │ │ │
96│ │ │ │ │ │ │ │ 96│ │ │ │ │ │ │ │
97│ │ │ │ Rate limiting: │ Waiting queue: │ │ │ │ 97│ │ │ │ Rate limiting: │ Queue (IndexMap for ordering): │ │ │ │
98│ │ │ │ - in_flight: u32 │ - waiting_queue: VecDeque │ │ │ │ 98│ │ │ │ - in_flight: u32 │ - queue: IndexMap<id, State> │ │ │ │
99│ │ │ │ - request_times │ - identifier_state: HashMap │ │ │ │ 99│ │ │ │ - request_times │ - State: tried_urls, │ │ │ │
100│ │ │ │ │ - tried_urls (this domain)│ │ │ │ 100│ │ │ │ - round_robin_index │ in_progress │ │ │ │
101│ │ │ │ │ - in_progress │ │ │ │
102│ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ 101│ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │
103│ │ │ │ │ │ 102│ │ │ │ │ │
104│ │ │ Domain Loop (per domain, runs independently): │ │ │ 103│ │ │ Trigger-based processing (no polling loop): │ │ │
105│ │ │ 1. Wait for capacity │ │ │ 104│ │ │ - enqueue_identifier() triggers if capacity available │ │ │
106│ │ │ 2. Pick next identifier (round-robin, not in_progress) │ │ │ 105│ │ │ - complete_request() triggers next item if capacity available │ │ │
107│ │ │ 3. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │ 106│ │ │ │ │ │
108│ │ │ 4. If url: sync_identifier_from_url(url), mark tried │ │ │ 107│ │ │ process_queued_identifier(): │ │ │
109│ │ │ Else: remove identifier from queue │ │ │ 108│ │ │ 1. Pick next identifier (round-robin, not in_progress) │ │ │
109│ │ │ 2. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │
110│ │ │ 3. If url: sync_identifier_from_url(url), mark tried │ │ │
111│ │ │ Else: remove identifier from queue, try next │ │ │
110│ │ └─────────────────────────────────────────────────────────────────────┘ │ │ 112│ │ └─────────────────────────────────────────────────────────────────────┘ │ │
111│ │ │ │ 113│ │ │ │
112│ └────────────────────────────────────────────────────────────────────────────┘ │ 114│ └────────────────────────────────────────────────────────────────────────────┘ │
@@ -157,12 +159,12 @@ The `domain` parameter determines behavior:
157 - When no non-throttled URLs remain: enqueue with throttled domains, return 159 - When no non-throttled URLs remain: enqueue with throttled domains, return
158 - When task completes: apply backoff or remove from queue 160 - When task completes: apply backoff or remove from queue
159 161
1603. **ThrottleManager / DomainThrottle loops** (independent): 1623. **ThrottleManager / DomainThrottle** (trigger-based, no polling):
161 - Each domain has its own loop checking for capacity 163 - Processing triggered by `enqueue_identifier()` or `complete_request()`
162 - When capacity available: pick next queued identifier (round-robin, not in_progress) 164 - When triggered and capacity available: pick next queued identifier (round-robin, not in_progress)
163 - Call `sync_identifier_next_url(domain=Some(this_domain))` 165 - Call `sync_identifier_next_url(domain=Some(this_domain))`
164 - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress 166 - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress
165 - If no URL: remove identifier from this domain's queue 167 - If no URL: remove identifier from queue, try next identifier
166 168
167## Data Structures 169## Data Structures
168 170
@@ -234,12 +236,16 @@ Manages all per-domain throttles and provides the interface for checking throttl
234/// 236///
235/// Owns a collection of DomainThrottle instances and provides: 237/// Owns a collection of DomainThrottle instances and provides:
236/// - Throttle status checking for sync_identifier_next_url 238/// - Throttle status checking for sync_identifier_next_url
237/// - Domain throttle loop spawning
238/// - Identifier queue management 239/// - Identifier queue management
240/// - Trigger-based processing when capacity frees up
239pub struct ThrottleManager { 241pub struct ThrottleManager {
240 /// Per-domain throttle state 242 /// Per-domain throttle state
241 throttles: DashMap<String, DomainThrottle>, 243 throttles: DashMap<String, DomainThrottle>,
242 244
245 /// Sync context for processing queued identifiers
246 /// Set once at startup via set_context()
247 ctx: OnceLock<Arc<dyn SyncContext>>,
248
243 /// Configuration 249 /// Configuration
244 max_concurrent_per_domain: u32, 250 max_concurrent_per_domain: u32,
245 max_per_minute_per_domain: u32, 251 max_per_minute_per_domain: u32,
@@ -249,11 +255,17 @@ impl ThrottleManager {
249 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { 255 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
250 Self { 256 Self {
251 throttles: DashMap::new(), 257 throttles: DashMap::new(),
258 ctx: OnceLock::new(),
252 max_concurrent_per_domain: max_concurrent, 259 max_concurrent_per_domain: max_concurrent,
253 max_per_minute_per_domain: max_per_minute, 260 max_per_minute_per_domain: max_per_minute,
254 } 261 }
255 } 262 }
256 263
264 /// Set the sync context (called once at startup)
265 pub fn set_context(&self, ctx: Arc<dyn SyncContext>) {
266 let _ = self.ctx.set(ctx);
267 }
268
257 /// Check if a domain is currently throttled (at capacity) 269 /// Check if a domain is currently throttled (at capacity)
258 pub fn is_throttled(&self, domain: &str) -> bool { 270 pub fn is_throttled(&self, domain: &str) -> bool {
259 self.throttles 271 self.throttles
@@ -277,68 +289,69 @@ impl ThrottleManager {
277 self.get_or_create(domain).start_request(); 289 self.get_or_create(domain).start_request();
278 } 290 }
279 291
280 /// Record that a request completed for a domain 292 /// Record that a request completed for a domain.
281 pub fn complete_request(&self, domain: &str) { 293 /// Triggers processing of next queued identifier if capacity available.
282 if let Some(mut throttle) = self.throttles.get_mut(domain) { 294 pub fn complete_request(self: &Arc<Self>, domain: &str) {
283 throttle.complete_request(); 295 let should_trigger = {
296 if let Some(mut throttle) = self.throttles.get_mut(domain) {
297 throttle.complete_request();
298 throttle.has_capacity() && throttle.has_queued_work()
299 } else {
300 false
301 }
302 };
303
304 if should_trigger {
305 self.try_process_next(domain);
284 } 306 }
285 } 307 }
286 308
287 /// Add an identifier to a domain's waiting queue 309 /// Add an identifier to a domain's waiting queue.
310 /// Triggers processing if capacity is available.
288 pub fn enqueue_identifier( 311 pub fn enqueue_identifier(
289 &self, 312 self: &Arc<Self>,
290 domain: &str, 313 domain: &str,
291 identifier: String, 314 identifier: String,
292 tried_urls_for_domain: HashSet<String>, 315 tried_urls_for_domain: HashSet<String>,
293 ) { 316 ) {
294 self.get_or_create(domain) 317 let should_trigger = {
295 .enqueue_identifier(identifier, tried_urls_for_domain); 318 let mut throttle = self.get_or_create(domain);
319 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
320 throttle.has_capacity()
321 };
322
323 if should_trigger {
324 self.try_process_next(domain);
325 }
296 } 326 }
297 327
298 /// Start the throttle loop for all domains (called once at startup) 328 /// Try to process the next queued identifier for a domain
299 pub fn start_domain_loops<C: SyncContext + 'static>( 329 fn try_process_next(self: &Arc<Self>, domain: &str) {
300 self: Arc<Self>, 330 let identifier = {
301 ctx: Arc<C>, 331 if let Some(mut throttle) = self.throttles.get_mut(domain) {
302 ) -> Vec<tokio::task::JoinHandle<()>> { 332 throttle.next_ready_identifier()
303 // Spawn a single coordinator loop that manages all domains 333 } else {
304 vec![tokio::spawn(async move { 334 None
305 let mut interval = tokio::time::interval(Duration::from_millis(100));
306
307 loop {
308 interval.tick().await;
309
310 // Check each domain for capacity and queued work
311 for mut entry in self.throttles.iter_mut() {
312 let domain = entry.key().clone();
313 let throttle = entry.value_mut();
314
315 if throttle.has_capacity() {
316 if let Some(identifier) = throttle.next_ready_identifier() {
317 let ctx = ctx.clone();
318 let manager = self.clone();
319 let domain_clone = domain.clone();
320
321 tokio::spawn(async move {
322 manager.process_queued_identifier(
323 &ctx,
324 &domain_clone,
325 &identifier,
326 ).await;
327 });
328 }
329 }
330 }
331 } 335 }
332 })] 336 };
337
338 if let Some(identifier) = identifier {
339 let manager = self.clone();
340 let domain = domain.to_string();
341
342 tokio::spawn(async move {
343 manager.process_queued_identifier(&domain, &identifier).await;
344 });
345 }
333 } 346 }
334 347
335 /// Process a single identifier from a domain's queue 348 /// Process a single identifier from a domain's queue
336 async fn process_queued_identifier<C: SyncContext>( 349 async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) {
337 &self, 350 let ctx = match self.ctx.get() {
338 ctx: &C, 351 Some(ctx) => ctx,
339 domain: &str, 352 None => return,
340 identifier: &str, 353 };
341 ) { 354
342 // Get next URL for this identifier on this domain 355 // Get next URL for this identifier on this domain
343 let url = { 356 let url = {
344 let throttle = match self.throttles.get(domain) { 357 let throttle = match self.throttles.get(domain) {
@@ -348,7 +361,7 @@ impl ThrottleManager {
348 let tried_urls = throttle.get_tried_urls(identifier); 361 let tried_urls = throttle.get_tried_urls(identifier);
349 362
350 sync_identifier_next_url( 363 sync_identifier_next_url(
351 ctx, 364 ctx.as_ref(),
352 identifier, 365 identifier,
353 Some(domain), 366 Some(domain),
354 &tried_urls, 367 &tried_urls,
@@ -358,10 +371,11 @@ impl ThrottleManager {
358 371
359 match url { 372 match url {
360 Some(url) => { 373 Some(url) => {
361 // Fetch from this URL 374 // Fetch from this URL (this calls start_request/complete_request internally)
362 sync_identifier_from_url(ctx, identifier, &url, self).await; 375 sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await;
363 376
364 // Record URL as tried 377 // Record URL as tried and mark not in_progress
378 // complete_request() will trigger next item if capacity available
365 if let Some(mut throttle) = self.throttles.get_mut(domain) { 379 if let Some(mut throttle) = self.throttles.get_mut(domain) {
366 throttle.mark_url_tried(identifier, url); 380 throttle.mark_url_tried(identifier, url);
367 throttle.mark_identifier_not_in_progress(identifier); 381 throttle.mark_identifier_not_in_progress(identifier);
@@ -372,6 +386,8 @@ impl ThrottleManager {
372 if let Some(mut throttle) = self.throttles.get_mut(domain) { 386 if let Some(mut throttle) = self.throttles.get_mut(domain) {
373 throttle.remove_identifier(identifier); 387 throttle.remove_identifier(identifier);
374 } 388 }
389 // Try next identifier since we didn't use any capacity
390 self.try_process_next(domain);
375 } 391 }
376 } 392 }
377 } 393 }
@@ -387,9 +403,10 @@ Per-domain rate limiting and waiting queue:
387/// 403///
388/// Handles: 404/// Handles:
389/// - Rate limiting (concurrent requests, requests per minute) 405/// - Rate limiting (concurrent requests, requests per minute)
390/// - Queue of identifiers waiting for capacity 406/// - Queue of identifiers waiting for capacity (using IndexMap for round-robin order)
391/// - Tracking tried URLs per identifier (for this domain only) 407/// - Tracking tried URLs per identifier (for this domain only)
392/// - In-progress flag per identifier (prevent concurrent fetches for same identifier) 408/// - In-progress flag per identifier (prevents concurrent fetches for same identifier
409/// on this domain, important when queue is small and we have multiple concurrent slots)
393pub struct DomainThrottle { 410pub struct DomainThrottle {
394 /// Domain this throttle manages 411 /// Domain this throttle manages
395 domain: String, 412 domain: String,
@@ -400,12 +417,12 @@ pub struct DomainThrottle {
400 /// Request timestamps (sliding window for rate limiting) 417 /// Request timestamps (sliding window for rate limiting)
401 request_times: VecDeque<Instant>, 418 request_times: VecDeque<Instant>,
402 419
403 /// Identifiers waiting for capacity on this domain 420 /// Queued identifiers with their state.
404 /// Stored in order for round-robin processing 421 /// IndexMap preserves insertion order for round-robin processing.
405 waiting_queue: VecDeque<String>, 422 queue: IndexMap<String, IdentifierQueueState>,
406 423
407 /// Per-identifier state for queued identifiers 424 /// Round-robin index for fair processing across identifiers
408 identifier_state: HashMap<String, IdentifierQueueState>, 425 round_robin_index: usize,
409 426
410 /// Configuration 427 /// Configuration
411 max_concurrent: u32, 428 max_concurrent: u32,
@@ -418,7 +435,10 @@ struct IdentifierQueueState {
418 /// URLs from this domain that have been tried 435 /// URLs from this domain that have been tried
419 tried_urls: HashSet<String>, 436 tried_urls: HashSet<String>,
420 437
421 /// Whether a fetch is currently in progress for this identifier on this domain 438 /// Whether a fetch is currently in progress for this identifier on this domain.
439 /// Prevents starting multiple concurrent fetches for the same identifier,
440 /// which is important when the queue is small (e.g., 2 identifiers with 5
441 /// concurrent slots would otherwise try to process the same identifier multiple times).
422 in_progress: bool, 442 in_progress: bool,
423} 443}
424 444
@@ -428,8 +448,8 @@ impl DomainThrottle {
428 domain, 448 domain,
429 in_flight: 0, 449 in_flight: 0,
430 request_times: VecDeque::new(), 450 request_times: VecDeque::new(),
431 waiting_queue: VecDeque::new(), 451 queue: IndexMap::new(),
432 identifier_state: HashMap::new(), 452 round_robin_index: 0,
433 max_concurrent, 453 max_concurrent,
434 max_per_minute, 454 max_per_minute,
435 } 455 }
@@ -451,6 +471,11 @@ impl DomainThrottle {
451 recent_count < self.max_per_minute as usize 471 recent_count < self.max_per_minute as usize
452 } 472 }
453 473
474 /// Check if there are any identifiers in the queue
475 pub fn has_queued_work(&self) -> bool {
476 !self.queue.is_empty()
477 }
478
454 /// Record that a request is starting 479 /// Record that a request is starting
455 pub fn start_request(&mut self) { 480 pub fn start_request(&mut self) {
456 self.in_flight += 1; 481 self.in_flight += 1;
@@ -469,15 +494,12 @@ impl DomainThrottle {
469 } 494 }
470 } 495 }
471 496
472 /// Add an identifier to the waiting queue 497 /// Add an identifier to the queue
473 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) { 498 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) {
474 if !self.identifier_state.contains_key(&identifier) { 499 self.queue
475 self.waiting_queue.push_back(identifier.clone());
476 }
477 // Update or insert state (merge tried_urls if already exists)
478 self.identifier_state
479 .entry(identifier) 500 .entry(identifier)
480 .and_modify(|state| { 501 .and_modify(|state| {
502 // Merge tried_urls if already exists
481 state.tried_urls.extend(tried_urls.iter().cloned()); 503 state.tried_urls.extend(tried_urls.iter().cloned());
482 }) 504 })
483 .or_insert(IdentifierQueueState { 505 .or_insert(IdentifierQueueState {
@@ -486,28 +508,35 @@ impl DomainThrottle {
486 }); 508 });
487 } 509 }
488 510
489 /// Get next identifier ready for processing (round-robin, not in_progress) 511 /// Get next identifier ready for processing (round-robin, not in_progress).
512 ///
513 /// Iterates through the queue starting from round_robin_index, skipping
514 /// any identifiers that are already in_progress. This ensures fair
515 /// distribution even when some identifiers have active fetches.
490 pub fn next_ready_identifier(&mut self) -> Option<String> { 516 pub fn next_ready_identifier(&mut self) -> Option<String> {
491 // Find first identifier that's not in_progress 517 let len = self.queue.len();
492 for _ in 0..self.waiting_queue.len() { 518 if len == 0 {
493 if let Some(identifier) = self.waiting_queue.pop_front() { 519 return None;
494 if let Some(state) = self.identifier_state.get_mut(&identifier) { 520 }
495 if !state.in_progress { 521
496 state.in_progress = true; 522 // Try each identifier starting from round_robin_index
497 self.waiting_queue.push_back(identifier.clone()); // Re-add for round-robin 523 for i in 0..len {
498 return Some(identifier); 524 let index = (self.round_robin_index + i) % len;
499 } 525 if let Some((identifier, state)) = self.queue.get_index_mut(index) {
526 if !state.in_progress {
527 state.in_progress = true;
528 self.round_robin_index = (index + 1) % len;
529 return Some(identifier.clone());
500 } 530 }
501 // Put it back if in_progress
502 self.waiting_queue.push_back(identifier);
503 } 531 }
504 } 532 }
505 None 533
534 None // All identifiers are in_progress
506 } 535 }
507 536
508 /// Get tried URLs for an identifier 537 /// Get tried URLs for an identifier
509 pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> { 538 pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> {
510 self.identifier_state 539 self.queue
511 .get(identifier) 540 .get(identifier)
512 .map(|s| s.tried_urls.clone()) 541 .map(|s| s.tried_urls.clone())
513 .unwrap_or_default() 542 .unwrap_or_default()
@@ -515,22 +544,32 @@ impl DomainThrottle {
515 544
516 /// Mark a URL as tried for an identifier 545 /// Mark a URL as tried for an identifier
517 pub fn mark_url_tried(&mut self, identifier: &str, url: String) { 546 pub fn mark_url_tried(&mut self, identifier: &str, url: String) {
518 if let Some(state) = self.identifier_state.get_mut(identifier) { 547 if let Some(state) = self.queue.get_mut(identifier) {
519 state.tried_urls.insert(url); 548 state.tried_urls.insert(url);
520 } 549 }
521 } 550 }
522 551
523 /// Mark identifier as not in progress (fetch completed) 552 /// Mark identifier as not in progress (fetch completed)
524 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) { 553 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) {
525 if let Some(state) = self.identifier_state.get_mut(identifier) { 554 if let Some(state) = self.queue.get_mut(identifier) {
526 state.in_progress = false; 555 state.in_progress = false;
527 } 556 }
528 } 557 }
529 558
530 /// Remove an identifier from the queue entirely 559 /// Remove an identifier from the queue entirely
531 pub fn remove_identifier(&mut self, identifier: &str) { 560 pub fn remove_identifier(&mut self, identifier: &str) {
532 self.identifier_state.remove(identifier); 561 if let Some((index, _, _)) = self.queue.shift_remove_full(identifier) {
533 self.waiting_queue.retain(|id| id != identifier); 562 // Adjust round_robin_index if we removed an entry before it
563 if index < self.round_robin_index && self.round_robin_index > 0 {
564 self.round_robin_index -= 1;
565 }
566 // Clamp to valid range
567 if !self.queue.is_empty() {
568 self.round_robin_index = self.round_robin_index % self.queue.len();
569 } else {
570 self.round_robin_index = 0;
571 }
572 }
534 } 573 }
535} 574}
536``` 575```
@@ -761,7 +800,7 @@ pub async fn sync_identifier_from_url<C: SyncContext>(
761 ctx: &C, 800 ctx: &C,
762 identifier: &str, 801 identifier: &str,
763 url: &str, 802 url: &str,
764 throttle_manager: &ThrottleManager, 803 throttle_manager: &Arc<ThrottleManager>,
765) -> usize { 804) -> usize {
766 let domain = match extract_domain(url) { 805 let domain = match extract_domain(url) {
767 Some(d) => d, 806 Some(d) => d,
@@ -847,7 +886,7 @@ pub async fn sync_identifier_from_url<C: SyncContext>(
847pub async fn sync_identifier<C: SyncContext>( 886pub async fn sync_identifier<C: SyncContext>(
848 ctx: &C, 887 ctx: &C,
849 identifier: &str, 888 identifier: &str,
850 throttle_manager: &ThrottleManager, 889 throttle_manager: &Arc<ThrottleManager>,
851) -> bool { 890) -> bool {
852 let mut tried_urls: HashSet<String> = HashSet::new(); 891 let mut tried_urls: HashSet<String> = HashSet::new();
853 892
@@ -1292,6 +1331,56 @@ mod tests {
1292 // Now id1 should be available again 1331 // Now id1 should be available again
1293 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); 1332 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string()));
1294 } 1333 }
1334
1335 #[tokio::test]
1336 async fn test_domain_throttle_remove_adjusts_index() {
1337 let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30);
1338
1339 throttle.enqueue_identifier("id1".to_string(), HashSet::new());
1340 throttle.enqueue_identifier("id2".to_string(), HashSet::new());
1341 throttle.enqueue_identifier("id3".to_string(), HashSet::new());
1342
1343 // Advance to id2
1344 assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string()));
1345 throttle.mark_identifier_not_in_progress("id1");
1346
1347 // Remove id1 (before current index)
1348 throttle.remove_identifier("id1");
1349
1350 // Should continue with id2 (not skip to id3)
1351 assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string()));
1352 }
1353
1354 #[tokio::test]
1355 async fn test_domain_throttle_has_queued_work() {
1356 let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30);
1357
1358 assert!(!throttle.has_queued_work());
1359
1360 throttle.enqueue_identifier("id1".to_string(), HashSet::new());
1361 assert!(throttle.has_queued_work());
1362
1363 throttle.remove_identifier("id1");
1364 assert!(!throttle.has_queued_work());
1365 }
1366
1367 #[tokio::test]
1368 async fn test_domain_throttle_tried_urls_merge() {
1369 let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30);
1370
1371 let mut urls1 = HashSet::new();
1372 urls1.insert("url1".to_string());
1373 throttle.enqueue_identifier("id1".to_string(), urls1);
1374
1375 // Enqueue again with different tried URLs - should merge
1376 let mut urls2 = HashSet::new();
1377 urls2.insert("url2".to_string());
1378 throttle.enqueue_identifier("id1".to_string(), urls2);
1379
1380 let tried = throttle.get_tried_urls("id1");
1381 assert!(tried.contains("url1"));
1382 assert!(tried.contains("url2"));
1383 }
1295} 1384}
1296``` 1385```
1297 1386
@@ -1308,7 +1397,7 @@ mod tests {
13081. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) 13971. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait)
13092. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests 13982. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests
13103. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` 13993. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync`
13114. **Phase 4**: Implement ThrottleManager domain loops 14004. **Phase 4**: Implement ThrottleManager trigger-based processing
13125. **Phase 5**: Add PR event syncing 14015. **Phase 5**: Add PR event syncing
13136. **Phase 6**: Remove old `start_state_sync` code 14026. **Phase 6**: Remove old `start_state_sync` code
1314 1403