diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-06 16:41:26 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-06 16:41:26 +0000 |
| commit | 0da412a255858d35cc2eb85158856abc736a236b (patch) | |
| tree | daac8d46f7d26f195b76ed48b773e757a18ddfa3 /docs/explanation | |
| parent | 534b9e23524a1f651590eccbfef3398fa7fbc495 (diff) | |
docs: purgatory git sync design with throttle queuing
Diffstat (limited to 'docs/explanation')
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 1172 |
1 files changed, 806 insertions, 366 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 76d01b8..7501da2 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -32,86 +32,114 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, | |||
| 32 | ### Overview | 32 | ### Overview |
| 33 | 33 | ||
| 34 | ``` | 34 | ``` |
| 35 | ┌──────────────────────────────────────────────────────────────────────────────┐ | 35 | ┌──────────────────────────────────────────────────────────────────────────────────┐ |
| 36 | │ Purgatory │ | 36 | │ Purgatory │ |
| 37 | │ │ | 37 | │ │ |
| 38 | │ ┌─────────────────┐ ┌─────────────────┐ │ | 38 | │ ┌─────────────────┐ ┌─────────────────┐ │ |
| 39 | │ │ State Events │ │ PR Events │ │ | 39 | │ │ State Events │ │ PR Events │ │ |
| 40 | │ │ (by identifier)│ │ (by event_id) │ │ | 40 | │ │ (by identifier)│ │ (by event_id) │ │ |
| 41 | │ └────────┬────────┘ └────────┬────────┘ │ | 41 | │ └────────┬────────┘ └────────┬────────┘ │ |
| 42 | │ │ │ │ | 42 | │ │ │ │ |
| 43 | │ └──────────┬─────────┘ │ | 43 | │ └──────────┬─────────┘ │ |
| 44 | │ │ add_state() / add_pr() / trigger_immediate_sync() │ | 44 | │ │ add_state() / add_pr() / trigger_immediate_sync() │ |
| 45 | │ ▼ │ | 45 | │ ▼ │ |
| 46 | │ ┌──────────────────────────┐ │ | 46 | │ ┌──────────────────────────┐ │ |
| 47 | │ │ Sync Queue │ │ | 47 | │ │ Sync Queue │ │ |
| 48 | │ │ DashMap<id, Entry> │ │ | 48 | │ │ DashMap<id, Entry> │ │ |
| 49 | │ │ │ │ | 49 | │ │ │ │ |
| 50 | │ │ Entry { │ │ | 50 | │ │ Entry { │ │ |
| 51 | │ │ next_attempt, │ ← delay/backoff timer │ | 51 | │ │ next_attempt, │ ← delay/backoff timer │ |
| 52 | │ │ attempt_count, │ ← for backoff calculation │ | 52 | │ │ attempt_count, │ ← for backoff calculation │ |
| 53 | │ │ in_progress, │ ← prevents concurrent runs │ | 53 | │ │ in_progress, │ ← prevents concurrent runs │ |
| 54 | │ │ } │ │ | 54 | │ │ } │ │ |
| 55 | │ └────────────┬─────────────┘ │ | 55 | │ └────────────┬─────────────┘ │ |
| 56 | │ │ │ | 56 | │ │ │ |
| 57 | │ ┌─────────────────────┼─────────────────────────────────────────────────┐ │ | 57 | │ ┌─────────────────────┼──────────────────────────────────────────────────────┐ │ |
| 58 | │ │ ▼ │ │ | 58 | │ │ ▼ │ │ |
| 59 | │ │ ┌─────────────────────┐ │ │ | 59 | │ │ ┌─────────────────────┐ │ │ |
| 60 | │ │ │ Main Sync Loop │ (every 1s) │ │ | 60 | │ │ │ Main Sync Loop │ (every 1s) │ │ |
| 61 | │ │ │ │ │ │ | 61 | │ │ │ │ │ │ |
| 62 | │ │ │ 1. Find ALL ready │ │ │ | 62 | │ │ │ 1. Find ALL ready │ │ │ |
| 63 | │ │ │ identifiers │ │ │ | 63 | │ │ │ identifiers │ │ │ |
| 64 | │ │ │ 2. Spawn parallel │ │ │ | 64 | │ │ │ 2. Spawn parallel │───────┐ │ │ |
| 65 | │ │ │ tasks for each │───────┐ │ │ | 65 | │ │ │ tasks for each │ │ (parallel tasks) │ │ |
| 66 | │ │ │ 3. Apply backoff │ │ (parallel tasks) │ │ | 66 | │ │ │ 3. Apply backoff │ │ │ │ |
| 67 | │ │ │ when done │ │ │ │ | 67 | │ │ │ when done │ │ │ │ |
| 68 | │ │ └─────────────────────┘ │ │ │ | 68 | │ │ └─────────────────────┘ │ │ │ |
| 69 | │ │ ▼ │ │ | 69 | │ │ ▼ │ │ |
| 70 | │ │ ┌─────────────────────────────────────┐ │ │ | 70 | │ │ ┌──────────────────────────────────────────┐ │ │ |
| 71 | │ │ │ sync_identifier() │ │ │ | 71 | │ │ │ sync_identifier() │ │ │ |
| 72 | │ │ │ │ │ │ | 72 | │ │ │ │ │ │ |
| 73 | │ │ │ Owns its own tried_urls: HashSet │ │ │ | 73 | │ │ │ Owns its own tried_urls: HashSet │ │ │ |
| 74 | │ │ │ │ │ │ | 74 | │ │ │ │ │ │ |
| 75 | │ │ │ loop: │ │ │ | 75 | │ │ │ loop: │ │ │ |
| 76 | │ │ │ sync_identifier_step() │ │ │ | 76 | │ │ │ url = sync_identifier_next_url( │ │ │ |
| 77 | │ │ │ → (url_tried, complete) │ │ │ | 77 | │ │ │ domain=None) │ │ │ |
| 78 | │ │ │ if complete: break │ │ │ | 78 | │ │ │ if url is Some: │ │ │ |
| 79 | │ │ │ tried_urls.insert(url_tried) │ │ │ | 79 | │ │ │ sync_identifier_from_url(url) │ │ │ |
| 80 | │ │ └─────────────────────────────────────┘ │ │ | 80 | │ │ │ tried_urls.insert(url) │ │ │ |
| 81 | │ │ │ │ │ | 81 | │ │ │ else: │ │ │ |
| 82 | │ │ ▼ │ │ | 82 | │ │ │ break (no non-throttled URLs left) │ │ │ |
| 83 | │ │ ┌─────────────────────────────────────┐ │ │ | 83 | │ │ │ │ │ │ |
| 84 | │ │ │ Domain Throttle │ │ │ | 84 | │ │ │ Enqueue throttled domains then return │ │ │ |
| 85 | │ │ │ (rate limiting only) │ │ │ | 85 | │ │ └──────────────────────────────────────────┘ │ │ |
| 86 | │ │ │ │ │ │ | 86 | │ │ │ │ │ |
| 87 | │ │ │ Per-domain state: │ │ │ | 87 | │ │ │ enqueue_identifier() │ │ |
| 88 | │ │ │ - in_flight: u32 │ │ │ | 88 | │ │ ▼ │ │ |
| 89 | │ │ │ - request_times: VecDeque │ │ │ | 89 | │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ |
| 90 | │ │ │ │ │ │ | 90 | │ │ │ ThrottleManager │ │ │ |
| 91 | │ │ │ Round-robin via: │ │ │ | 91 | │ │ │ │ │ │ |
| 92 | │ │ │ - last_used_index per domain │ │ │ | 92 | │ │ │ DashMap<domain, DomainThrottle> │ │ │ |
| 93 | │ │ │ - Caller passes tried_urls │ │ │ | 93 | │ │ │ │ │ │ |
| 94 | │ │ └─────────────────────────────────────┘ │ │ | 94 | │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ |
| 95 | │ │ │ │ | 95 | │ │ │ │ DomainThrottle (per domain) │ │ │ │ |
| 96 | │ └───────────────────────────────────────────────────────────────────────┘ │ | 96 | │ │ │ │ │ │ │ │ |
| 97 | └──────────────────────────────────────────────────────────────────────────────┘ | 97 | │ │ │ │ Rate limiting: │ Waiting queue: │ │ │ │ |
| 98 | │ │ │ │ - in_flight: u32 │ - waiting_queue: VecDeque │ │ │ │ | ||
| 99 | │ │ │ │ - request_times │ - identifier_state: HashMap │ │ │ │ | ||
| 100 | │ │ │ │ │ - tried_urls (this domain)│ │ │ │ | ||
| 101 | │ │ │ │ │ - in_progress │ │ │ │ | ||
| 102 | │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ | ||
| 103 | │ │ │ │ │ │ | ||
| 104 | │ │ │ Domain Loop (per domain, runs independently): │ │ │ | ||
| 105 | │ │ │ 1. Wait for capacity │ │ │ | ||
| 106 | │ │ │ 2. Pick next identifier (round-robin, not in_progress) │ │ │ | ||
| 107 | │ │ │ 3. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │ | ||
| 108 | │ │ │ 4. If url: sync_identifier_from_url(url), mark tried │ │ │ | ||
| 109 | │ │ │ Else: remove identifier from queue │ │ │ | ||
| 110 | │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ | ||
| 111 | │ │ │ │ | ||
| 112 | │ └────────────────────────────────────────────────────────────────────────────┘ │ | ||
| 113 | └───────────────────────────────────────────────────────────────────────────────────┘ | ||
| 98 | ``` | 114 | ``` |
| 99 | 115 | ||
| 100 | ### Key Design Principle: Separation of Concerns | 116 | ### Key Design Principles |
| 101 | 117 | ||
| 102 | The previous design conflated two concerns in `DomainThrottle`: | 118 | **1. Two Independent Execution Paths** |
| 103 | 1. **Rate limiting** (per-domain): How many requests can we make to a domain? | ||
| 104 | 2. **URL tracking** (per-identifier): Which URLs have we tried for this sync? | ||
| 105 | 119 | ||
| 106 | The new design cleanly separates these: | 120 | The main sync loop and DomainThrottle loops run independently: |
| 121 | - **Main sync**: Tries non-throttled URLs, completes quickly, applies backoff, retries later | ||
| 122 | - **DomainThrottle**: Processes queued identifiers when capacity frees, doesn't block main sync | ||
| 107 | 123 | ||
| 108 | - **`DomainThrottle`**: Only handles rate limiting. Tracks in-flight requests and request timestamps per domain. Uses round-robin internally to distribute load across URLs. | 124 | **2. Two Separate tried_urls Tracking** |
| 109 | - **`sync_identifier`**: Owns its `tried_urls: HashSet<String>`. Passes this to the throttle when requesting a URL to try. | ||
| 110 | 125 | ||
| 111 | This separation enables: | 126 | Each path tracks its own tried URLs: |
| 112 | - **Unit testing** of sync logic with a mock throttle | 127 | - **sync_identifier**: Local `HashSet<String>` for current attempt (all domains) |
| 113 | - **Simpler state management** - throttle doesn't need cleanup when identifiers complete | 128 | - **DomainThrottle**: Per-identifier `HashSet<String>` for URLs tried via throttle (this domain only) |
| 114 | - **Clearer reasoning** about each component's responsibility | 129 | |
| 130 | These don't need to merge because: | ||
| 131 | - Main sync skips throttled domains anyway | ||
| 132 | - DomainThrottle only processes its own domain's URLs | ||
| 133 | |||
| 134 | **3. Shared Functions** | ||
| 135 | |||
| 136 | Both paths use the same core functions: | ||
| 137 | - **`sync_identifier_next_url`**: Pure URL selection logic | ||
| 138 | - **`sync_identifier_from_url`**: Pure fetch logic | ||
| 139 | |||
| 140 | The `domain` parameter determines behavior: | ||
| 141 | - `None`: Return any non-throttled URL | ||
| 142 | - `Some(domain)`: Return URL from that specific domain only | ||
| 115 | 143 | ||
| 116 | ### Flow Summary | 144 | ### Flow Summary |
| 117 | 145 | ||
| @@ -124,16 +152,17 @@ This separation enables: | |||
| 124 | - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) | 152 | - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) |
| 125 | - Spawns parallel tasks for each (marks `in_progress = true`) | 153 | - Spawns parallel tasks for each (marks `in_progress = true`) |
| 126 | - Each `sync_identifier()` task: | 154 | - Each `sync_identifier()` task: |
| 127 | - Creates fresh `tried_urls: HashSet<String>` | 155 | - Creates fresh `tried_urls: HashSet<String>` |
| 128 | - Loops calling `sync_identifier_step()` until complete | 156 | - Loops calling `sync_identifier_next_url(domain=None)` + `sync_identifier_from_url` |
| 129 | - Step returns `(Option<url_tried>, complete)` - clean testable interface | 157 | - When no non-throttled URLs remain: enqueue with throttled domains, return |
| 130 | - When task completes: apply backoff or remove from queue | 158 | - When task completes: apply backoff or remove from queue |
| 131 | 159 | ||
| 132 | 3. **Domain throttle**: | 160 | 3. **ThrottleManager / DomainThrottle loops** (independent): |
| 133 | - Pure rate limiting: tracks in_flight count and request timestamps per domain | 161 | - Each domain has its own loop checking for capacity |
| 134 | - `get_next_url()` takes `available_urls` and `tried_urls`, returns next URL to try | 162 | - When capacity available: pick next queued identifier (round-robin, not in_progress) |
| 135 | - Uses round-robin internally to distribute load | 163 | - Call `sync_identifier_next_url(domain=Some(this_domain))` |
| 136 | - No per-identifier state needed | 164 | - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress |
| 165 | - If no URL: remove identifier from this domain's queue | ||
| 137 | 166 | ||
| 138 | ## Data Structures | 167 | ## Data Structures |
| 139 | 168 | ||
| @@ -196,142 +225,312 @@ impl SyncQueueEntry { | |||
| 196 | } | 225 | } |
| 197 | ``` | 226 | ``` |
| 198 | 227 | ||
| 199 | ### DomainThrottle (Rate Limiting Only) | 228 | ### ThrottleManager |
| 229 | |||
| 230 | Manages all per-domain throttles and provides the interface for checking throttle status: | ||
| 200 | 231 | ||
| 201 | ```rust | 232 | ```rust |
| 202 | /// Domain-level rate limiting with round-robin URL selection. | 233 | /// Manages rate limiting across all domains. |
| 203 | /// | 234 | /// |
| 204 | /// This struct ONLY handles rate limiting. It does not track which URLs | 235 | /// Owns a collection of DomainThrottle instances and provides: |
| 205 | /// have been tried - that's the caller's responsibility. | 236 | /// - Throttle status checking for sync_identifier_next_url |
| 237 | /// - Domain throttle loop spawning | ||
| 238 | /// - Identifier queue management | ||
| 239 | pub struct ThrottleManager { | ||
| 240 | /// Per-domain throttle state | ||
| 241 | throttles: DashMap<String, DomainThrottle>, | ||
| 242 | |||
| 243 | /// Configuration | ||
| 244 | max_concurrent_per_domain: u32, | ||
| 245 | max_per_minute_per_domain: u32, | ||
| 246 | } | ||
| 247 | |||
| 248 | impl ThrottleManager { | ||
| 249 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { | ||
| 250 | Self { | ||
| 251 | throttles: DashMap::new(), | ||
| 252 | max_concurrent_per_domain: max_concurrent, | ||
| 253 | max_per_minute_per_domain: max_per_minute, | ||
| 254 | } | ||
| 255 | } | ||
| 256 | |||
| 257 | /// Check if a domain is currently throttled (at capacity) | ||
| 258 | pub fn is_throttled(&self, domain: &str) -> bool { | ||
| 259 | self.throttles | ||
| 260 | .get(domain) | ||
| 261 | .map_or(false, |t| !t.has_capacity()) | ||
| 262 | } | ||
| 263 | |||
| 264 | /// Get or create throttle for a domain | ||
| 265 | fn get_or_create(&self, domain: &str) -> dashmap::mapref::one::RefMut<String, DomainThrottle> { | ||
| 266 | self.throttles | ||
| 267 | .entry(domain.to_string()) | ||
| 268 | .or_insert_with(|| DomainThrottle::new( | ||
| 269 | domain.to_string(), | ||
| 270 | self.max_concurrent_per_domain, | ||
| 271 | self.max_per_minute_per_domain, | ||
| 272 | )) | ||
| 273 | } | ||
| 274 | |||
| 275 | /// Record that a request is starting for a domain | ||
| 276 | pub fn start_request(&self, domain: &str) { | ||
| 277 | self.get_or_create(domain).start_request(); | ||
| 278 | } | ||
| 279 | |||
| 280 | /// Record that a request completed for a domain | ||
| 281 | pub fn complete_request(&self, domain: &str) { | ||
| 282 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | ||
| 283 | throttle.complete_request(); | ||
| 284 | } | ||
| 285 | } | ||
| 286 | |||
| 287 | /// Add an identifier to a domain's waiting queue | ||
| 288 | pub fn enqueue_identifier( | ||
| 289 | &self, | ||
| 290 | domain: &str, | ||
| 291 | identifier: String, | ||
| 292 | tried_urls_for_domain: HashSet<String>, | ||
| 293 | ) { | ||
| 294 | self.get_or_create(domain) | ||
| 295 | .enqueue_identifier(identifier, tried_urls_for_domain); | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Start the throttle loop for all domains (called once at startup) | ||
| 299 | pub fn start_domain_loops<C: SyncContext + 'static>( | ||
| 300 | self: Arc<Self>, | ||
| 301 | ctx: Arc<C>, | ||
| 302 | ) -> Vec<tokio::task::JoinHandle<()>> { | ||
| 303 | // Spawn a single coordinator loop that manages all domains | ||
| 304 | vec![tokio::spawn(async move { | ||
| 305 | let mut interval = tokio::time::interval(Duration::from_millis(100)); | ||
| 306 | |||
| 307 | loop { | ||
| 308 | interval.tick().await; | ||
| 309 | |||
| 310 | // Check each domain for capacity and queued work | ||
| 311 | for mut entry in self.throttles.iter_mut() { | ||
| 312 | let domain = entry.key().clone(); | ||
| 313 | let throttle = entry.value_mut(); | ||
| 314 | |||
| 315 | if throttle.has_capacity() { | ||
| 316 | if let Some(identifier) = throttle.next_ready_identifier() { | ||
| 317 | let ctx = ctx.clone(); | ||
| 318 | let manager = self.clone(); | ||
| 319 | let domain_clone = domain.clone(); | ||
| 320 | |||
| 321 | tokio::spawn(async move { | ||
| 322 | manager.process_queued_identifier( | ||
| 323 | &ctx, | ||
| 324 | &domain_clone, | ||
| 325 | &identifier, | ||
| 326 | ).await; | ||
| 327 | }); | ||
| 328 | } | ||
| 329 | } | ||
| 330 | } | ||
| 331 | } | ||
| 332 | })] | ||
| 333 | } | ||
| 334 | |||
| 335 | /// Process a single identifier from a domain's queue | ||
| 336 | async fn process_queued_identifier<C: SyncContext>( | ||
| 337 | &self, | ||
| 338 | ctx: &C, | ||
| 339 | domain: &str, | ||
| 340 | identifier: &str, | ||
| 341 | ) { | ||
| 342 | // Get next URL for this identifier on this domain | ||
| 343 | let url = { | ||
| 344 | let throttle = match self.throttles.get(domain) { | ||
| 345 | Some(t) => t, | ||
| 346 | None => return, | ||
| 347 | }; | ||
| 348 | let tried_urls = throttle.get_tried_urls(identifier); | ||
| 349 | |||
| 350 | sync_identifier_next_url( | ||
| 351 | ctx, | ||
| 352 | identifier, | ||
| 353 | Some(domain), | ||
| 354 | &tried_urls, | ||
| 355 | self, | ||
| 356 | ).await | ||
| 357 | }; | ||
| 358 | |||
| 359 | match url { | ||
| 360 | Some(url) => { | ||
| 361 | // Fetch from this URL | ||
| 362 | sync_identifier_from_url(ctx, identifier, &url, self).await; | ||
| 363 | |||
| 364 | // Record URL as tried | ||
| 365 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | ||
| 366 | throttle.mark_url_tried(identifier, url); | ||
| 367 | throttle.mark_identifier_not_in_progress(identifier); | ||
| 368 | } | ||
| 369 | } | ||
| 370 | None => { | ||
| 371 | // No more URLs for this identifier on this domain - remove from queue | ||
| 372 | if let Some(mut throttle) = self.throttles.get_mut(domain) { | ||
| 373 | throttle.remove_identifier(identifier); | ||
| 374 | } | ||
| 375 | } | ||
| 376 | } | ||
| 377 | } | ||
| 378 | } | ||
| 379 | ``` | ||
| 380 | |||
| 381 | ### DomainThrottle | ||
| 382 | |||
| 383 | Per-domain rate limiting and waiting queue: | ||
| 384 | |||
| 385 | ```rust | ||
| 386 | /// Per-domain rate limiting and identifier queue. | ||
| 206 | /// | 387 | /// |
| 207 | /// Rate limits: 5 concurrent requests, 30 requests/minute per domain | 388 | /// Handles: |
| 389 | /// - Rate limiting (concurrent requests, requests per minute) | ||
| 390 | /// - Queue of identifiers waiting for capacity | ||
| 391 | /// - Tracking tried URLs per identifier (for this domain only) | ||
| 392 | /// - In-progress flag per identifier (prevent concurrent fetches for same identifier) | ||
| 208 | pub struct DomainThrottle { | 393 | pub struct DomainThrottle { |
| 209 | /// In-flight request count per domain | 394 | /// Domain this throttle manages |
| 210 | in_flight: DashMap<String, u32>, | 395 | domain: String, |
| 211 | 396 | ||
| 212 | /// Request timestamps per domain (sliding window) | 397 | /// Current in-flight request count |
| 213 | request_times: DashMap<String, VecDeque<Instant>>, | 398 | in_flight: u32, |
| 214 | 399 | ||
| 215 | /// Round-robin index per domain (for fair URL distribution) | 400 | /// Request timestamps (sliding window for rate limiting) |
| 216 | round_robin_index: DashMap<String, usize>, | 401 | request_times: VecDeque<Instant>, |
| 217 | 402 | ||
| 403 | /// Identifiers waiting for capacity on this domain | ||
| 404 | /// Stored in order for round-robin processing | ||
| 405 | waiting_queue: VecDeque<String>, | ||
| 406 | |||
| 407 | /// Per-identifier state for queued identifiers | ||
| 408 | identifier_state: HashMap<String, IdentifierQueueState>, | ||
| 409 | |||
| 410 | /// Configuration | ||
| 218 | max_concurrent: u32, | 411 | max_concurrent: u32, |
| 219 | max_per_minute: u32, | 412 | max_per_minute: u32, |
| 220 | } | 413 | } |
| 221 | 414 | ||
| 415 | /// State for an identifier waiting in a domain's queue | ||
| 416 | #[derive(Debug, Clone)] | ||
| 417 | struct IdentifierQueueState { | ||
| 418 | /// URLs from this domain that have been tried | ||
| 419 | tried_urls: HashSet<String>, | ||
| 420 | |||
| 421 | /// Whether a fetch is currently in progress for this identifier on this domain | ||
| 422 | in_progress: bool, | ||
| 423 | } | ||
| 424 | |||
| 222 | impl DomainThrottle { | 425 | impl DomainThrottle { |
| 223 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { | 426 | pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self { |
| 224 | Self { | 427 | Self { |
| 225 | in_flight: DashMap::new(), | 428 | domain, |
| 226 | request_times: DashMap::new(), | 429 | in_flight: 0, |
| 227 | round_robin_index: DashMap::new(), | 430 | request_times: VecDeque::new(), |
| 431 | waiting_queue: VecDeque::new(), | ||
| 432 | identifier_state: HashMap::new(), | ||
| 228 | max_concurrent, | 433 | max_concurrent, |
| 229 | max_per_minute, | 434 | max_per_minute, |
| 230 | } | 435 | } |
| 231 | } | 436 | } |
| 232 | 437 | ||
| 233 | /// Check if domain has capacity for another request | 438 | /// Check if domain has capacity for another request |
| 234 | pub fn has_capacity(&self, domain: &str) -> bool { | 439 | pub fn has_capacity(&self) -> bool { |
| 235 | let in_flight = self.in_flight.get(domain).map_or(0, |v| *v); | 440 | if self.in_flight >= self.max_concurrent { |
| 236 | if in_flight >= self.max_concurrent { | ||
| 237 | return false; | 441 | return false; |
| 238 | } | 442 | } |
| 239 | 443 | ||
| 240 | let now = Instant::now(); | 444 | let now = Instant::now(); |
| 241 | let window = Duration::from_secs(60); | 445 | let window = Duration::from_secs(60); |
| 242 | self.request_times | 446 | let recent_count = self.request_times |
| 243 | .get(domain) | ||
| 244 | .map_or(true, |times| { | ||
| 245 | times.iter().filter(|t| now.duration_since(**t) < window).count() | ||
| 246 | < self.max_per_minute as usize | ||
| 247 | }) | ||
| 248 | } | ||
| 249 | |||
| 250 | /// Get next URL to try from available URLs, excluding already-tried URLs. | ||
| 251 | /// Uses round-robin to distribute load across URLs for a domain. | ||
| 252 | /// | ||
| 253 | /// Returns None if: | ||
| 254 | /// - Domain is at capacity (rate limited) | ||
| 255 | /// - All available URLs have been tried | ||
| 256 | pub fn get_next_url( | ||
| 257 | &self, | ||
| 258 | domain: &str, | ||
| 259 | available_urls: &[String], | ||
| 260 | tried_urls: &HashSet<String>, | ||
| 261 | ) -> Option<String> { | ||
| 262 | if !self.has_capacity(domain) { | ||
| 263 | return None; | ||
| 264 | } | ||
| 265 | |||
| 266 | // Filter to untried URLs | ||
| 267 | let untried: Vec<_> = available_urls | ||
| 268 | .iter() | 447 | .iter() |
| 269 | .filter(|url| !tried_urls.contains(*url)) | 448 | .filter(|t| now.duration_since(**t) < window) |
| 270 | .collect(); | 449 | .count(); |
| 271 | |||
| 272 | if untried.is_empty() { | ||
| 273 | return None; | ||
| 274 | } | ||
| 275 | 450 | ||
| 276 | // Round-robin selection | 451 | recent_count < self.max_per_minute as usize |
| 277 | let mut index = self.round_robin_index.entry(domain.to_string()).or_insert(0); | ||
| 278 | let selected_index = *index % untried.len(); | ||
| 279 | *index = (*index + 1) % untried.len(); | ||
| 280 | |||
| 281 | Some(untried[selected_index].clone()) | ||
| 282 | } | 452 | } |
| 283 | 453 | ||
| 284 | /// Record that a request is starting | 454 | /// Record that a request is starting |
| 285 | pub fn start_request(&self, domain: &str) { | 455 | pub fn start_request(&mut self) { |
| 286 | *self.in_flight.entry(domain.to_string()).or_insert(0) += 1; | 456 | self.in_flight += 1; |
| 287 | self.request_times | 457 | self.request_times.push_back(Instant::now()); |
| 288 | .entry(domain.to_string()) | ||
| 289 | .or_default() | ||
| 290 | .push_back(Instant::now()); | ||
| 291 | } | 458 | } |
| 292 | 459 | ||
| 293 | /// Record that a request completed | 460 | /// Record that a request completed |
| 294 | pub fn complete_request(&self, domain: &str) { | 461 | pub fn complete_request(&mut self) { |
| 295 | if let Some(mut count) = self.in_flight.get_mut(domain) { | 462 | self.in_flight = self.in_flight.saturating_sub(1); |
| 296 | *count = count.saturating_sub(1); | ||
| 297 | } | ||
| 298 | 463 | ||
| 299 | // Clean old timestamps | 464 | // Clean old timestamps |
| 300 | let now = Instant::now(); | 465 | let now = Instant::now(); |
| 301 | let window = Duration::from_secs(60); | 466 | let window = Duration::from_secs(60); |
| 302 | if let Some(mut times) = self.request_times.get_mut(domain) { | 467 | while self.request_times.front().map_or(false, |t| now.duration_since(*t) >= window) { |
| 303 | while times.front().map_or(false, |t| now.duration_since(*t) >= window) { | 468 | self.request_times.pop_front(); |
| 304 | times.pop_front(); | ||
| 305 | } | ||
| 306 | } | 469 | } |
| 307 | } | 470 | } |
| 308 | 471 | ||
| 309 | /// Get time until domain has capacity (for scheduling retries) | 472 | /// Add an identifier to the waiting queue |
| 310 | pub fn time_until_capacity(&self, domain: &str) -> Option<Duration> { | 473 | pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) { |
| 311 | // Check concurrent limit first | 474 | if !self.identifier_state.contains_key(&identifier) { |
| 312 | let in_flight = self.in_flight.get(domain).map_or(0, |v| *v); | 475 | self.waiting_queue.push_back(identifier.clone()); |
| 313 | if in_flight >= self.max_concurrent { | ||
| 314 | // Can't predict when a request will complete | ||
| 315 | return Some(Duration::from_millis(100)); | ||
| 316 | } | 476 | } |
| 317 | 477 | // Update or insert state (merge tried_urls if already exists) | |
| 318 | // Check rate limit | 478 | self.identifier_state |
| 319 | let now = Instant::now(); | 479 | .entry(identifier) |
| 320 | let window = Duration::from_secs(60); | 480 | .and_modify(|state| { |
| 321 | if let Some(times) = self.request_times.get(domain) { | 481 | state.tried_urls.extend(tried_urls.iter().cloned()); |
| 322 | let recent_count = times.iter().filter(|t| now.duration_since(**t) < window).count(); | 482 | }) |
| 323 | if recent_count >= self.max_per_minute as usize { | 483 | .or_insert(IdentifierQueueState { |
| 324 | // Find oldest request in window, wait until it expires | 484 | tried_urls, |
| 325 | if let Some(oldest) = times.front() { | 485 | in_progress: false, |
| 326 | let age = now.duration_since(*oldest); | 486 | }); |
| 327 | if age < window { | 487 | } |
| 328 | return Some(window - age); | 488 | |
| 489 | /// Get next identifier ready for processing (round-robin, not in_progress) | ||
| 490 | pub fn next_ready_identifier(&mut self) -> Option<String> { | ||
| 491 | // Find first identifier that's not in_progress | ||
| 492 | for _ in 0..self.waiting_queue.len() { | ||
| 493 | if let Some(identifier) = self.waiting_queue.pop_front() { | ||
| 494 | if let Some(state) = self.identifier_state.get_mut(&identifier) { | ||
| 495 | if !state.in_progress { | ||
| 496 | state.in_progress = true; | ||
| 497 | self.waiting_queue.push_back(identifier.clone()); // Re-add for round-robin | ||
| 498 | return Some(identifier); | ||
| 329 | } | 499 | } |
| 330 | } | 500 | } |
| 501 | // Put it back if in_progress | ||
| 502 | self.waiting_queue.push_back(identifier); | ||
| 331 | } | 503 | } |
| 332 | } | 504 | } |
| 333 | 505 | None | |
| 334 | None // Has capacity now | 506 | } |
| 507 | |||
| 508 | /// Get tried URLs for an identifier | ||
| 509 | pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> { | ||
| 510 | self.identifier_state | ||
| 511 | .get(identifier) | ||
| 512 | .map(|s| s.tried_urls.clone()) | ||
| 513 | .unwrap_or_default() | ||
| 514 | } | ||
| 515 | |||
| 516 | /// Mark a URL as tried for an identifier | ||
| 517 | pub fn mark_url_tried(&mut self, identifier: &str, url: String) { | ||
| 518 | if let Some(state) = self.identifier_state.get_mut(identifier) { | ||
| 519 | state.tried_urls.insert(url); | ||
| 520 | } | ||
| 521 | } | ||
| 522 | |||
| 523 | /// Mark identifier as not in progress (fetch completed) | ||
| 524 | pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) { | ||
| 525 | if let Some(state) = self.identifier_state.get_mut(identifier) { | ||
| 526 | state.in_progress = false; | ||
| 527 | } | ||
| 528 | } | ||
| 529 | |||
| 530 | /// Remove an identifier from the queue entirely | ||
| 531 | pub fn remove_identifier(&mut self, identifier: &str) { | ||
| 532 | self.identifier_state.remove(identifier); | ||
| 533 | self.waiting_queue.retain(|id| id != identifier); | ||
| 335 | } | 534 | } |
| 336 | } | 535 | } |
| 337 | ``` | 536 | ``` |
| @@ -378,58 +577,58 @@ pub trait SyncContext: Send + Sync { | |||
| 378 | 577 | ||
| 379 | ## Core Sync Logic | 578 | ## Core Sync Logic |
| 380 | 579 | ||
| 381 | ### The Sync Step Function | 580 | ### Two-Function Design |
| 382 | 581 | ||
| 383 | This is the key abstraction that enables clean testing: | 582 | The sync logic is split into two functions that can be called by either the main sync loop or by DomainThrottle: |
| 384 | 583 | ||
| 385 | ```rust | 584 | 1. **`sync_identifier_next_url`**: Pure selection logic - finds next URL to try |
| 386 | /// Result of a single sync step | 585 | 2. **`sync_identifier_from_url`**: Pure fetch logic - fetches from a specific URL |
| 387 | #[derive(Debug, Clone, PartialEq)] | 586 | |
| 388 | pub enum SyncStepResult { | 587 | This separation enables: |
| 389 | /// Successfully tried a URL, may or may not have fetched OIDs | 588 | - Main sync loop to try non-throttled URLs immediately |
| 390 | TriedUrl { url: String, oids_fetched: usize }, | 589 | - DomainThrottle to process queued identifiers when capacity frees |
| 391 | 590 | - Clean testability with mocked SyncContext | |
| 392 | /// All available URLs have been tried, need to wait for throttle | ||
| 393 | AllUrlsThrottled { wait_duration: Duration }, | ||
| 394 | |||
| 395 | /// No more URLs to try (all exhausted) | ||
| 396 | NoMoreUrls, | ||
| 397 | |||
| 398 | /// All OIDs are now available, sync complete | ||
| 399 | Complete, | ||
| 400 | |||
| 401 | /// No pending events remain | ||
| 402 | NoPendingEvents, | ||
| 403 | } | ||
| 404 | 591 | ||
| 405 | /// Execute one step of the sync process. | 592 | ### sync_identifier_next_url |
| 593 | |||
| 594 | ```rust | ||
| 595 | /// Find the next URL to try for an identifier. | ||
| 406 | /// | 596 | /// |
| 407 | /// This function is pure logic - all I/O goes through the SyncContext trait. | 597 | /// When `domain` is None: returns any non-throttled URL not in tried_urls |
| 408 | /// This makes it trivially unit testable. | 598 | /// When `domain` is Some: returns a URL from that specific domain not in tried_urls |
| 409 | pub async fn sync_identifier_step<C: SyncContext>( | 599 | /// |
| 600 | /// Returns None if: | ||
| 601 | /// - No pending events for this identifier | ||
| 602 | /// - No OIDs needed (sync complete) | ||
| 603 | /// - No untried URLs available (for the specified domain or all domains) | ||
| 604 | /// - All available domains are throttled (when domain is None) | ||
| 605 | pub async fn sync_identifier_next_url<C: SyncContext>( | ||
| 410 | ctx: &C, | 606 | ctx: &C, |
| 411 | identifier: &str, | 607 | identifier: &str, |
| 608 | domain: Option<&str>, | ||
| 412 | tried_urls: &HashSet<String>, | 609 | tried_urls: &HashSet<String>, |
| 413 | throttle: &DomainThrottle, | 610 | throttle_manager: &ThrottleManager, |
| 414 | ) -> Result<SyncStepResult> { | 611 | ) -> Option<String> { |
| 415 | // 1. Check if we still have pending events | 612 | // 1. Check if we still have pending events |
| 416 | if !ctx.has_pending_events(identifier) { | 613 | if !ctx.has_pending_events(identifier) { |
| 417 | return Ok(SyncStepResult::NoPendingEvents); | 614 | return None; |
| 418 | } | 615 | } |
| 419 | 616 | ||
| 420 | // 2. Collect needed OIDs (fresh each step - may have changed) | 617 | // 2. Collect needed OIDs |
| 421 | let needed_oids = ctx.collect_needed_oids(identifier); | 618 | let needed_oids = ctx.collect_needed_oids(identifier); |
| 422 | if needed_oids.is_empty() { | 619 | if needed_oids.is_empty() { |
| 423 | // No OIDs needed - try to process events | 620 | // No OIDs needed - sync is complete |
| 424 | ctx.process_satisfiable_events(identifier).await?; | 621 | return None; |
| 425 | return Ok(SyncStepResult::Complete); | ||
| 426 | } | 622 | } |
| 427 | 623 | ||
| 428 | // 3. Get repository data (fresh each step - announcements may have arrived) | 624 | // 3. Get repository data |
| 429 | let db_repo_data = ctx.fetch_repository_data(identifier).await?; | 625 | let repo_data = match ctx.fetch_repository_data(identifier).await { |
| 626 | Ok(data) => data, | ||
| 627 | Err(_) => return None, | ||
| 628 | }; | ||
| 430 | 629 | ||
| 431 | // 4. Collect clone URLs, excluding our domain | 630 | // 4. Collect clone URLs, excluding our domain |
| 432 | let all_urls: Vec<String> = db_repo_data | 631 | let all_urls: Vec<String> = repo_data |
| 433 | .announcements | 632 | .announcements |
| 434 | .iter() | 633 | .iter() |
| 435 | .flat_map(|a| a.clone_urls.iter().cloned()) | 634 | .flat_map(|a| a.clone_urls.iter().cloned()) |
| @@ -438,132 +637,292 @@ pub async fn sync_identifier_step<C: SyncContext>( | |||
| 438 | .into_iter() | 637 | .into_iter() |
| 439 | .collect(); | 638 | .collect(); |
| 440 | 639 | ||
| 441 | if all_urls.is_empty() { | 640 | // 5. Group by domain |
| 442 | return Ok(SyncStepResult::NoMoreUrls); | 641 | let urls_by_domain: HashMap<String, Vec<String>> = all_urls |
| 642 | .iter() | ||
| 643 | .fold(HashMap::new(), |mut acc, url| { | ||
| 644 | if let Some(d) = extract_domain(url) { | ||
| 645 | acc.entry(d).or_default().push(url.clone()); | ||
| 646 | } | ||
| 647 | acc | ||
| 648 | }); | ||
| 649 | |||
| 650 | // 6. Find an available URL | ||
| 651 | match domain { | ||
| 652 | Some(specific_domain) => { | ||
| 653 | // Only look at URLs from this specific domain | ||
| 654 | urls_by_domain | ||
| 655 | .get(specific_domain) | ||
| 656 | .and_then(|urls| { | ||
| 657 | urls.iter() | ||
| 658 | .find(|url| !tried_urls.contains(*url)) | ||
| 659 | .cloned() | ||
| 660 | }) | ||
| 661 | } | ||
| 662 | None => { | ||
| 663 | // Try any non-throttled domain | ||
| 664 | for (d, domain_urls) in &urls_by_domain { | ||
| 665 | if throttle_manager.is_throttled(d) { | ||
| 666 | continue; | ||
| 667 | } | ||
| 668 | if let Some(url) = domain_urls.iter().find(|url| !tried_urls.contains(*url)) { | ||
| 669 | return Some(url.clone()); | ||
| 670 | } | ||
| 671 | } | ||
| 672 | None | ||
| 673 | } | ||
| 443 | } | 674 | } |
| 675 | } | ||
| 676 | |||
| 677 | /// Information about throttled domains with untried URLs | ||
| 678 | #[derive(Debug, Clone)] | ||
| 679 | pub struct ThrottledDomainInfo { | ||
| 680 | pub domain: String, | ||
| 681 | pub tried_urls_for_domain: HashSet<String>, | ||
| 682 | } | ||
| 683 | |||
| 684 | /// Get information about throttled domains that have untried URLs. | ||
| 685 | /// | ||
| 686 | /// Called by main sync loop to know which DomainThrottle queues to add the identifier to. | ||
| 687 | pub async fn get_throttled_domains_with_untried_urls<C: SyncContext>( | ||
| 688 | ctx: &C, | ||
| 689 | identifier: &str, | ||
| 690 | tried_urls: &HashSet<String>, | ||
| 691 | throttle_manager: &ThrottleManager, | ||
| 692 | ) -> Vec<ThrottledDomainInfo> { | ||
| 693 | let repo_data = match ctx.fetch_repository_data(identifier).await { | ||
| 694 | Ok(data) => data, | ||
| 695 | Err(_) => return vec![], | ||
| 696 | }; | ||
| 697 | |||
| 698 | let all_urls: Vec<String> = repo_data | ||
| 699 | .announcements | ||
| 700 | .iter() | ||
| 701 | .flat_map(|a| a.clone_urls.iter().cloned()) | ||
| 702 | .filter(|url| ctx.our_domain().map_or(true, |d| !url.contains(d))) | ||
| 703 | .collect::<HashSet<_>>() | ||
| 704 | .into_iter() | ||
| 705 | .collect(); | ||
| 444 | 706 | ||
| 445 | // 5. Group by domain and find an available URL | ||
| 446 | let urls_by_domain: HashMap<String, Vec<String>> = all_urls | 707 | let urls_by_domain: HashMap<String, Vec<String>> = all_urls |
| 447 | .iter() | 708 | .iter() |
| 448 | .fold(HashMap::new(), |mut acc, url| { | 709 | .fold(HashMap::new(), |mut acc, url| { |
| 449 | acc.entry(extract_domain(url)).or_default().push(url.clone()); | 710 | if let Some(d) = extract_domain(url) { |
| 711 | acc.entry(d).or_default().push(url.clone()); | ||
| 712 | } | ||
| 450 | acc | 713 | acc |
| 451 | }); | 714 | }); |
| 452 | 715 | ||
| 453 | // 6. Try to get a URL from any domain that has capacity | 716 | urls_by_domain |
| 454 | let mut min_wait: Option<Duration> = None; | 717 | .into_iter() |
| 455 | 718 | .filter_map(|(domain, domain_urls)| { | |
| 456 | for (domain, domain_urls) in &urls_by_domain { | 719 | if !throttle_manager.is_throttled(&domain) { |
| 457 | if let Some(url) = throttle.get_next_url(domain, domain_urls, tried_urls) { | 720 | return None; // Not throttled, skip |
| 458 | // Found a URL to try! | 721 | } |
| 459 | let target_repo = match ctx.find_target_repo(&db_repo_data) { | ||
| 460 | Some(path) => path, | ||
| 461 | None => return Ok(SyncStepResult::NoMoreUrls), | ||
| 462 | }; | ||
| 463 | |||
| 464 | // Start the fetch | ||
| 465 | throttle.start_request(domain); | ||
| 466 | let oids_to_fetch: Vec<String> = needed_oids.iter().cloned().collect(); | ||
| 467 | let fetch_result = ctx.fetch_oids(&target_repo, &url, &oids_to_fetch).await; | ||
| 468 | throttle.complete_request(domain); | ||
| 469 | 722 | ||
| 470 | let oids_fetched = match fetch_result { | 723 | let untried: Vec<_> = domain_urls |
| 471 | Ok(fetched) => fetched.len(), | 724 | .iter() |
| 472 | Err(e) => { | 725 | .filter(|url| !tried_urls.contains(*url)) |
| 473 | tracing::debug!(url = %url, error = %e, "Fetch failed"); | 726 | .collect(); |
| 474 | 0 | ||
| 475 | } | ||
| 476 | }; | ||
| 477 | 727 | ||
| 478 | // Try to process any events that can now be satisfied | 728 | if untried.is_empty() { |
| 479 | if oids_fetched > 0 { | 729 | return None; // All URLs tried for this domain |
| 480 | let _ = ctx.process_satisfiable_events(identifier).await; | ||
| 481 | } | 730 | } |
| 482 | 731 | ||
| 483 | return Ok(SyncStepResult::TriedUrl { url, oids_fetched }); | 732 | // Collect tried URLs that belong to this domain |
| 484 | } else { | 733 | let tried_urls_for_domain: HashSet<String> = tried_urls |
| 485 | // Domain throttled or all URLs tried | 734 | .iter() |
| 486 | let untried_exist = domain_urls.iter().any(|u| !tried_urls.contains(u)); | 735 | .filter(|url| extract_domain(url).as_deref() == Some(&domain)) |
| 487 | if untried_exist { | 736 | .cloned() |
| 488 | // URLs exist but domain is throttled | 737 | .collect(); |
| 489 | if let Some(wait) = throttle.time_until_capacity(domain) { | 738 | |
| 490 | min_wait = Some(min_wait.map_or(wait, |m| m.min(wait))); | 739 | Some(ThrottledDomainInfo { |
| 491 | } | 740 | domain, |
| 492 | } | 741 | tried_urls_for_domain, |
| 742 | }) | ||
| 743 | }) | ||
| 744 | .collect() | ||
| 745 | } | ||
| 746 | ``` | ||
| 747 | |||
| 748 | ### sync_identifier_from_url | ||
| 749 | |||
| 750 | ```rust | ||
| 751 | /// Fetch git data from a specific URL for an identifier. | ||
| 752 | /// | ||
| 753 | /// This function: | ||
| 754 | /// 1. Records the request with the throttle manager | ||
| 755 | /// 2. Performs the actual git fetch | ||
| 756 | /// 3. Processes any events that can now be satisfied | ||
| 757 | /// 4. Records request completion | ||
| 758 | /// | ||
| 759 | /// Returns the number of OIDs successfully fetched. | ||
| 760 | pub async fn sync_identifier_from_url<C: SyncContext>( | ||
| 761 | ctx: &C, | ||
| 762 | identifier: &str, | ||
| 763 | url: &str, | ||
| 764 | throttle_manager: &ThrottleManager, | ||
| 765 | ) -> usize { | ||
| 766 | let domain = match extract_domain(url) { | ||
| 767 | Some(d) => d, | ||
| 768 | None => return 0, | ||
| 769 | }; | ||
| 770 | |||
| 771 | // Get repository data for target repo path | ||
| 772 | let repo_data = match ctx.fetch_repository_data(identifier).await { | ||
| 773 | Ok(data) => data, | ||
| 774 | Err(e) => { | ||
| 775 | tracing::debug!(identifier = %identifier, error = %e, "Failed to fetch repo data"); | ||
| 776 | return 0; | ||
| 493 | } | 777 | } |
| 778 | }; | ||
| 779 | |||
| 780 | let target_repo = match ctx.find_target_repo(&repo_data) { | ||
| 781 | Some(path) => path, | ||
| 782 | None => { | ||
| 783 | tracing::debug!(identifier = %identifier, "No target repo found"); | ||
| 784 | return 0; | ||
| 785 | } | ||
| 786 | }; | ||
| 787 | |||
| 788 | // Collect needed OIDs | ||
| 789 | let needed_oids: Vec<String> = ctx.collect_needed_oids(identifier).into_iter().collect(); | ||
| 790 | if needed_oids.is_empty() { | ||
| 791 | return 0; | ||
| 494 | } | 792 | } |
| 495 | 793 | ||
| 496 | // Check if all URLs have been tried | 794 | // Perform the fetch |
| 497 | let all_tried = all_urls.iter().all(|url| tried_urls.contains(url)); | 795 | throttle_manager.start_request(&domain); |
| 498 | if all_tried { | 796 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; |
| 499 | return Ok(SyncStepResult::NoMoreUrls); | 797 | throttle_manager.complete_request(&domain); |
| 798 | |||
| 799 | let oids_fetched = match fetch_result { | ||
| 800 | Ok(fetched) => { | ||
| 801 | tracing::debug!( | ||
| 802 | identifier = %identifier, | ||
| 803 | url = %url, | ||
| 804 | oids_fetched = fetched.len(), | ||
| 805 | "Fetch succeeded" | ||
| 806 | ); | ||
| 807 | fetched.len() | ||
| 808 | } | ||
| 809 | Err(e) => { | ||
| 810 | tracing::debug!( | ||
| 811 | identifier = %identifier, | ||
| 812 | url = %url, | ||
| 813 | error = %e, | ||
| 814 | "Fetch failed" | ||
| 815 | ); | ||
| 816 | 0 | ||
| 817 | } | ||
| 818 | }; | ||
| 819 | |||
| 820 | // Try to process any events that can now be satisfied | ||
| 821 | if oids_fetched > 0 { | ||
| 822 | if let Err(e) = ctx.process_satisfiable_events(identifier).await { | ||
| 823 | tracing::warn!( | ||
| 824 | identifier = %identifier, | ||
| 825 | error = %e, | ||
| 826 | "Failed to process satisfiable events" | ||
| 827 | ); | ||
| 828 | } | ||
| 500 | } | 829 | } |
| 501 | 830 | ||
| 502 | // Some URLs exist but all domains are throttled | 831 | oids_fetched |
| 503 | Ok(SyncStepResult::AllUrlsThrottled { | ||
| 504 | wait_duration: min_wait.unwrap_or(Duration::from_millis(100)), | ||
| 505 | }) | ||
| 506 | } | 832 | } |
| 507 | ``` | 833 | ``` |
| 508 | 834 | ||
| 509 | ### The Sync Identifier Loop | 835 | ### The Sync Identifier Loop (Main Sync) |
| 510 | 836 | ||
| 511 | ```rust | 837 | ```rust |
| 512 | /// Sync git data for an identifier. | 838 | /// Sync git data for an identifier. |
| 513 | /// | 839 | /// |
| 514 | /// Returns true if sync completed successfully (no more pending events), | 840 | /// This is called by the main sync loop. It: |
| 515 | /// false if we exhausted all options but events remain. | 841 | /// 1. Tries all non-throttled URLs |
| 842 | /// 2. Enqueues with throttled domains for later processing | ||
| 843 | /// 3. Returns without waiting for throttled domains | ||
| 844 | /// | ||
| 845 | /// Returns true if sync completed (no pending events or no OIDs needed), | ||
| 846 | /// false if events remain (will be retried after backoff). | ||
| 516 | pub async fn sync_identifier<C: SyncContext>( | 847 | pub async fn sync_identifier<C: SyncContext>( |
| 517 | ctx: &C, | 848 | ctx: &C, |
| 518 | identifier: &str, | 849 | identifier: &str, |
| 519 | throttle: &DomainThrottle, | 850 | throttle_manager: &ThrottleManager, |
| 520 | ) -> bool { | 851 | ) -> bool { |
| 521 | let mut tried_urls: HashSet<String> = HashSet::new(); | 852 | let mut tried_urls: HashSet<String> = HashSet::new(); |
| 522 | 853 | ||
| 854 | // Try all non-throttled URLs | ||
| 523 | loop { | 855 | loop { |
| 524 | match sync_identifier_step(ctx, identifier, &tried_urls, throttle).await { | 856 | match sync_identifier_next_url( |
| 525 | Ok(SyncStepResult::TriedUrl { url, oids_fetched }) => { | 857 | ctx, |
| 526 | tried_urls.insert(url.clone()); | 858 | identifier, |
| 527 | tracing::debug!( | 859 | None, // Any domain |
| 528 | identifier = %identifier, | 860 | &tried_urls, |
| 529 | url = %url, | 861 | throttle_manager, |
| 530 | oids_fetched = oids_fetched, | 862 | ).await { |
| 531 | "Tried URL" | 863 | Some(url) => { |
| 532 | ); | 864 | // Found a non-throttled URL to try |
| 533 | // Continue looping | 865 | sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await; |
| 534 | } | 866 | tried_urls.insert(url); |
| 535 | 867 | ||
| 536 | Ok(SyncStepResult::AllUrlsThrottled { wait_duration }) => { | 868 | // Check if sync is now complete |
| 537 | tracing::debug!( | 869 | if !ctx.has_pending_events(identifier) { |
| 538 | identifier = %identifier, | 870 | tracing::info!(identifier = %identifier, "Sync complete - no pending events"); |
| 539 | wait_ms = wait_duration.as_millis(), | 871 | return true; |
| 540 | "All domains throttled, waiting" | 872 | } |
| 541 | ); | 873 | |
| 542 | tokio::time::sleep(wait_duration).await; | 874 | let needed_oids = ctx.collect_needed_oids(identifier); |
| 543 | // Continue looping | 875 | if needed_oids.is_empty() { |
| 544 | } | 876 | // Process any remaining satisfiable events |
| 545 | 877 | let _ = ctx.process_satisfiable_events(identifier).await; | |
| 546 | Ok(SyncStepResult::NoMoreUrls) => { | 878 | tracing::info!(identifier = %identifier, "Sync complete - all OIDs available"); |
| 547 | tracing::debug!(identifier = %identifier, "No more URLs to try"); | 879 | return true; |
| 548 | return false; // Events remain but no URLs left | 880 | } |
| 549 | } | 881 | |
| 550 | 882 | // Continue trying more URLs | |
| 551 | Ok(SyncStepResult::Complete) => { | ||
| 552 | tracing::info!(identifier = %identifier, "Sync complete"); | ||
| 553 | return true; | ||
| 554 | } | ||
| 555 | |||
| 556 | Ok(SyncStepResult::NoPendingEvents) => { | ||
| 557 | tracing::debug!(identifier = %identifier, "No pending events"); | ||
| 558 | return true; | ||
| 559 | } | 883 | } |
| 560 | 884 | None => { | |
| 561 | Err(e) => { | 885 | // No more non-throttled URLs available |
| 562 | tracing::warn!(identifier = %identifier, error = %e, "Sync step error"); | 886 | break; |
| 563 | return false; | ||
| 564 | } | 887 | } |
| 565 | } | 888 | } |
| 566 | } | 889 | } |
| 890 | |||
| 891 | // Check if we're done (no pending events or no needed OIDs) | ||
| 892 | if !ctx.has_pending_events(identifier) { | ||
| 893 | return true; | ||
| 894 | } | ||
| 895 | |||
| 896 | let needed_oids = ctx.collect_needed_oids(identifier); | ||
| 897 | if needed_oids.is_empty() { | ||
| 898 | let _ = ctx.process_satisfiable_events(identifier).await; | ||
| 899 | return true; | ||
| 900 | } | ||
| 901 | |||
| 902 | // Enqueue with any throttled domains that have untried URLs | ||
| 903 | let throttled_domains = get_throttled_domains_with_untried_urls( | ||
| 904 | ctx, | ||
| 905 | identifier, | ||
| 906 | &tried_urls, | ||
| 907 | throttle_manager, | ||
| 908 | ).await; | ||
| 909 | |||
| 910 | for info in throttled_domains { | ||
| 911 | tracing::debug!( | ||
| 912 | identifier = %identifier, | ||
| 913 | domain = %info.domain, | ||
| 914 | "Enqueueing with throttled domain" | ||
| 915 | ); | ||
| 916 | throttle_manager.enqueue_identifier( | ||
| 917 | &info.domain, | ||
| 918 | identifier.to_string(), | ||
| 919 | info.tried_urls_for_domain, | ||
| 920 | ); | ||
| 921 | } | ||
| 922 | |||
| 923 | // Return false - events remain, will retry after backoff | ||
| 924 | // (throttled domains will process independently) | ||
| 925 | false | ||
| 567 | } | 926 | } |
| 568 | ``` | 927 | ``` |
| 569 | 928 | ||
| @@ -576,7 +935,7 @@ impl Purgatory { | |||
| 576 | database: SharedDatabase, | 935 | database: SharedDatabase, |
| 577 | our_domain: Option<String>, | 936 | our_domain: Option<String>, |
| 578 | local_relay: Option<nostr_relay_builder::LocalRelay>, | 937 | local_relay: Option<nostr_relay_builder::LocalRelay>, |
| 579 | throttle: Arc<DomainThrottle>, | 938 | throttle_manager: Arc<ThrottleManager>, |
| 580 | ) -> tokio::task::JoinHandle<()> { | 939 | ) -> tokio::task::JoinHandle<()> { |
| 581 | tokio::spawn(async move { | 940 | tokio::spawn(async move { |
| 582 | let mut interval = tokio::time::interval(Duration::from_secs(1)); | 941 | let mut interval = tokio::time::interval(Duration::from_secs(1)); |
| @@ -613,7 +972,7 @@ impl Purgatory { | |||
| 613 | let db = database.clone(); | 972 | let db = database.clone(); |
| 614 | let domain = our_domain.clone(); | 973 | let domain = our_domain.clone(); |
| 615 | let relay = local_relay.clone(); | 974 | let relay = local_relay.clone(); |
| 616 | let throttle = throttle.clone(); | 975 | let throttle_manager = throttle_manager.clone(); |
| 617 | let id = identifier.clone(); | 976 | let id = identifier.clone(); |
| 618 | 977 | ||
| 619 | tokio::spawn(async move { | 978 | tokio::spawn(async move { |
| @@ -625,13 +984,14 @@ impl Purgatory { | |||
| 625 | relay, | 984 | relay, |
| 626 | ); | 985 | ); |
| 627 | 986 | ||
| 628 | let complete = sync_identifier(&ctx, &id, &throttle).await; | 987 | let complete = sync_identifier(&ctx, &id, &throttle_manager).await; |
| 629 | 988 | ||
| 630 | if complete || !purgatory.has_pending_events(&id) { | 989 | if complete || !purgatory.has_pending_events(&id) { |
| 631 | purgatory.sync_queue.remove(&id); | 990 | purgatory.sync_queue.remove(&id); |
| 632 | tracing::info!(identifier = %id, "Removed from sync queue"); | 991 | tracing::info!(identifier = %id, "Removed from sync queue"); |
| 633 | } else { | 992 | } else { |
| 634 | // Apply backoff | 993 | // Apply backoff - will retry later |
| 994 | // (throttled domains are being processed independently) | ||
| 635 | if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { | 995 | if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { |
| 636 | entry.on_sync_complete(); | 996 | entry.on_sync_complete(); |
| 637 | } | 997 | } |
| @@ -667,7 +1027,6 @@ mod tests { | |||
| 667 | #[async_trait] | 1027 | #[async_trait] |
| 668 | impl SyncContext for MockSyncContext { | 1028 | impl SyncContext for MockSyncContext { |
| 669 | async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> { | 1029 | async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> { |
| 670 | // Return mock data with our available_urls | ||
| 671 | Ok(RepositoryData { | 1030 | Ok(RepositoryData { |
| 672 | announcements: vec![MockAnnouncement { | 1031 | announcements: vec![MockAnnouncement { |
| 673 | clone_urls: self.available_urls.clone(), | 1032 | clone_urls: self.available_urls.clone(), |
| @@ -686,7 +1045,6 @@ mod tests { | |||
| 686 | } | 1045 | } |
| 687 | 1046 | ||
| 688 | async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> { | 1047 | async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> { |
| 689 | // Return pre-configured fetch result for this URL | ||
| 690 | Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) | 1048 | Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) |
| 691 | } | 1049 | } |
| 692 | 1050 | ||
| @@ -709,67 +1067,102 @@ mod tests { | |||
| 709 | } | 1067 | } |
| 710 | 1068 | ||
| 711 | #[tokio::test] | 1069 | #[tokio::test] |
| 712 | async fn test_sync_step_no_pending_events() { | 1070 | async fn test_next_url_no_pending_events() { |
| 713 | let ctx = MockSyncContext { | 1071 | let ctx = MockSyncContext { |
| 714 | pending_events: RefCell::new(false), | 1072 | pending_events: RefCell::new(false), |
| 1073 | needed_oids: RefCell::new(HashSet::new()), | ||
| 1074 | available_urls: vec!["https://example.com/repo.git".to_string()], | ||
| 715 | ..Default::default() | 1075 | ..Default::default() |
| 716 | }; | 1076 | }; |
| 717 | let throttle = DomainThrottle::new(5, 30); | 1077 | let throttle_manager = ThrottleManager::new(5, 30); |
| 718 | let tried = HashSet::new(); | 1078 | let tried = HashSet::new(); |
| 719 | 1079 | ||
| 720 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); | 1080 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; |
| 721 | assert_eq!(result, SyncStepResult::NoPendingEvents); | 1081 | assert!(result.is_none()); |
| 722 | } | 1082 | } |
| 723 | 1083 | ||
| 724 | #[tokio::test] | 1084 | #[tokio::test] |
| 725 | async fn test_sync_step_no_oids_needed() { | 1085 | async fn test_next_url_no_oids_needed() { |
| 726 | let ctx = MockSyncContext { | 1086 | let ctx = MockSyncContext { |
| 727 | pending_events: RefCell::new(true), | 1087 | pending_events: RefCell::new(true), |
| 728 | needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed | 1088 | needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed |
| 1089 | available_urls: vec!["https://example.com/repo.git".to_string()], | ||
| 729 | ..Default::default() | 1090 | ..Default::default() |
| 730 | }; | 1091 | }; |
| 731 | let throttle = DomainThrottle::new(5, 30); | 1092 | let throttle_manager = ThrottleManager::new(5, 30); |
| 732 | let tried = HashSet::new(); | 1093 | let tried = HashSet::new(); |
| 733 | 1094 | ||
| 734 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); | 1095 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; |
| 735 | assert_eq!(result, SyncStepResult::Complete); | 1096 | assert!(result.is_none()); // No URL needed, sync is complete |
| 736 | assert_eq!(*ctx.processed_count.borrow(), 1); | ||
| 737 | } | 1097 | } |
| 738 | 1098 | ||
| 739 | #[tokio::test] | 1099 | #[tokio::test] |
| 740 | async fn test_sync_step_tries_url() { | 1100 | async fn test_next_url_returns_non_throttled() { |
| 741 | let mut needed = HashSet::new(); | 1101 | let mut needed = HashSet::new(); |
| 742 | needed.insert("abc123".to_string()); | 1102 | needed.insert("abc123".to_string()); |
| 743 | 1103 | ||
| 744 | let mut fetch_results = HashMap::new(); | ||
| 745 | fetch_results.insert( | ||
| 746 | "https://example.com/repo.git".to_string(), | ||
| 747 | vec!["abc123".to_string()], | ||
| 748 | ); | ||
| 749 | |||
| 750 | let ctx = MockSyncContext { | 1104 | let ctx = MockSyncContext { |
| 751 | pending_events: RefCell::new(true), | 1105 | pending_events: RefCell::new(true), |
| 752 | needed_oids: RefCell::new(needed), | 1106 | needed_oids: RefCell::new(needed), |
| 753 | available_urls: vec!["https://example.com/repo.git".to_string()], | 1107 | available_urls: vec!["https://example.com/repo.git".to_string()], |
| 754 | fetch_results: RefCell::new(fetch_results), | 1108 | ..Default::default() |
| 755 | processed_count: RefCell::new(0), | ||
| 756 | }; | 1109 | }; |
| 757 | let throttle = DomainThrottle::new(5, 30); | 1110 | let throttle_manager = ThrottleManager::new(5, 30); |
| 758 | let tried = HashSet::new(); | 1111 | let tried = HashSet::new(); |
| 759 | 1112 | ||
| 760 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); | 1113 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; |
| 1114 | assert_eq!(result, Some("https://example.com/repo.git".to_string())); | ||
| 1115 | } | ||
| 1116 | |||
| 1117 | #[tokio::test] | ||
| 1118 | async fn test_next_url_skips_tried() { | ||
| 1119 | let mut needed = HashSet::new(); | ||
| 1120 | needed.insert("abc123".to_string()); | ||
| 761 | 1121 | ||
| 762 | match result { | 1122 | let ctx = MockSyncContext { |
| 763 | SyncStepResult::TriedUrl { url, oids_fetched } => { | 1123 | pending_events: RefCell::new(true), |
| 764 | assert_eq!(url, "https://example.com/repo.git"); | 1124 | needed_oids: RefCell::new(needed), |
| 765 | assert_eq!(oids_fetched, 1); | 1125 | available_urls: vec![ |
| 766 | } | 1126 | "https://example.com/repo.git".to_string(), |
| 767 | _ => panic!("Expected TriedUrl, got {:?}", result), | 1127 | "https://other.com/repo.git".to_string(), |
| 768 | } | 1128 | ], |
| 1129 | ..Default::default() | ||
| 1130 | }; | ||
| 1131 | let throttle_manager = ThrottleManager::new(5, 30); | ||
| 1132 | |||
| 1133 | let mut tried = HashSet::new(); | ||
| 1134 | tried.insert("https://example.com/repo.git".to_string()); | ||
| 1135 | |||
| 1136 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; | ||
| 1137 | assert_eq!(result, Some("https://other.com/repo.git".to_string())); | ||
| 1138 | } | ||
| 1139 | |||
| 1140 | #[tokio::test] | ||
| 1141 | async fn test_next_url_specific_domain() { | ||
| 1142 | let mut needed = HashSet::new(); | ||
| 1143 | needed.insert("abc123".to_string()); | ||
| 1144 | |||
| 1145 | let ctx = MockSyncContext { | ||
| 1146 | pending_events: RefCell::new(true), | ||
| 1147 | needed_oids: RefCell::new(needed), | ||
| 1148 | available_urls: vec![ | ||
| 1149 | "https://example.com/repo.git".to_string(), | ||
| 1150 | "https://other.com/repo.git".to_string(), | ||
| 1151 | ], | ||
| 1152 | ..Default::default() | ||
| 1153 | }; | ||
| 1154 | let throttle_manager = ThrottleManager::new(5, 30); | ||
| 1155 | let tried = HashSet::new(); | ||
| 1156 | |||
| 1157 | // Request specific domain | ||
| 1158 | let result = sync_identifier_next_url( | ||
| 1159 | &ctx, "test", Some("other.com"), &tried, &throttle_manager | ||
| 1160 | ).await; | ||
| 1161 | assert_eq!(result, Some("https://other.com/repo.git".to_string())); | ||
| 769 | } | 1162 | } |
| 770 | 1163 | ||
| 771 | #[tokio::test] | 1164 | #[tokio::test] |
| 772 | async fn test_sync_step_all_urls_tried() { | 1165 | async fn test_next_url_none_when_all_tried() { |
| 773 | let mut needed = HashSet::new(); | 1166 | let mut needed = HashSet::new(); |
| 774 | needed.insert("abc123".to_string()); | 1167 | needed.insert("abc123".to_string()); |
| 775 | 1168 | ||
| @@ -777,60 +1170,58 @@ mod tests { | |||
| 777 | pending_events: RefCell::new(true), | 1170 | pending_events: RefCell::new(true), |
| 778 | needed_oids: RefCell::new(needed), | 1171 | needed_oids: RefCell::new(needed), |
| 779 | available_urls: vec!["https://example.com/repo.git".to_string()], | 1172 | available_urls: vec!["https://example.com/repo.git".to_string()], |
| 780 | fetch_results: RefCell::new(HashMap::new()), | 1173 | ..Default::default() |
| 781 | processed_count: RefCell::new(0), | ||
| 782 | }; | 1174 | }; |
| 783 | let throttle = DomainThrottle::new(5, 30); | 1175 | let throttle_manager = ThrottleManager::new(5, 30); |
| 784 | 1176 | ||
| 785 | // Mark the only URL as tried | ||
| 786 | let mut tried = HashSet::new(); | 1177 | let mut tried = HashSet::new(); |
| 787 | tried.insert("https://example.com/repo.git".to_string()); | 1178 | tried.insert("https://example.com/repo.git".to_string()); |
| 788 | 1179 | ||
| 789 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); | 1180 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; |
| 790 | assert_eq!(result, SyncStepResult::NoMoreUrls); | 1181 | assert!(result.is_none()); |
| 791 | } | 1182 | } |
| 792 | 1183 | ||
| 793 | #[tokio::test] | 1184 | #[tokio::test] |
| 794 | async fn test_sync_step_domain_throttled() { | 1185 | async fn test_from_url_fetches_and_processes() { |
| 795 | let mut needed = HashSet::new(); | 1186 | let mut needed = HashSet::new(); |
| 796 | needed.insert("abc123".to_string()); | 1187 | needed.insert("abc123".to_string()); |
| 797 | 1188 | ||
| 1189 | let mut fetch_results = HashMap::new(); | ||
| 1190 | fetch_results.insert( | ||
| 1191 | "https://example.com/repo.git".to_string(), | ||
| 1192 | vec!["abc123".to_string()], | ||
| 1193 | ); | ||
| 1194 | |||
| 798 | let ctx = MockSyncContext { | 1195 | let ctx = MockSyncContext { |
| 799 | pending_events: RefCell::new(true), | 1196 | pending_events: RefCell::new(true), |
| 800 | needed_oids: RefCell::new(needed), | 1197 | needed_oids: RefCell::new(needed), |
| 801 | available_urls: vec!["https://example.com/repo.git".to_string()], | 1198 | available_urls: vec!["https://example.com/repo.git".to_string()], |
| 802 | fetch_results: RefCell::new(HashMap::new()), | 1199 | fetch_results: RefCell::new(fetch_results), |
| 803 | processed_count: RefCell::new(0), | 1200 | processed_count: RefCell::new(0), |
| 804 | }; | 1201 | }; |
| 1202 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | ||
| 805 | 1203 | ||
| 806 | // Create throttle with 0 concurrent limit | 1204 | let oids_fetched = sync_identifier_from_url( |
| 807 | let throttle = DomainThrottle::new(0, 30); | 1205 | &ctx, "test", "https://example.com/repo.git", &throttle_manager |
| 808 | let tried = HashSet::new(); | 1206 | ).await; |
| 809 | |||
| 810 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); | ||
| 811 | 1207 | ||
| 812 | match result { | 1208 | assert_eq!(oids_fetched, 1); |
| 813 | SyncStepResult::AllUrlsThrottled { .. } => {} | 1209 | assert_eq!(*ctx.processed_count.borrow(), 1); |
| 814 | _ => panic!("Expected AllUrlsThrottled, got {:?}", result), | ||
| 815 | } | ||
| 816 | } | 1210 | } |
| 817 | 1211 | ||
| 818 | #[tokio::test] | 1212 | #[tokio::test] |
| 819 | async fn test_full_sync_loop() { | 1213 | async fn test_full_sync_with_throttled_domains() { |
| 820 | let mut needed = HashSet::new(); | 1214 | let mut needed = HashSet::new(); |
| 821 | needed.insert("abc123".to_string()); | 1215 | needed.insert("abc123".to_string()); |
| 822 | needed.insert("def456".to_string()); | ||
| 823 | 1216 | ||
| 824 | let mut fetch_results = HashMap::new(); | 1217 | let mut fetch_results = HashMap::new(); |
| 825 | // First URL returns one OID | ||
| 826 | fetch_results.insert( | 1218 | fetch_results.insert( |
| 827 | "https://server1.com/repo.git".to_string(), | 1219 | "https://server1.com/repo.git".to_string(), |
| 828 | vec!["abc123".to_string()], | 1220 | vec![], // First server doesn't have the OID |
| 829 | ); | 1221 | ); |
| 830 | // Second URL returns the other | ||
| 831 | fetch_results.insert( | 1222 | fetch_results.insert( |
| 832 | "https://server2.com/repo.git".to_string(), | 1223 | "https://server2.com/repo.git".to_string(), |
| 833 | vec!["def456".to_string()], | 1224 | vec!["abc123".to_string()], // Second server has it |
| 834 | ); | 1225 | ); |
| 835 | 1226 | ||
| 836 | let ctx = MockSyncContext { | 1227 | let ctx = MockSyncContext { |
| @@ -844,15 +1235,63 @@ mod tests { | |||
| 844 | processed_count: RefCell::new(0), | 1235 | processed_count: RefCell::new(0), |
| 845 | }; | 1236 | }; |
| 846 | 1237 | ||
| 847 | // Simulate OIDs being removed as they're fetched | 1238 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); |
| 848 | // (In real code, collect_needed_oids would return fewer OIDs) | 1239 | |
| 1240 | // Manually throttle server2.com to test enqueueing | ||
| 1241 | // (In real code, this would happen due to rate limits) | ||
| 1242 | // For this test, we just verify the sync tries available URLs | ||
| 849 | 1243 | ||
| 850 | let throttle = DomainThrottle::new(5, 30); | 1244 | let complete = sync_identifier(&ctx, "test", &throttle_manager).await; |
| 851 | let complete = sync_identifier(&ctx, "test", &throttle).await; | ||
| 852 | 1245 | ||
| 853 | // Should have tried both URLs | 1246 | // Should have processed events (found OID from server2) |
| 854 | assert!(*ctx.processed_count.borrow() >= 1); | 1247 | assert!(*ctx.processed_count.borrow() >= 1); |
| 855 | } | 1248 | } |
| 1249 | |||
| 1250 | #[tokio::test] | ||
| 1251 | async fn test_domain_throttle_queue_round_robin() { | ||
| 1252 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | ||
| 1253 | |||
| 1254 | // Enqueue three identifiers | ||
| 1255 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | ||
| 1256 | throttle.enqueue_identifier("id2".to_string(), HashSet::new()); | ||
| 1257 | throttle.enqueue_identifier("id3".to_string(), HashSet::new()); | ||
| 1258 | |||
| 1259 | // Should get them in round-robin order | ||
| 1260 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1261 | throttle.mark_identifier_not_in_progress("id1"); | ||
| 1262 | |||
| 1263 | assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); | ||
| 1264 | throttle.mark_identifier_not_in_progress("id2"); | ||
| 1265 | |||
| 1266 | assert_eq!(throttle.next_ready_identifier(), Some("id3".to_string())); | ||
| 1267 | throttle.mark_identifier_not_in_progress("id3"); | ||
| 1268 | |||
| 1269 | // Back to id1 | ||
| 1270 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1271 | } | ||
| 1272 | |||
| 1273 | #[tokio::test] | ||
| 1274 | async fn test_domain_throttle_skips_in_progress() { | ||
| 1275 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | ||
| 1276 | |||
| 1277 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | ||
| 1278 | throttle.enqueue_identifier("id2".to_string(), HashSet::new()); | ||
| 1279 | |||
| 1280 | // Get id1 (marks it in_progress) | ||
| 1281 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1282 | |||
| 1283 | // Next should skip id1 and return id2 | ||
| 1284 | assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); | ||
| 1285 | |||
| 1286 | // Both in progress, should return None | ||
| 1287 | assert_eq!(throttle.next_ready_identifier(), None); | ||
| 1288 | |||
| 1289 | // Mark id1 not in progress | ||
| 1290 | throttle.mark_identifier_not_in_progress("id1"); | ||
| 1291 | |||
| 1292 | // Now id1 should be available again | ||
| 1293 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1294 | } | ||
| 856 | } | 1295 | } |
| 857 | ``` | 1296 | ``` |
| 858 | 1297 | ||
| @@ -866,11 +1305,12 @@ mod tests { | |||
| 866 | 1305 | ||
| 867 | ## Migration Path | 1306 | ## Migration Path |
| 868 | 1307 | ||
| 869 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle, SyncContext trait) | 1308 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) |
| 870 | 2. **Phase 2**: Implement `sync_identifier_step` with unit tests | 1309 | 2. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests |
| 871 | 3. **Phase 3**: Implement main sync loop alongside existing `start_state_sync` | 1310 | 3. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` |
| 872 | 4. **Phase 4**: Add PR event syncing | 1311 | 4. **Phase 4**: Implement ThrottleManager domain loops |
| 873 | 5. **Phase 5**: Remove old `start_state_sync` code | 1312 | 5. **Phase 5**: Add PR event syncing |
| 1313 | 6. **Phase 6**: Remove old `start_state_sync` code | ||
| 874 | 1314 | ||
| 875 | ## Configuration | 1315 | ## Configuration |
| 876 | 1316 | ||