upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

Diffstat (limited to 'docs/explanation/purgatory-sync-redesign.md')
-rw-r--r-- docs/explanation/purgatory-sync-redesign.md 1959
1 files changed, 0 insertions, 1959 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
deleted file mode 100644
index 629c2ff..0000000
--- a/docs/explanation/purgatory-sync-redesign.md
+++ /dev/null
@@ -1,1959 +0,0 @@
# Purgatory Sync Redesign

## Status

**Proposed** - January 2026

## Context

The current purgatory sync implementation (`start_state_sync` at `src/purgatory/mod.rs:510`) has several limitations:

1. **Per-event syncing**: Each state event triggers its own independent sync operation
2. **No PR event syncing**: PR events enter purgatory but don't trigger git data fetching
3. **No batching**: Multiple events for the same repository cause redundant fetch requests
4. **No rate limiting**: Can overwhelm remote git servers or get rate-limited
5. **No coordination**: Multiple concurrent syncs may fetch the same OIDs

When syncing a new repository, we often receive multiple state and PR events in a burst. The current approach creates unnecessary load on remote servers and doesn't handle this common case efficiently.

## Decision

Redesign purgatory sync to be **identifier-based** rather than **event-based**, with:

1. A background sync loop that processes identifiers, not individual events
2. Batched OID fetching across all purgatory events for an identifier
3. Domain-based throttling (30 requests/minute per domain)
4. Exponential backoff per identifier (20s → 2m, then 2m intervals)
5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default)
6. **Clean separation of concerns**: Domain throttle handles rate limiting only; sync logic tracks its own tried URLs

### Key Design Decision: Where Does OID Copying Happen?

**Answer: In `process_newly_available_git_data`, NOT after the entire sync completes.**

The current implementation (`sync_state_git_data`) fetches all OIDs first, then at the end:

1. Copies OIDs to all authorized owner repos
2. Aligns refs with state
3. Saves to database
4. Notifies subscribers
5. Removes from purgatory

The redesign moves all of this into `process_newly_available_git_data`, which is called after **each successful URL fetch**. This enables:

| Aspect | Current (end-of-sync) | Redesign (per-fetch) |
|--------|----------------------|---------------------|
| **When events release** | Only after all URLs tried | As soon as OIDs available |
| **Partial success** | All or nothing per event | Events release independently |
| **Multiple state events** | All wait for slowest | Each releases when ready |
| **Authorization check** | Once at start | At release time (handles changes) |

**Why this matters:**

Consider syncing an identifier with 3 state events from different maintainers:
- State A needs OIDs from `server1.com` (fast)
- State B needs OIDs from `server2.com` (slow)
- State C needs OIDs from `server3.com` (down)

With the redesign:
1. Fetch from `server1.com` succeeds → `process_newly_available_git_data` releases State A immediately
2. Fetch from `server2.com` succeeds → `process_newly_available_git_data` releases State B
3. Fetch from `server3.com` fails → State C stays in purgatory, retries with backoff

The current implementation would wait for all servers before releasing any events.
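
The three-server scenario above can be sketched as a small simulation. This is only an illustration under assumptions, not the project's code: `PendingEvent`, `release_satisfied`, `simulate`, and the OID strings are hypothetical stand-ins, and each `release_satisfied` call plays the role that `process_newly_available_git_data` has after one URL fetch.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a state event waiting in purgatory.
struct PendingEvent {
    name: &'static str,
    needed_oids: HashSet<&'static str>,
}

/// Release every event whose needed OIDs are all available; keep the rest.
/// Returns the names of the events released by this call.
fn release_satisfied(
    pending: &mut Vec<PendingEvent>,
    available: &HashSet<&'static str>,
) -> Vec<&'static str> {
    let mut released = Vec::new();
    pending.retain(|ev| {
        if ev.needed_oids.is_subset(available) {
            released.push(ev.name);
            false // drop from purgatory
        } else {
            true // still waiting
        }
    });
    released
}

/// Replays the scenario: server1 succeeds, server2 succeeds, server3 fails.
fn simulate() -> (Vec<&'static str>, Vec<&'static str>, Vec<&'static str>) {
    let mut pending = vec![
        PendingEvent { name: "A", needed_oids: HashSet::from(["oid-a"]) },
        PendingEvent { name: "B", needed_oids: HashSet::from(["oid-b"]) },
        PendingEvent { name: "C", needed_oids: HashSet::from(["oid-c"]) },
    ];
    let mut available = HashSet::new();

    // Fetch from server1 succeeds: State A can be released right away.
    available.insert("oid-a");
    let after_first = release_satisfied(&mut pending, &available);

    // Fetch from server2 succeeds later: State B is released.
    available.insert("oid-b");
    let after_second = release_satisfied(&mut pending, &available);

    // Fetch from server3 fails: nothing new arrives, State C keeps waiting.
    let remaining = pending.iter().map(|ev| ev.name).collect();
    (after_first, after_second, remaining)
}

fn main() {
    let (first, second, remaining) = simulate();
    println!("released after server1: {first:?}");
    println!("released after server2: {second:?}");
    println!("still in purgatory: {remaining:?}");
}
```

Checking after every fetch costs one extra pass over the pending events per URL, which is the trade-off for releasing State A without waiting on the slower servers.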
63
### Unified Processing with Git Push Handler

**Key insight**: The post-git-data-available processing is identical whether data arrives via:
- A successful `git push` (`handle_receive_pack`)
- Purgatory sync fetching OIDs from remote servers

Both paths need to:
1. Discover satisfiable events from purgatory
2. Sync OIDs to authorized owner repos
3. Align refs (+ set HEAD)
4. Save events to database
5. Notify WebSocket subscribers
6. Remove from purgatory

Rather than duplicate this logic, we use a single unified function `process_newly_available_git_data` that handles all post-git-data-available processing. See [Unified Git Data Sync](unify-git-data-sync.md) for the complete design.

This means:
- **`handle_receive_pack`** calls `process_newly_available_git_data` after git push succeeds
- **`sync_identifier_from_url`** calls `process_newly_available_git_data` after OID fetch succeeds
- **Same behavior** regardless of how git data arrived
84
85## Architecture
86
87### Overview
88
89```
90┌──────────────────────────────────────────────────────────────────────────────────┐
91│ Purgatory │
92│ │
93│ ┌─────────────────┐ ┌─────────────────┐ │
94│ │ State Events │ │ PR Events │ │
95│ │ (by identifier)│ │ (by event_id) │ │
96│ └────────┬────────┘ └────────┬────────┘ │
97│ │ │ │
98│ └──────────┬─────────┘ │
99│ │ add_state() / add_pr() / trigger_immediate_sync() │
100│ ▼ │
101│ ┌──────────────────────────┐ │
102│ │ Sync Queue │ │
103│ │ DashMap<id, Entry> │ │
104│ │ │ │
105│ │ Entry { │ │
106│ │ next_attempt, │ ← delay/backoff timer │
107│ │ attempt_count, │ ← for backoff calculation │
108│ │ in_progress, │ ← prevents concurrent runs │
109│ │ } │ │
110│ └────────────┬─────────────┘ │
111│ │ │
112│ ┌─────────────────────┼──────────────────────────────────────────────────────┐ │
113│ │ ▼ │ │
114│ │ ┌─────────────────────┐ │ │
115│ │ │ Main Sync Loop │ (every 1s) │ │
116│ │ │ │ │ │
117│ │ │ 1. Find ALL ready │ │ │
118│ │ │ identifiers │ │ │
119│ │ │ 2. Spawn parallel │───────┐ │ │
120│ │ │ tasks for each │ │ (parallel tasks) │ │
121│ │ │ 3. Apply backoff │ │ │ │
122│ │ │ when done │ │ │ │
123│ │ └─────────────────────┘ │ │ │
124│ │ ▼ │ │
125│ │ ┌──────────────────────────────────────────┐ │ │
126│ │ │ sync_identifier() │ │ │
127│ │ │ │ │ │
128│ │ │ Owns its own tried_urls: HashSet │ │ │
129│ │ │ │ │ │
130│ │ │ loop: │ │ │
131│ │ │ url = sync_identifier_next_url( │ │ │
132│ │ │ domain=None) │ │ │
133│ │ │ if url is Some: │ │ │
134│ │ │ sync_identifier_from_url(url) │ │ │
135│ │ │ tried_urls.insert(url) │ │ │
136│ │ │ else: │ │ │
137│ │ │ break (no non-throttled URLs left) │ │ │
138│ │ │ │ │ │
139│ │ │ Enqueue throttled domains then return │ │ │
140│ │ └──────────────────────────────────────────┘ │ │
141│ │ │ │ │
142│ │ │ enqueue_identifier() │ │
143│ │ ▼ │ │
144│ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │
145│ │ │ ThrottleManager │ │ │
146│ │ │ │ │ │
147│ │ │ DashMap<domain, DomainThrottle> │ │ │
148│ │ │ │ │ │
149│ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │
150│ │ │ │ DomainThrottle (per domain) │ │ │ │
151│ │ │ │ │ │ │ │
152│ │ │ │ Rate limiting: │ Queue (IndexMap for ordering): │ │ │ │
153│ │ │ │ - in_flight: u32 │ - queue: IndexMap<id, State> │ │ │ │
154│ │ │ │ - request_times │ - State: tried_urls, │ │ │ │
155│ │ │ │ - round_robin_index │ in_progress │ │ │ │
156│ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │
157│ │ │ │ │ │
158│ │ │ Trigger-based processing (no polling loop): │ │ │
159│ │ │ - enqueue_identifier() triggers if capacity available │ │ │
160│ │ │ - complete_request() triggers next item if capacity available │ │ │
161│ │ │ │ │ │
162│ │ │ process_queued_identifier(): │ │ │
163│ │ │ 1. Pick next identifier (round-robin, not in_progress) │ │ │
164│ │ │ 2. url = sync_identifier_next_url(domain=Some(this_domain)) │ │ │
165│ │ │ 3. If url: sync_identifier_from_url(url), mark tried │ │ │
166│ │ │ Else: remove identifier from queue, try next │ │ │
167│ │ └─────────────────────────────────────────────────────────────────────┘ │ │
168│ │ │ │
169│ └────────────────────────────────────────────────────────────────────────────┘ │
170└───────────────────────────────────────────────────────────────────────────────────┘
171```
172
### Key Design Principles

**1. Two Independent Execution Paths**

The main sync loop and the per-domain throttle processing run independently:
- **Main sync**: Tries non-throttled URLs, completes quickly, applies backoff, retries later
- **DomainThrottle**: Processes queued identifiers when capacity frees up, without blocking the main sync

**2. Two Separate tried_urls Tracking**

Each path tracks its own tried URLs:
- **sync_identifier**: Local `HashSet<String>` for current attempt (all domains)
- **DomainThrottle**: Per-identifier `HashSet<String>` for URLs tried via throttle (this domain only)

These don't need to merge because:
- Main sync skips throttled domains anyway
- DomainThrottle only processes its own domain's URLs

**3. Shared Functions**

Both paths use the same core functions:
- **`sync_identifier_next_url`**: URL selection logic only
- **`sync_identifier_from_url`**: Fetch logic only

The `domain` parameter of `sync_identifier_next_url` determines its behavior:
- `None`: Return any non-throttled URL
- `Some(domain)`: Return a URL from that specific domain only

### Flow Summary

1. **Event arrives** → added to state_events/pr_events + sync_queue with delay
   - User-submitted: 3 minute delay (expect git push to follow)
   - Sync-triggered: 500ms delay (batch burst arrivals)
   - `enqueue_sync()` resets `attempt_count` to 0 and updates `next_attempt` if needed

2. **Main sync loop** (every 1s):
   - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`)
   - Spawns parallel tasks for each (marks `in_progress = true`)
   - Each `sync_identifier()` task:
     - Creates fresh `tried_urls: HashSet<String>`
     - Loops calling `sync_identifier_next_url(domain=None)` + `sync_identifier_from_url`
     - When no non-throttled URLs remain: enqueue with throttled domains, return
   - When task completes: apply backoff or remove from queue

3. **ThrottleManager / DomainThrottle** (trigger-based, no polling):
   - Processing triggered by `enqueue_identifier()` or `complete_request()`
   - When triggered and capacity available: pick next queued identifier (round-robin, not in_progress)
   - Call `sync_identifier_next_url(domain=Some(this_domain))`
   - If URL returned: call `sync_identifier_from_url`, mark URL tried, mark not in_progress
   - If no URL: remove identifier from queue, try next identifier
224## Data Structures
225
226### SyncQueueEntry
227
228Tracks sync state for each identifier in the main sync queue:
229
230```rust
231/// Entry in the sync queue tracking when/how to sync an identifier
232#[derive(Debug, Clone)]
233pub struct SyncQueueEntry {
234 /// Don't attempt sync before this time
235 pub next_attempt: Instant,
236
237 /// Number of sync attempts (for backoff calculation)
238 /// Reset to 0 when new event arrives for this identifier
239 pub attempt_count: u32,
240
241 /// Whether a sync is currently in progress for this identifier
242 pub in_progress: bool,
243}
244
245impl SyncQueueEntry {
246 pub fn new(delay: Duration) -> Self {
247 Self {
248 next_attempt: Instant::now() + delay,
249 attempt_count: 0,
250 in_progress: false,
251 }
252 }
253
254 /// Calculate backoff: 20s, 40s, 80s, 120s (capped at 2min)
255 pub fn backoff(&self) -> Duration {
256 let base = Duration::from_secs(20);
257 let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3));
258 (base * multiplier).min(Duration::from_secs(120))
259 }
260
261 pub fn is_ready(&self) -> bool {
262 !self.in_progress && Instant::now() >= self.next_attempt
263 }
264
265 /// Called when new event arrives - resets attempt_count
266 pub fn on_new_event(&mut self, delay: Duration) {
267 self.attempt_count = 0;
268 let new_attempt = Instant::now() + delay;
269 if new_attempt < self.next_attempt {
270 self.next_attempt = new_attempt;
271 }
272 }
273
274 /// Called when sync attempt completes
275 pub fn on_sync_complete(&mut self) {
276 self.in_progress = false;
277 if self.next_attempt <= Instant::now() {
278 self.attempt_count += 1;
279 self.next_attempt = Instant::now() + self.backoff();
280 }
281 }
282}
283```
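
To make the backoff schedule concrete, the following sketch reproduces the arithmetic of `SyncQueueEntry::backoff` as a free function and lists the delay for successive attempts. The `backoff` and `schedule` helpers are illustrative names introduced here, not part of the design.

```rust
use std::time::Duration;

/// Same arithmetic as `SyncQueueEntry::backoff`, taking the attempt count
/// as a parameter instead of reading it from the entry.
fn backoff(attempt_count: u32) -> Duration {
    let base = Duration::from_secs(20);
    // The exponent is capped at 3, so the multiplier is at most 8.
    let multiplier = 2u32.saturating_pow(attempt_count.saturating_sub(1).min(3));
    // 20s * 8 = 160s, which the final `min` clamps to 120s.
    (base * multiplier).min(Duration::from_secs(120))
}

/// Delay in seconds after attempts 1..=attempts.
fn schedule(attempts: u32) -> Vec<u64> {
    (1..=attempts).map(|n| backoff(n).as_secs()).collect()
}

fn main() {
    println!("{:?}", schedule(6));
}
```

Because `on_new_event` resets `attempt_count` to 0, a fresh event for the same identifier restarts this schedule from the 20 second step.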
284
285### ThrottleManager
286
287Manages all per-domain throttles and provides the interface for checking throttle status:
288
289```rust
290/// Manages rate limiting across all domains.
291///
292/// Owns a collection of DomainThrottle instances and provides:
293/// - Throttle status checking for sync_identifier_next_url
294/// - Identifier queue management
295/// - Trigger-based processing when capacity frees up
296pub struct ThrottleManager {
297 /// Per-domain throttle state
298 throttles: DashMap<String, DomainThrottle>,
299
300 /// Sync context for processing queued identifiers
301 /// Set once at startup via set_context()
302 ctx: OnceLock<Arc<dyn SyncContext>>,
303
304 /// Configuration
305 max_concurrent_per_domain: u32,
306 max_per_minute_per_domain: u32,
307}
308
309impl ThrottleManager {
310 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
311 Self {
312 throttles: DashMap::new(),
313 ctx: OnceLock::new(),
314 max_concurrent_per_domain: max_concurrent,
315 max_per_minute_per_domain: max_per_minute,
316 }
317 }
318
319 /// Set the sync context (called once at startup)
320 pub fn set_context(&self, ctx: Arc<dyn SyncContext>) {
321 let _ = self.ctx.set(ctx);
322 }
323
324 /// Check if a domain is currently throttled (at capacity)
325 pub fn is_throttled(&self, domain: &str) -> bool {
326 self.throttles
327 .get(domain)
328 .map_or(false, |t| !t.has_capacity())
329 }
330
331 /// Get or create throttle for a domain
332 fn get_or_create(&self, domain: &str) -> dashmap::mapref::one::RefMut<String, DomainThrottle> {
333 self.throttles
334 .entry(domain.to_string())
335 .or_insert_with(|| DomainThrottle::new(
336 domain.to_string(),
337 self.max_concurrent_per_domain,
338 self.max_per_minute_per_domain,
339 ))
340 }
341
342 /// Record that a request is starting for a domain
343 pub fn start_request(&self, domain: &str) {
344 self.get_or_create(domain).start_request();
345 }
346
347 /// Record that a request completed for a domain.
348 /// Triggers processing of next queued identifier if capacity available.
349 pub fn complete_request(self: &Arc<Self>, domain: &str) {
350 let should_trigger = {
351 if let Some(mut throttle) = self.throttles.get_mut(domain) {
352 throttle.complete_request();
353 throttle.has_capacity() && throttle.has_queued_work()
354 } else {
355 false
356 }
357 };
358
359 if should_trigger {
360 self.try_process_next(domain);
361 }
362 }
363
364 /// Add an identifier to a domain's waiting queue.
365 /// Triggers processing if capacity is available.
366 pub fn enqueue_identifier(
367 self: &Arc<Self>,
368 domain: &str,
369 identifier: String,
370 tried_urls_for_domain: HashSet<String>,
371 ) {
372 let should_trigger = {
373 let mut throttle = self.get_or_create(domain);
374 throttle.enqueue_identifier(identifier, tried_urls_for_domain);
375 throttle.has_capacity()
376 };
377
378 if should_trigger {
379 self.try_process_next(domain);
380 }
381 }
382
383 /// Try to process the next queued identifier for a domain
384 fn try_process_next(self: &Arc<Self>, domain: &str) {
385 let identifier = {
386 if let Some(mut throttle) = self.throttles.get_mut(domain) {
387 throttle.next_ready_identifier()
388 } else {
389 None
390 }
391 };
392
393 if let Some(identifier) = identifier {
394 let manager = self.clone();
395 let domain = domain.to_string();
396
397 tokio::spawn(async move {
398 manager.process_queued_identifier(&domain, &identifier).await;
399 });
400 }
401 }
402
    /// Process a single identifier from a domain's queue
    async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str) {
        let ctx = match self.ctx.get() {
            Some(ctx) => ctx,
            None => {
                // No context set yet: clear the in_progress flag that
                // next_ready_identifier() set, so the identifier is not stuck.
                if let Some(mut throttle) = self.throttles.get_mut(domain) {
                    throttle.mark_identifier_not_in_progress(identifier);
                }
                return;
            }
        };

        // Snapshot the tried URLs so the DashMap guard is dropped before the
        // `.await` below; holding it across an await point can block other
        // tasks that need the same shard.
        let tried_urls = match self.throttles.get(domain) {
            Some(throttle) => throttle.get_tried_urls(identifier),
            None => return,
        };

        // Get next URL for this identifier on this domain
        let url = sync_identifier_next_url(
            ctx.as_ref(),
            identifier,
            Some(domain),
            &tried_urls,
            self,
        )
        .await;

        match url {
            Some(url) => {
                // Fetch from this URL (this calls start_request/complete_request internally)
                sync_identifier_from_url(ctx.as_ref(), identifier, &url, self).await;

                // Record URL as tried and mark not in_progress
                // complete_request() will trigger next item if capacity available
                if let Some(mut throttle) = self.throttles.get_mut(domain) {
                    throttle.mark_url_tried(identifier, url);
                    throttle.mark_identifier_not_in_progress(identifier);
                }
            }
            None => {
                // No more URLs for this identifier on this domain - remove from queue
                if let Some(mut throttle) = self.throttles.get_mut(domain) {
                    throttle.remove_identifier(identifier);
                }
                // Try next identifier since we didn't use any capacity
                self.try_process_next(domain);
            }
        }
    }
449}
450```
451
452### DomainThrottle
453
454Per-domain rate limiting and waiting queue:
455
456```rust
457/// Per-domain rate limiting and identifier queue.
458///
459/// Handles:
460/// - Rate limiting (concurrent requests, requests per minute)
461/// - Queue of identifiers waiting for capacity (using IndexMap for round-robin order)
462/// - Tracking tried URLs per identifier (for this domain only)
463/// - In-progress flag per identifier (prevents concurrent fetches for same identifier
464/// on this domain, important when queue is small and we have multiple concurrent slots)
465pub struct DomainThrottle {
466 /// Domain this throttle manages
467 domain: String,
468
469 /// Current in-flight request count
470 in_flight: u32,
471
472 /// Request timestamps (sliding window for rate limiting)
473 request_times: VecDeque<Instant>,
474
475 /// Queued identifiers with their state.
476 /// IndexMap preserves insertion order for round-robin processing.
477 queue: IndexMap<String, IdentifierQueueState>,
478
479 /// Round-robin index for fair processing across identifiers
480 round_robin_index: usize,
481
482 /// Configuration
483 max_concurrent: u32,
484 max_per_minute: u32,
485}
486
487/// State for an identifier waiting in a domain's queue
488#[derive(Debug, Clone)]
489struct IdentifierQueueState {
490 /// URLs from this domain that have been tried
491 tried_urls: HashSet<String>,
492
493 /// Whether a fetch is currently in progress for this identifier on this domain.
494 /// Prevents starting multiple concurrent fetches for the same identifier,
495 /// which is important when the queue is small (e.g., 2 identifiers with 5
496 /// concurrent slots would otherwise try to process the same identifier multiple times).
497 in_progress: bool,
498}
499
500impl DomainThrottle {
501 pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self {
502 Self {
503 domain,
504 in_flight: 0,
505 request_times: VecDeque::new(),
506 queue: IndexMap::new(),
507 round_robin_index: 0,
508 max_concurrent,
509 max_per_minute,
510 }
511 }
512
513 /// Check if domain has capacity for another request
514 pub fn has_capacity(&self) -> bool {
515 if self.in_flight >= self.max_concurrent {
516 return false;
517 }
518
519 let now = Instant::now();
520 let window = Duration::from_secs(60);
521 let recent_count = self.request_times
522 .iter()
523 .filter(|t| now.duration_since(**t) < window)
524 .count();
525
526 recent_count < self.max_per_minute as usize
527 }
528
529 /// Check if there are any identifiers in the queue
530 pub fn has_queued_work(&self) -> bool {
531 !self.queue.is_empty()
532 }
533
534 /// Record that a request is starting
535 pub fn start_request(&mut self) {
536 self.in_flight += 1;
537 self.request_times.push_back(Instant::now());
538 }
539
540 /// Record that a request completed
541 pub fn complete_request(&mut self) {
542 self.in_flight = self.in_flight.saturating_sub(1);
543
544 // Clean old timestamps
545 let now = Instant::now();
546 let window = Duration::from_secs(60);
547 while self.request_times.front().map_or(false, |t| now.duration_since(*t) >= window) {
548 self.request_times.pop_front();
549 }
550 }
551
552 /// Add an identifier to the queue
553 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>) {
554 self.queue
555 .entry(identifier)
556 .and_modify(|state| {
557 // Merge tried_urls if already exists
558 state.tried_urls.extend(tried_urls.iter().cloned());
559 })
560 .or_insert(IdentifierQueueState {
561 tried_urls,
562 in_progress: false,
563 });
564 }
565
566 /// Get next identifier ready for processing (round-robin, not in_progress).
567 ///
568 /// Iterates through the queue starting from round_robin_index, skipping
569 /// any identifiers that are already in_progress. This ensures fair
570 /// distribution even when some identifiers have active fetches.
571 pub fn next_ready_identifier(&mut self) -> Option<String> {
572 let len = self.queue.len();
573 if len == 0 {
574 return None;
575 }
576
577 // Try each identifier starting from round_robin_index
578 for i in 0..len {
579 let index = (self.round_robin_index + i) % len;
580 if let Some((identifier, state)) = self.queue.get_index_mut(index) {
581 if !state.in_progress {
582 state.in_progress = true;
583 self.round_robin_index = (index + 1) % len;
584 return Some(identifier.clone());
585 }
586 }
587 }
588
589 None // All identifiers are in_progress
590 }
591
592 /// Get tried URLs for an identifier
593 pub fn get_tried_urls(&self, identifier: &str) -> HashSet<String> {
594 self.queue
595 .get(identifier)
596 .map(|s| s.tried_urls.clone())
597 .unwrap_or_default()
598 }
599
600 /// Mark a URL as tried for an identifier
601 pub fn mark_url_tried(&mut self, identifier: &str, url: String) {
602 if let Some(state) = self.queue.get_mut(identifier) {
603 state.tried_urls.insert(url);
604 }
605 }
606
607 /// Mark identifier as not in progress (fetch completed)
608 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str) {
609 if let Some(state) = self.queue.get_mut(identifier) {
610 state.in_progress = false;
611 }
612 }
613
614 /// Remove an identifier from the queue entirely
615 pub fn remove_identifier(&mut self, identifier: &str) {
616 if let Some((index, _, _)) = self.queue.shift_remove_full(identifier) {
617 // Adjust round_robin_index if we removed an entry before it
618 if index < self.round_robin_index && self.round_robin_index > 0 {
619 self.round_robin_index -= 1;
620 }
621 // Clamp to valid range
622 if !self.queue.is_empty() {
623 self.round_robin_index = self.round_robin_index % self.queue.len();
624 } else {
625 self.round_robin_index = 0;
626 }
627 }
628 }
629}
630```
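
The capacity check combines a concurrency cap with a 60 second sliding window. The sketch below isolates that logic with an injected clock so the window behavior can be observed deterministically. `WindowLimiter`, `has_capacity_at`, `start_at`, and `demo` are hypothetical names for this illustration; the design itself reads `Instant::now()` directly.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Minimal stand-in for the rate-limiting half of `DomainThrottle`.
struct WindowLimiter {
    in_flight: u32,
    request_times: VecDeque<Instant>,
    max_concurrent: u32,
    max_per_minute: u32,
}

impl WindowLimiter {
    fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
        Self { in_flight: 0, request_times: VecDeque::new(), max_concurrent, max_per_minute }
    }

    /// Same rule as `has_capacity`, but with `now` supplied by the caller.
    fn has_capacity_at(&self, now: Instant) -> bool {
        if self.in_flight >= self.max_concurrent {
            return false;
        }
        let window = Duration::from_secs(60);
        let recent = self
            .request_times
            .iter()
            .filter(|t| now.saturating_duration_since(**t) < window)
            .count();
        recent < self.max_per_minute as usize
    }

    fn start_at(&mut self, now: Instant) {
        self.in_flight += 1;
        self.request_times.push_back(now);
    }

    fn complete(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }
}

/// Two quick requests against a limit of 2 per minute.
fn demo() -> (bool, bool, bool) {
    let t0 = Instant::now();
    let mut limiter = WindowLimiter::new(5, 2);

    limiter.start_at(t0);
    limiter.complete();
    limiter.start_at(t0 + Duration::from_secs(1));
    limiter.complete();

    // Both requests are finished, yet they still count against the window.
    let right_after = limiter.has_capacity_at(t0 + Duration::from_secs(2));
    let at_59s = limiter.has_capacity_at(t0 + Duration::from_secs(59));
    // At 60s the first timestamp falls out of the window.
    let at_60s = limiter.has_capacity_at(t0 + Duration::from_secs(60));
    (right_after, at_59s, at_60s)
}

fn main() {
    println!("{:?}", demo());
}
```

Completing a request frees a concurrency slot but not a per-minute slot. When the per-minute limit is the binding constraint, capacity only returns once old timestamps age out of the window.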
631
632### SyncContext Trait (For Testability)
633
634Abstract the external dependencies to enable unit testing:
635
636```rust
637/// Abstraction over external dependencies for sync operations.
638///
639/// This trait allows unit testing of sync logic by mocking:
640/// - Repository data fetching
641/// - OID existence checks
642/// - Git fetch operations
643/// - Event processing
644#[async_trait]
645pub trait SyncContext: Send + Sync {
646 /// Get repository data (announcements, clone URLs, etc.)
647 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData>;
648
649 /// Get all OIDs needed for purgatory events with this identifier
650 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>;
651
652 /// Check if an OID exists locally
653 fn oid_exists(&self, repo_path: &Path, oid: &str) -> bool;
654
655 /// Fetch OIDs from a remote server
656 async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>;
657
658 /// Process newly available git data.
659 ///
660 /// This is a thin wrapper around the unified `process_newly_available_git_data` function.
661 /// Called after each successful OID fetch to check if any purgatory events can now be satisfied.
662 ///
663 /// See [Unified Git Data Sync](unify-git-data-sync.md) for the complete design.
664 async fn process_newly_available_git_data(
665 &self,
666 source_repo_path: &Path,
667 new_oids: &HashSet<String>,
668 ) -> Result<ProcessResult>;
669
670 /// Check if there are still pending events for this identifier
671 fn has_pending_events(&self, identifier: &str) -> bool;
672
673 /// Find the best local repo to fetch into
674 fn find_target_repo(&self, db_repo_data: &RepositoryData) -> Option<PathBuf>;
675
676 /// Our domain (to exclude from clone URLs)
677 fn our_domain(&self) -> Option<&str>;
678}
679
680/// Real implementation of SyncContext with all dependencies
681pub struct RealSyncContext {
682 purgatory: Purgatory,
683 database: SharedDatabase,
684 git_data_path: PathBuf,
685 our_domain: Option<String>,
686 local_relay: Option<nostr_relay_builder::LocalRelay>,
687}
688
689impl RealSyncContext {
690 pub fn new(
691 purgatory: Purgatory,
692 database: SharedDatabase,
693 git_data_path: PathBuf,
694 our_domain: Option<String>,
695 local_relay: Option<nostr_relay_builder::LocalRelay>,
696 ) -> Self {
697 Self {
698 purgatory,
699 database,
700 git_data_path,
701 our_domain,
702 local_relay,
703 }
704 }
705}
706
707#[async_trait]
708impl SyncContext for RealSyncContext {
709 // ... other methods ...
710
711 async fn process_newly_available_git_data(
712 &self,
713 source_repo_path: &Path,
714 new_oids: &HashSet<String>,
715 ) -> Result<ProcessResult> {
716 // Call the unified function that handles all post-git-data-available processing
717 // This is the same function called by handle_receive_pack after a git push
718 crate::git::process_newly_available_git_data(
719 source_repo_path,
720 new_oids,
721 &self.database,
722 self.local_relay.as_ref(),
723 &self.purgatory,
724 &self.git_data_path,
725 ).await
726 }
727
728 // ... other methods ...
729}
730```
731
**Note**: The `SyncContext` trait abstracts away the dependencies for testability. The real implementation (`RealSyncContext`) holds references to purgatory, database, etc., and the `process_newly_available_git_data` method delegates to the unified function. This keeps the sync logic functions (`sync_identifier_next_url`, `sync_identifier_from_url`) clean and testable with mocks.

## Core Sync Logic

### Two-Function Design

The sync logic is split into two functions that can be called by either the main sync loop or by DomainThrottle:

1. **`sync_identifier_next_url`**: Selection logic only - finds the next URL to try
2. **`sync_identifier_from_url`**: Fetch logic only - fetches from a specific URL

This separation enables:
- Main sync loop to try non-throttled URLs immediately
- DomainThrottle to process queued identifiers when capacity frees
- Clean testability with mocked SyncContext
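
How the main sync path composes the two functions can be sketched with simplified, synchronous stand-ins. Everything named here (`next_url`, `domain_of`, `sync_identifier`, the example URLs) is a hypothetical simplification for illustration: the real functions are async, take a `SyncContext`, and consult the `ThrottleManager`.

```rust
use std::collections::HashSet;

/// Naive host extraction for the sketch: strip the scheme, stop at the first '/'.
fn domain_of(url: &str) -> &str {
    let rest = url.split("://").nth(1).unwrap_or(url);
    rest.split('/').next().unwrap_or(rest)
}

/// Stand-in for `sync_identifier_next_url(domain = None)`: the first URL that
/// has not been tried and whose domain is not throttled.
fn next_url<'a>(
    urls: &[&'a str],
    tried: &HashSet<&'a str>,
    throttled: &HashSet<&str>,
) -> Option<&'a str> {
    urls.iter()
        .copied()
        .find(|url| !tried.contains(url) && !throttled.contains(domain_of(url)))
}

/// Stand-in for the `sync_identifier` loop: try every non-throttled URL once,
/// then report which URLs were left for the per-domain queues.
fn sync_identifier<'a>(
    urls: &[&'a str],
    throttled: &HashSet<&str>,
    mut fetch: impl FnMut(&str),
) -> (Vec<&'a str>, Vec<&'a str>) {
    let mut tried = HashSet::new();
    let mut fetched_in_order = Vec::new();

    while let Some(url) = next_url(urls, &tried, throttled) {
        fetch(url); // stand-in for sync_identifier_from_url
        tried.insert(url);
        fetched_in_order.push(url);
    }

    // Untried URLs belong to throttled domains; the design enqueues the
    // identifier with those domains instead of waiting here.
    let deferred = urls.iter().copied().filter(|u| !tried.contains(u)).collect();
    (fetched_in_order, deferred)
}

fn main() {
    let urls = [
        "https://a.example/r.git",
        "https://b.example/r.git",
        "https://a.example/mirror.git",
    ];
    let throttled = HashSet::from(["b.example"]);
    let (done, deferred) = sync_identifier(&urls, &throttled, |u| println!("fetching {u}"));
    println!("tried now: {done:?}, deferred to throttle queue: {deferred:?}");
}
```

The loop exits as soon as only throttled domains remain, which is what lets the main sync task finish quickly and leave the waiting to `DomainThrottle`.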
747
748### Helper: extract_domain
749
750```rust
751/// Extract domain from a URL (e.g., "https://github.com/foo/bar.git" → "github.com")
752fn extract_domain(url: &str) -> Option<String> {
753 url::Url::parse(url)
754 .ok()
755 .and_then(|u| u.host_str().map(|s| s.to_string()))
756}
757```
758
759### sync_identifier_next_url
760
761```rust
762/// Find the next URL to try for an identifier.
763///
764/// When `domain` is None: returns any non-throttled URL not in tried_urls
765/// When `domain` is Some: returns a URL from that specific domain not in tried_urls
766///
767/// Returns None if:
768/// - No pending events for this identifier
769/// - No OIDs needed (sync complete)
770/// - No untried URLs available (for the specified domain or all domains)
771/// - All available domains are throttled (when domain is None)
pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
773 ctx: &C,
774 identifier: &str,
775 domain: Option<&str>,
776 tried_urls: &HashSet<String>,
777 throttle_manager: &ThrottleManager,
778) -> Option<String> {
779 // 1. Check if we still have pending events
780 if !ctx.has_pending_events(identifier) {
781 return None;
782 }
783
784 // 2. Collect needed OIDs
785 let needed_oids = ctx.collect_needed_oids(identifier);
786 if needed_oids.is_empty() {
787 // No OIDs needed - sync is complete
788 return None;
789 }
790
791 // 3. Get repository data
792 let repo_data = match ctx.fetch_repository_data(identifier).await {
793 Ok(data) => data,
794 Err(_) => return None,
795 };
796
797 // 4. Collect clone URLs, excluding our domain
798 let all_urls: Vec<String> = repo_data
799 .announcements
800 .iter()
801 .flat_map(|a| a.clone_urls.iter().cloned())
802 .filter(|url| ctx.our_domain().map_or(true, |d| !url.contains(d)))
803 .collect::<HashSet<_>>()
804 .into_iter()
805 .collect();
806
807 // 5. Group by domain
808 let urls_by_domain: HashMap<String, Vec<String>> = all_urls
809 .iter()
810 .fold(HashMap::new(), |mut acc, url| {
811 if let Some(d) = extract_domain(url) {
812 acc.entry(d).or_default().push(url.clone());
813 }
814 acc
815 });
816
817 // 6. Find an available URL
818 match domain {
819 Some(specific_domain) => {
820 // Only look at URLs from this specific domain
821 urls_by_domain
822 .get(specific_domain)
823 .and_then(|urls| {
824 urls.iter()
825 .find(|url| !tried_urls.contains(*url))
826 .cloned()
827 })
828 }
829 None => {
830 // Try any non-throttled domain
831 for (d, domain_urls) in &urls_by_domain {
832 if throttle_manager.is_throttled(d) {
833 continue;
834 }
835 if let Some(url) = domain_urls.iter().find(|url| !tried_urls.contains(*url)) {
836 return Some(url.clone());
837 }
838 }
839 None
840 }
841 }
842}
843
844/// Information about throttled domains with untried URLs
845#[derive(Debug, Clone)]
846pub struct ThrottledDomainInfo {
847 pub domain: String,
848 pub tried_urls_for_domain: HashSet<String>,
849}
850
851/// Get information about throttled domains that have untried URLs.
852///
853/// Called by main sync loop to know which DomainThrottle queues to add the identifier to.
pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>(
855 ctx: &C,
856 identifier: &str,
857 tried_urls: &HashSet<String>,
858 throttle_manager: &ThrottleManager,
859) -> Vec<ThrottledDomainInfo> {
860 let repo_data = match ctx.fetch_repository_data(identifier).await {
861 Ok(data) => data,
862 Err(_) => return vec![],
863 };
864
865 let all_urls: Vec<String> = repo_data
866 .announcements
867 .iter()
868 .flat_map(|a| a.clone_urls.iter().cloned())
869 .filter(|url| ctx.our_domain().map_or(true, |d| !url.contains(d)))
870 .collect::<HashSet<_>>()
871 .into_iter()
872 .collect();
873
874 let urls_by_domain: HashMap<String, Vec<String>> = all_urls
875 .iter()
876 .fold(HashMap::new(), |mut acc, url| {
877 if let Some(d) = extract_domain(url) {
878 acc.entry(d).or_default().push(url.clone());
879 }
880 acc
881 });
882
883 urls_by_domain
884 .into_iter()
885 .filter_map(|(domain, domain_urls)| {
886 if !throttle_manager.is_throttled(&domain) {
887 return None; // Not throttled, skip
888 }
889
890 let untried: Vec<_> = domain_urls
891 .iter()
892 .filter(|url| !tried_urls.contains(*url))
893 .collect();
894
895 if untried.is_empty() {
896 return None; // All URLs tried for this domain
897 }
898
899 // Collect tried URLs that belong to this domain
900 let tried_urls_for_domain: HashSet<String> = tried_urls
901 .iter()
902 .filter(|url| extract_domain(url).as_deref() == Some(&domain))
903 .cloned()
904 .collect();
905
906 Some(ThrottledDomainInfo {
907 domain,
908 tried_urls_for_domain,
909 })
910 })
911 .collect()
912}
913```
914
915### sync_identifier_from_url
916
```rust
/// Fetch git data from a specific URL for an identifier.
///
/// This function:
/// 1. Records the request with the throttle manager
/// 2. Performs the actual git fetch
/// 3. Processes any events that can now be satisfied
/// 4. Records request completion
///
/// Returns the number of OIDs successfully fetched.
///
/// `?Sized` allows callers to pass `&dyn SyncContext` (e.g. from `Arc<dyn SyncContext>`).
pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
    ctx: &C,
    identifier: &str,
    url: &str,
    throttle_manager: &Arc<ThrottleManager>,
) -> usize {
    let domain = match extract_domain(url) {
        Some(d) => d,
        None => return 0,
    };

    // Get repository data for target repo path
    let repo_data = match ctx.fetch_repository_data(identifier).await {
        Ok(data) => data,
        Err(e) => {
            tracing::debug!(identifier = %identifier, error = %e, "Failed to fetch repo data");
            return 0;
        }
    };

    let target_repo = match ctx.find_target_repo(&repo_data) {
        Some(path) => path,
        None => {
            tracing::debug!(identifier = %identifier, "No target repo found");
            return 0;
        }
    };

    // Collect needed OIDs
    let needed_oids: Vec<String> = ctx.collect_needed_oids(identifier).into_iter().collect();
    if needed_oids.is_empty() {
        return 0;
    }

    // Perform the fetch
    throttle_manager.start_request(&domain);
    let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await;
    throttle_manager.complete_request(&domain);

    let fetched_oids: Vec<String> = match fetch_result {
        Ok(fetched) => {
            tracing::debug!(
                identifier = %identifier,
                url = %url,
                oids_fetched = fetched.len(),
                "Fetch succeeded"
            );
            fetched
        }
        Err(e) => {
            tracing::debug!(
                identifier = %identifier,
                url = %url,
                error = %e,
                "Fetch failed"
            );
            Vec::new()
        }
    };

    // Try to process any events that can now be satisfied.
    // Pass only the OIDs that were actually fetched, not everything requested.
    if !fetched_oids.is_empty() {
        let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect();
        if let Err(e) = ctx.process_newly_available_git_data(&target_repo, &new_oids).await {
            tracing::warn!(
                identifier = %identifier,
                error = %e,
                "Failed to process newly available git data"
            );
        }
    }

    fetched_oids.len()
}
```
1002
1003### process_newly_available_git_data (Unified Function)
1004
1005This is the core function that handles the "release from purgatory" logic. It's called after each successful fetch to check if any purgatory events can now be satisfied with the available git data.
1006
1007**Key Design Decision**: This is a **unified function** shared with the git push handler. Both `handle_receive_pack` (after git push) and `sync_identifier_from_url` (after purgatory sync fetch) call the same function. See [Unified Git Data Sync](unify-git-data-sync.md) for the complete implementation.
1008
1009**Why unify?**
1010
1011The post-git-data-available processing is identical regardless of how data arrived:
1012
1013| Step | After git push | After purgatory fetch |
1014|------|---------------|----------------------|
1015| Discover satisfiable events | ✅ Same | ✅ Same |
1016| Sync OIDs to owner repos | ✅ Same | ✅ Same |
1017| Align refs (+ set HEAD) | ✅ Same | ✅ Same |
1018| Save events to database | ✅ Same | ✅ Same |
1019| Notify WebSocket | ✅ Same | ✅ Same |
1020| Remove from purgatory | ✅ Same | ✅ Same |
1021
1022```rust
1023/// Result of processing newly available git data
1024#[derive(Debug, Default)]
1025pub struct ProcessResult {
1026 /// Number of state events released from purgatory
1027 pub states_released: usize,
1028 /// Number of PR events released from purgatory
1029 pub prs_released: usize,
1030 /// Number of repositories synced (OIDs copied + refs aligned)
1031 pub repos_synced: usize,
1032 /// Number of refs created/updated/deleted
1033 pub refs_created: usize,
1034 pub refs_updated: usize,
1035 pub refs_deleted: usize,
1036 /// Errors encountered (non-fatal)
1037 pub errors: Vec<String>,
1038}
1039
1040/// Unified processing of newly available git data.
1041///
1042/// Called whenever git data becomes available, whether from:
1043/// - A successful `git push` (handle_receive_pack)
1044/// - Purgatory sync fetching OIDs from remote servers
1045///
1046/// See unify-git-data-sync.md for complete implementation details.
1047pub async fn process_newly_available_git_data(
1048 source_repo_path: &Path,
1049 new_oids: &HashSet<String>,
1050 database: &SharedDatabase,
1051 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1052 purgatory: &Purgatory,
1053 git_data_path: &Path,
1054) -> Result<ProcessResult>;
1055```
1056
1057**Key properties of the unified function:**
1058
10591. **Early release**: If we fetch from `server1.com` and get all OIDs for state event A, we immediately release A even if state event B still needs OIDs from `server2.com`
1060
10612. **Idempotent**: The function can be called multiple times safely. It only processes events that are actually satisfiable.
1062
10633. **Atomic per-event**: Each event is processed independently. If saving one event fails, others can still succeed.
1064
10654. **Authorization at release time**: We check authorization when releasing, not when adding to purgatory. This handles the case where maintainer sets change while an event is in purgatory.
1066
5. **Handles all event types**: Both state events (kind 30618) and PR events (kind 1618/1619) are processed uniformly.
1068
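The early-release and idempotency properties above can be illustrated with a small sketch. It is not the unified function itself: `PendingEvent` and `split_satisfiable` are hypothetical names, and the assumption is that each purgatory event can be reduced to the set of OIDs it requires. An event is released as soon as its required set is a subset of what is available, regardless of what other events still need.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified view of a purgatory event: an id plus the OIDs it needs.
pub struct PendingEvent {
    pub id: String,
    pub required_oids: HashSet<String>,
}

/// Split events into (releasable now, still waiting) given the OIDs on disk.
/// Calling this again with the same inputs releases nothing new, which is
/// the idempotency property in miniature.
pub fn split_satisfiable(
    pending: Vec<PendingEvent>,
    available: &HashSet<String>,
) -> (Vec<PendingEvent>, Vec<PendingEvent>) {
    pending
        .into_iter()
        .partition(|event| event.required_oids.is_subset(available))
}
```

With event A needing `{a1, a2}` and event B needing `{a1, b1}`, a fetch that yields `a1` and `a2` releases A immediately while B stays in purgatory until `b1` arrives from another server.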
1069### The Sync Identifier Loop (Main Sync)
1070
1071```rust
1072/// Sync git data for an identifier.
1073///
1074/// This is called by the main sync loop. It:
1075/// 1. Tries all non-throttled URLs
1076/// 2. Enqueues with throttled domains for later processing
1077/// 3. Returns without waiting for throttled domains
1078///
1079/// Returns true if sync completed (no pending events or no OIDs needed),
1080/// false if events remain (will be retried after backoff).
1081pub async fn sync_identifier<C: SyncContext>(
1082 ctx: &C,
1083 identifier: &str,
1084 throttle_manager: &Arc<ThrottleManager>,
1085) -> bool {
1086 let mut tried_urls: HashSet<String> = HashSet::new();
1087
1088 // Try all non-throttled URLs
1089 loop {
1090 match sync_identifier_next_url(
1091 ctx,
1092 identifier,
1093 None, // Any domain
1094 &tried_urls,
1095 throttle_manager,
1096 ).await {
1097 Some(url) => {
1098 // Found a non-throttled URL to try
1099 sync_identifier_from_url(ctx, identifier, &url, throttle_manager).await;
1100 tried_urls.insert(url);
1101
1102 // Check if sync is now complete
1103 if !ctx.has_pending_events(identifier) {
1104 tracing::info!(identifier = %identifier, "Sync complete - no pending events");
1105 return true;
1106 }
1107
1108 let needed_oids = ctx.collect_needed_oids(identifier);
1109 if needed_oids.is_empty() {
1110 tracing::info!(identifier = %identifier, "Sync complete - all OIDs available");
1111 return true;
1112 }
1113
1114 // Continue trying more URLs
1115 }
1116 None => {
1117 // No more non-throttled URLs available
1118 break;
1119 }
1120 }
1121 }
1122
1123 // Check if we're done (no pending events or no needed OIDs)
1124 if !ctx.has_pending_events(identifier) {
1125 return true;
1126 }
1127
1128 let needed_oids = ctx.collect_needed_oids(identifier);
1129 if needed_oids.is_empty() {
1130 return true;
1131 }
1132
1133 // Enqueue with any throttled domains that have untried URLs
1134 let throttled_domains = get_throttled_domains_with_untried_urls(
1135 ctx,
1136 identifier,
1137 &tried_urls,
1138 throttle_manager,
1139 ).await;
1140
1141 for info in throttled_domains {
1142 tracing::debug!(
1143 identifier = %identifier,
1144 domain = %info.domain,
1145 "Enqueueing with throttled domain"
1146 );
1147 throttle_manager.enqueue_identifier(
1148 &info.domain,
1149 identifier.to_string(),
1150 info.tried_urls_for_domain,
1151 );
1152 }
1153
1154 // Return false - events remain, will retry after backoff
1155 // (throttled domains will process independently)
1156 false
1157}
1158```
1159
1160### The Main Sync Loop
1161
1162```rust
1163impl Purgatory {
1164 pub fn start_sync_loop(
1165 self: Arc<Self>,
1166 database: SharedDatabase,
1167 our_domain: Option<String>,
1168 local_relay: Option<nostr_relay_builder::LocalRelay>,
1169 throttle_manager: Arc<ThrottleManager>,
1170 ) -> tokio::task::JoinHandle<()> {
1171 tokio::spawn(async move {
1172 let mut interval = tokio::time::interval(Duration::from_secs(1));
1173
1174 loop {
1175 interval.tick().await;
1176
1177 // Find all ready identifiers
1178 let ready: Vec<String> = self.sync_queue
1179 .iter()
1180 .filter(|e| e.value().is_ready())
1181 .map(|e| e.key().clone())
1182 .collect();
1183
1184 for identifier in ready {
1185 // Check if events still exist
1186 if !self.has_pending_events(&identifier) {
1187 self.sync_queue.remove(&identifier);
1188 continue;
1189 }
1190
1191 // Mark in progress
1192 if let Some(mut entry) = self.sync_queue.get_mut(&identifier) {
1193 if entry.in_progress {
1194 continue;
1195 }
1196 entry.in_progress = true;
1197 } else {
1198 continue;
1199 }
1200
1201 // Spawn sync task
1202 let purgatory = self.clone();
1203 let db = database.clone();
1204 let domain = our_domain.clone();
1205 let relay = local_relay.clone();
1206 let throttle_manager = throttle_manager.clone();
1207 let id = identifier.clone();
1208
1209 tokio::spawn(async move {
1210 // Create the real SyncContext implementation
1211 let ctx = RealSyncContext::new(
1212 purgatory.clone(),
1213 db,
1214 domain,
1215 relay,
1216 );
1217
1218 let complete = sync_identifier(&ctx, &id, &throttle_manager).await;
1219
1220 if complete || !purgatory.has_pending_events(&id) {
1221 purgatory.sync_queue.remove(&id);
1222 tracing::info!(identifier = %id, "Removed from sync queue");
1223 } else {
1224 // Apply backoff - will retry later
1225 // (throttled domains are being processed independently)
1226 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) {
1227 entry.on_sync_complete();
1228 }
1229 }
1230 });
1231 }
1232 }
1233 })
1234 }
1235}
1236```
1237
1238## Testing Strategy
1239
1240Tests are created **only** as part of each implementation phase. See [Implementation Phases](#implementation-phases) for the complete test plan.
1241
1242### Design Principles
1243
12441. **Tests accompany code**: Each phase specifies exactly which tests to create
12452. **Unit tests for mechanics**: Test backoff, throttle, retry logic in isolation using mocks
12463. **Integration tests for outcomes**: Verify events sync correctly end-to-end
12474. **No speculative tests**: Don't create tests for code that doesn't exist yet
1248
1249### MockSyncContext
1250
1251Phases 4-6 use `MockSyncContext` to test sync logic without I/O:
1252
```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Mutex;

/// Mock context for testing sync logic.
///
/// `SyncContext` requires `Send + Sync`, so interior mutability uses
/// `Mutex`/`AtomicBool` rather than `RefCell` (which is not `Sync`).
#[cfg(test)]
pub struct MockSyncContext {
    /// Repository data to return
    repo_data: RepositoryData,
    /// OIDs still needed (removed when "fetched")
    needed_oids: Mutex<HashSet<String>>,
    /// Which OIDs each URL can provide
    url_provides_oids: HashMap<String, HashSet<String>>,
    /// Track fetch attempts for assertions
    fetch_log: Mutex<Vec<String>>,
    /// Whether there are pending events
    has_pending: AtomicBool,
}

#[cfg(test)]
impl MockSyncContext {
    pub fn new() -> Self;
    pub fn with_urls(self, urls: &[&str]) -> Self;
    pub fn with_needed_oids(self, oids: &[&str]) -> Self;
    pub fn url_provides(self, url: &str, oids: &[&str]) -> Self;
}
```
1276
1277### Test Locations
1278
1279| Test Type | Location | Created In |
1280|-----------|----------|------------|
1281| SyncQueueEntry | `src/purgatory/sync/queue.rs` | Phase 1 |
1282| DomainThrottle | `src/purgatory/sync/throttle.rs` | Phase 2 |
1283| ThrottleManager | `src/purgatory/sync/throttle.rs` | Phase 3 |
1284| Core sync functions | `src/purgatory/sync/functions.rs` | Phase 5-6 |
1285| Queue integration | `src/purgatory/mod.rs` | Phase 7 |
1286| Unified function helpers | `src/git/sync.rs` | Phase 9 |
1287| Integration tests | `tests/purgatory_sync.rs` | Phase 12 |
1288
1289## Implementation Phases
1290
1291Each phase has clear deliverables, unit tests, and success criteria. Unit tests are created **only** for the code built in that phase.
1292
1293---
1294
1295### Phase 1: SyncQueueEntry with Backoff
1296
1297**Goal**: Implement the sync queue entry struct with backoff calculation.
1298
1299**Files**:
1300- `src/purgatory/sync/mod.rs` (new - module declaration)
1301- `src/purgatory/sync/queue.rs` (new)
1302
1303**Note**: This creates a new `sync` submodule under `src/purgatory/`. The current purgatory structure is flat (`mod.rs`, `helpers.rs`, `types.rs`). Add `pub mod sync;` to `src/purgatory/mod.rs`.
1304
1305**Deliverables**:
1306```rust
1307pub struct SyncQueueEntry {
1308 pub next_attempt: Instant,
1309 pub attempt_count: u32,
1310 pub in_progress: bool,
1311}
1312
1313impl SyncQueueEntry {
1314 pub fn new(delay: Duration) -> Self;
1315 pub fn backoff(&self) -> Duration;
1316 pub fn is_ready(&self) -> bool;
1317 pub fn on_new_event(&mut self, delay: Duration);
1318 pub fn on_sync_complete(&mut self);
1319}
1320```
1321
1322**Unit Tests** (2 tests):
1323```rust
1324#[cfg(test)]
1325mod tests {
1326 #[test]
1327 fn backoff_doubles_up_to_cap() {
1328 // 20s → 40s → 80s → 120s → 120s (capped)
1329 }
1330
1331 #[test]
1332 fn new_event_resets_attempt_count() {
1333 // on_new_event() resets attempt_count to 0
1334 }
1335}
1336```
1337
1338**Success Criteria**:
1339- [ ] `SyncQueueEntry::new()` creates entry with given delay
1340- [ ] `backoff()` returns 20s, 40s, 80s, 120s, 120s for attempts 1-5
1341- [ ] `on_new_event()` resets `attempt_count` to 0
1342- [ ] `on_sync_complete()` increments `attempt_count` and updates `next_attempt`
1343- [ ] Both unit tests pass
1344
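The backoff schedule in the success criteria can be sketched as follows. This is a minimal illustration, not the final implementation: the constants `BASE_BACKOFF` and `MAX_BACKOFF` are names chosen here, and two behaviours are assumptions that the deliverables do not spell out, namely that `is_ready()` also checks `in_progress` and that `on_sync_complete()` clears it.

```rust
use std::time::{Duration, Instant};

// Assumed constant names; the values come from the 20s -> 120s schedule above.
const BASE_BACKOFF: Duration = Duration::from_secs(20);
const MAX_BACKOFF: Duration = Duration::from_secs(120);

pub struct SyncQueueEntry {
    pub next_attempt: Instant,
    pub attempt_count: u32,
    pub in_progress: bool,
}

impl SyncQueueEntry {
    pub fn new(delay: Duration) -> Self {
        Self {
            next_attempt: Instant::now() + delay,
            attempt_count: 0,
            in_progress: false,
        }
    }

    /// 20s after the first attempt, doubling per further attempt, capped at 120s.
    pub fn backoff(&self) -> Duration {
        // Clamp the exponent so the shift cannot overflow for large counts.
        let exponent = self.attempt_count.saturating_sub(1).min(16);
        let secs = BASE_BACKOFF.as_secs().saturating_mul(1u64 << exponent);
        Duration::from_secs(secs).min(MAX_BACKOFF)
    }

    /// Assumption: an entry that is mid-sync is never considered ready.
    pub fn is_ready(&self) -> bool {
        !self.in_progress && Instant::now() >= self.next_attempt
    }

    /// A new event restarts the schedule and (re)arms the debounce delay.
    pub fn on_new_event(&mut self, delay: Duration) {
        self.attempt_count = 0;
        self.next_attempt = Instant::now() + delay;
    }

    /// Assumption: finishing an attempt also clears `in_progress`.
    pub fn on_sync_complete(&mut self) {
        self.attempt_count += 1;
        self.in_progress = false;
        self.next_attempt = Instant::now() + self.backoff();
    }
}
```

Clamping the exponent before shifting keeps `backoff()` total for any `attempt_count`, so a long-lived entry cannot panic on overflow.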
1345---
1346
1347### Phase 2: DomainThrottle with Rate Limiting and Round-Robin
1348
1349**Goal**: Implement per-domain throttling with concurrent/rate limits and fair queue processing.
1350
1351**Files**:
1352- `src/purgatory/sync/throttle.rs` (new)
1353
1354**Deliverables**:
1355```rust
1356pub struct DomainThrottle {
1357 domain: String,
1358 in_flight: u32,
1359 request_times: VecDeque<Instant>,
1360 queue: IndexMap<String, IdentifierQueueState>,
1361 round_robin_index: usize,
1362 max_concurrent: u32,
1363 max_per_minute: u32,
1364}
1365
1366impl DomainThrottle {
1367 pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self;
1368 pub fn has_capacity(&self) -> bool;
1369 pub fn start_request(&mut self);
1370 pub fn complete_request(&mut self);
1371 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>);
1372 pub fn next_ready_identifier(&mut self) -> Option<String>;
1373 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str);
1374 pub fn remove_identifier(&mut self, identifier: &str);
1375}
1376```
1377
1378**Unit Tests** (4 tests):
1379```rust
1380#[cfg(test)]
1381mod tests {
1382 #[test]
1383 fn concurrent_limit_blocks_when_saturated() {
1384 // has_capacity() returns false when in_flight >= max_concurrent
1385 }
1386
1387 #[test]
1388 fn rate_limit_blocks_when_window_full() {
1389 // has_capacity() returns false when requests in last 60s >= max_per_minute
1390 // Use deterministic time (pass Instant or mock clock)
1391 }
1392
1393 #[test]
1394 fn round_robin_processes_identifiers_fairly() {
1395 // Enqueue A, B, C → next_ready returns A, B, C, A, B, C...
1396 }
1397
1398 #[test]
1399 fn skips_in_progress_identifiers() {
1400 // next_ready skips identifiers where in_progress=true
1401 }
1402}
1403```
1404
1405**Success Criteria**:
1406- [ ] Concurrent limit enforced (blocks at max_concurrent)
1407- [ ] Rate limit enforced (blocks at max_per_minute within 60s window)
1408- [ ] Round-robin ordering maintained across calls
1409- [ ] In-progress identifiers skipped
1410- [ ] All 4 unit tests pass
1411
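The two capacity checks can be sketched with an injected clock, which is what makes `rate_limit_blocks_when_window_full` deterministic. This is a reduced sketch under assumptions: the queue and round-robin fields are omitted, and the `_at` method variants taking an explicit `Instant` are hypothetical additions for testability rather than part of the deliverables.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

const WINDOW: Duration = Duration::from_secs(60);

pub struct DomainThrottle {
    in_flight: u32,
    request_times: VecDeque<Instant>,
    max_concurrent: u32,
    max_per_minute: u32,
}

impl DomainThrottle {
    pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
        Self {
            in_flight: 0,
            request_times: VecDeque::new(),
            max_concurrent,
            max_per_minute,
        }
    }

    /// Drop timestamps that have aged out of the 60s sliding window.
    fn prune(&mut self, now: Instant) {
        while let Some(&oldest) = self.request_times.front() {
            if now.duration_since(oldest) >= WINDOW {
                self.request_times.pop_front();
            } else {
                break;
            }
        }
    }

    /// Capacity requires both a free concurrency slot and room in the window.
    pub fn has_capacity_at(&mut self, now: Instant) -> bool {
        self.prune(now);
        self.in_flight < self.max_concurrent
            && (self.request_times.len() as u32) < self.max_per_minute
    }

    pub fn start_request_at(&mut self, now: Instant) {
        self.in_flight += 1;
        self.request_times.push_back(now);
    }

    /// Completion frees a concurrency slot but leaves the rate window untouched.
    pub fn complete_request(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }
}
```

Because completed requests stay in `request_times` until they age out, a domain can be idle (`in_flight == 0`) and still throttled by the per-minute limit.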
1412---
1413
1414### Phase 3: ThrottleManager (Rate Limiting Only)
1415
1416**Goal**: Implement the manager that owns all domain throttles and provides rate limiting. Queue processing methods (`set_context`, `process_queued_identifier`) are added in Phase 6 after `SyncContext` exists.
1417
1418**Files**:
1419- `src/purgatory/sync/throttle.rs` (extend)
1420
1421**Deliverables**:
1422```rust
1423pub struct ThrottleManager {
1424 throttles: DashMap<String, DomainThrottle>,
1425 max_concurrent_per_domain: u32,
1426 max_per_minute_per_domain: u32,
1427 // Note: ctx: OnceLock<Arc<dyn SyncContext>> added in Phase 6
1428}
1429
1430impl ThrottleManager {
1431 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self;
1432 pub fn is_throttled(&self, domain: &str) -> bool;
1433 pub fn start_request(&self, domain: &str);
1434 pub fn complete_request(&self, domain: &str); // Trigger logic added in Phase 6
1435 pub fn enqueue_identifier(&self, domain: &str, identifier: String, tried_urls: HashSet<String>); // Trigger logic added in Phase 6
1436}
1437```
1438
1439**Unit Tests** (1 test):
1440```rust
1441#[cfg(test)]
1442mod tests {
1443 #[test]
1444 fn is_throttled_reflects_domain_capacity() {
1445 // is_throttled returns true when domain has no capacity
1446 }
1447}
1448```
1449
1450**Success Criteria**:
1451- [ ] `is_throttled()` correctly reflects domain capacity
1452- [ ] `start_request()`/`complete_request()` delegate to correct domain
1453- [ ] `enqueue_identifier()` creates domain throttle if needed
1454- [ ] Unit test passes
1455
1456**Note**: The `complete_request()` and `enqueue_identifier()` methods in this phase only update state. The trigger-based processing (spawning tasks when capacity frees) is added in Phase 6 after `SyncContext` is available.
1457
1458---
1459
1460### Phase 4: SyncContext Trait and MockSyncContext
1461
1462**Goal**: Define the abstraction for sync operations and create the test mock.
1463
1464**Files**:
1465- `src/purgatory/sync/context.rs` (new)
1466
1467**Deliverables**:
1468```rust
1469#[async_trait]
1470pub trait SyncContext: Send + Sync {
1471 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData>;
1472 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>;
1473 async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>;
1474 async fn process_newly_available_git_data(
1475 &self,
1476 source_repo_path: &Path,
1477 new_oids: &HashSet<String>,
1478 ) -> Result<ProcessResult>;
1479 fn has_pending_events(&self, identifier: &str) -> bool;
1480 fn find_target_repo(&self, data: &RepositoryData) -> Option<PathBuf>;
1481 fn our_domain(&self) -> Option<&str>;
1482}
1483
1484// Test support
1485#[cfg(test)]
1486pub struct MockSyncContext { ... }
1487```
1488
1489**Unit Tests** (0 tests):
1490- This phase creates infrastructure only; tests come in Phase 5
1491
1492**Success Criteria**:
1493- [ ] `SyncContext` trait compiles with all required methods
1494- [ ] `MockSyncContext` implements `SyncContext`
1495- [ ] Mock supports builder pattern for test setup
1496
1497---
1498
1499### Phase 5: Core Sync Functions
1500
1501**Goal**: Implement `sync_identifier_next_url` and `sync_identifier_from_url`.
1502
1503**Files**:
1504- `src/purgatory/sync/functions.rs` (new)
1505
1506**Deliverables**:
1507```rust
1508pub async fn sync_identifier_next_url<C: SyncContext>(
1509 ctx: &C,
1510 identifier: &str,
1511 domain: Option<&str>,
1512 tried_urls: &HashSet<String>,
1513 throttle_manager: &ThrottleManager,
1514) -> Option<String>;
1515
1516pub async fn sync_identifier_from_url<C: SyncContext>(
1517 ctx: &C,
1518 identifier: &str,
1519 url: &str,
1520 throttle_manager: &Arc<ThrottleManager>,
1521) -> usize;
1522```
1523
1524**Unit Tests** (3 tests):
1525```rust
1526#[cfg(test)]
1527mod tests {
1528 #[tokio::test]
1529 async fn next_url_skips_throttled_domains() {
1530 // When domain is throttled, next_url returns URL from different domain
1531 }
1532
1533 #[tokio::test]
1534 async fn next_url_skips_tried_urls() {
1535 // URLs in tried_urls set are not returned
1536 }
1537
1538 #[tokio::test]
1539 async fn from_url_fetches_and_processes_on_success() {
1540 // Successful fetch triggers process_newly_available_git_data
1541 }
1542}
1543```
1544
1545**Success Criteria**:
1546- [ ] `sync_identifier_next_url` returns non-throttled, untried URL
1547- [ ] `sync_identifier_next_url` returns `None` when all URLs tried or throttled
1548- [ ] `sync_identifier_from_url` calls `fetch_oids` and `process_newly_available_git_data`
1549- [ ] All 3 unit tests pass
1550
1551---
1552
1553### Phase 6: sync_identifier Orchestration + ThrottleManager Queue Processing
1554
1555**Goal**: Implement the main sync loop for a single identifier AND add trigger-based queue processing to ThrottleManager.
1556
1557**Files**:
1558- `src/purgatory/sync/functions.rs` (extend)
1559- `src/purgatory/sync/throttle.rs` (extend - add queue processing triggers)
1560
1561**Deliverables**:
1562```rust
1563// In functions.rs
1564pub async fn sync_identifier<C: SyncContext>(
1565 ctx: &C,
1566 identifier: &str,
1567 throttle_manager: &Arc<ThrottleManager>,
1568) -> bool; // true if complete, false if pending
1569
1570// In throttle.rs - add to ThrottleManager
1571impl ThrottleManager {
1572 /// Set the sync context (called once at startup)
1573 pub fn set_context(&self, ctx: Arc<dyn SyncContext>);
1574
1575 /// Try to process the next queued identifier for a domain (internal)
1576 fn try_process_next(self: &Arc<Self>, domain: &str);
1577
1578 /// Process a single identifier from a domain's queue (internal)
1579 async fn process_queued_identifier(self: &Arc<Self>, domain: &str, identifier: &str);
1580}
1581
1582// Update complete_request and enqueue_identifier to trigger processing
1583```
1584
1585**Unit Tests** (2 tests):
1586```rust
1587#[cfg(test)]
1588mod tests {
1589 #[tokio::test]
1590 async fn tries_multiple_urls_until_complete() {
1591 // Tries URL1 (partial), URL2 (partial), URL3 (complete) → returns true
1592 }
1593
1594 #[tokio::test]
1595 async fn enqueues_throttled_domains_when_incomplete() {
1596 // When URLs remain but are throttled, enqueues and returns false
1597 }
1598}
1599```
1600
1601**Success Criteria**:
1602- [ ] Loops through available URLs until sync complete or all tried
1603- [ ] Enqueues with throttled domains when OIDs still needed
1604- [ ] Returns `true` when all OIDs fetched, `false` otherwise
1605- [ ] `complete_request()` triggers `try_process_next()` when capacity available
1606- [ ] `enqueue_identifier()` triggers `try_process_next()` when capacity available
1607- [ ] Both unit tests pass
1608
1609---
1610
1611### Phase 7: Purgatory Sync Queue Integration
1612
1613**Goal**: Add sync queue to Purgatory and implement `enqueue_sync`.
1614
1615**Files**:
1616- `src/purgatory/mod.rs` (extend)
1617
1618**Deliverables**:
1619```rust
1620impl Purgatory {
1621 // New field: sync_queue: Arc<DashMap<String, SyncQueueEntry>>
1622
1623 pub fn enqueue_sync(&self, identifier: &str, delay: Duration);
1624 pub fn has_pending_events(&self, identifier: &str) -> bool;
1625}
1626```
1627
1628**Unit Tests** (1 test):
1629```rust
1630#[cfg(test)]
1631mod tests {
1632 #[test]
1633 fn enqueue_sync_debounces_rapid_calls() {
1634 // Multiple enqueue_sync calls within delay window result in single entry
1635 }
1636}
1637```
1638
1639**Success Criteria**:
1640- [ ] `enqueue_sync` adds/updates entry in sync_queue
1641- [ ] Rapid calls debounce (don't create multiple entries)
1642- [ ] `has_pending_events` checks both state_events and pr_events
1643- [ ] Unit test passes
1644
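One way to read the debounce requirement is sketched below. It assumes that every `enqueue_sync` call for a known identifier pushes `next_attempt` out to `now + delay` and resets the attempt count, so a burst of events collapses into one entry that fires once the burst has gone quiet. The real queue is a `DashMap`; this sketch uses a plain `HashMap` and hypothetical `_at` methods with an explicit clock to stay self-contained.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

pub struct Entry {
    pub next_attempt: Instant,
    pub attempt_count: u32,
}

#[derive(Default)]
pub struct SyncQueue {
    entries: HashMap<String, Entry>,
}

impl SyncQueue {
    /// Insert the identifier, or re-arm the existing entry (assumed semantics).
    pub fn enqueue_sync_at(&mut self, identifier: &str, delay: Duration, now: Instant) {
        let entry = self
            .entries
            .entry(identifier.to_string())
            .or_insert(Entry { next_attempt: now + delay, attempt_count: 0 });
        entry.attempt_count = 0;
        entry.next_attempt = now + delay;
    }

    /// Identifiers whose debounce or backoff delay has elapsed, sorted for stable output.
    pub fn ready_at(&self, now: Instant) -> Vec<String> {
        let mut ready: Vec<String> = self
            .entries
            .iter()
            .filter(|(_, entry)| now >= entry.next_attempt)
            .map(|(id, _)| id.clone())
            .collect();
        ready.sort();
        ready
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Three events arriving 100ms apart with a 500ms delay produce a single entry that becomes ready 500ms after the last event, not the first.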
1645---
1646
1647### Phase 8: Main Sync Loop
1648
1649**Goal**: Implement the background sync loop that processes ready identifiers.
1650
1651**Files**:
1652- `src/purgatory/sync/loop.rs` (new)
1653
1654**Deliverables**:
1655```rust
1656impl Purgatory {
1657 pub fn start_sync_loop(
1658 self: Arc<Self>,
1659 ctx: Arc<dyn SyncContext>,
1660 throttle_manager: Arc<ThrottleManager>,
1661 ) -> JoinHandle<()>;
1662}
1663```
1664
1665**Note**: The sync loop interval is hardcoded to 1 second. No configuration option needed.
1666
1667**Unit Tests** (0 tests):
1668- The sync loop is tested via integration tests; unit testing async loops is fragile
1669
1670**Success Criteria**:
1671- [ ] Loop runs every 1 second (hardcoded)
1672- [ ] Finds ready identifiers and spawns sync tasks
1673- [ ] Applies backoff on incomplete syncs
1674- [ ] Removes completed identifiers from queue
1675
1676---
1677
1678### Phase 9: Unified `process_newly_available_git_data` Function
1679
1680**Goal**: Implement the unified function that handles all post-git-data-available processing.
1681
1682This is the core function described in [Unified Git Data Sync](unify-git-data-sync.md). It will be called by both:
1683- `handle_receive_pack` after a successful git push
1684- `RealSyncContext::process_newly_available_git_data` after purgatory sync fetches OIDs
1685
1686**Files**:
1687- `src/git/sync.rs` (extend - add `ProcessResult` alongside existing types)
1688
1689**Note**: `src/git/sync.rs` already exists with `sync_to_owner_repos`, `align_repository_with_state`, etc. This phase extends it with the unified processing function.
1690
1691**Deliverables**:
1692```rust
1693/// Result of processing newly available git data
1694#[derive(Debug, Default)]
1695pub struct ProcessResult {
1696 pub states_released: usize,
1697 pub prs_released: usize,
1698 pub repos_synced: usize,
1699 pub refs_created: usize,
1700 pub refs_updated: usize,
1701 pub refs_deleted: usize,
1702 pub errors: Vec<String>,
1703}
1704
1705/// Unified processing of newly available git data.
1706///
1707/// Called whenever git data becomes available, whether from:
1708/// - A successful `git push` (handle_receive_pack)
1709/// - Purgatory sync fetching OIDs from remote servers
1710pub async fn process_newly_available_git_data(
1711 source_repo_path: &Path,
1712 new_oids: &HashSet<String>,
1713 database: &SharedDatabase,
1714 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1715 purgatory: &Purgatory,
1716 git_data_path: &Path,
1717) -> Result<ProcessResult>;
1718
1719// Helper functions
1720fn extract_identifier_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Option<String>;
1721fn extract_identifier_from_pr_event(event: &Event) -> Option<String>;
1722
1723// Purgatory additions
1724impl Purgatory {
1725 /// Find all PR events for an identifier.
1726 /// Filters pr_events entries where the identifier matches (no secondary index needed).
1727 pub fn find_prs_for_identifier(&self, identifier: &str) -> Vec<PrPurgatoryEntry>;
1728}
1729```
1730
1731**Unit Tests** (3 tests):
1732```rust
1733#[cfg(test)]
1734mod tests {
1735 #[test]
1736 fn extract_identifier_from_repo_path_valid() {
1737 // {git_data_path}/{npub}/{identifier}.git → identifier
1738 }
1739
1740 #[test]
1741 fn extract_identifier_from_pr_event_valid() {
1742 // Event with "a" tag "30617:<pubkey>:<identifier>" → identifier
1743 }
1744
1745 #[test]
1746 fn extract_identifier_from_pr_event_missing_tag() {
1747 // Event without "a" tag → None
1748 }
1749}
1750```
1751
1752**Success Criteria**:
1753- [ ] `process_newly_available_git_data` discovers satisfiable events from purgatory
1754- [ ] State events: syncs OIDs to owner repos, aligns refs, sets HEAD, saves to DB, notifies WS, removes from purgatory
1755- [ ] PR events: syncs commit to owner repos, creates refs/nostr/<event-id>, saves to DB, notifies WS, removes from purgatory
1756- [ ] `find_prs_for_identifier` filters pr_events by identifier correctly
1757- [ ] All 3 unit tests pass
1758
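The two helper extractions can be sketched with the standard library alone. The function names here are hypothetical stand-ins: the real `extract_identifier_from_pr_event` takes an `&Event`, whereas this sketch starts from the already-extracted `"a"` tag value. The path layout `{git_data_path}/{npub}/{identifier}.git` and the tag format `30617:<pubkey>:<identifier>` are taken from the test comments above.

```rust
use std::path::Path;

/// Parse the identifier out of an "a" tag value of the form
/// `30617:<pubkey>:<identifier>`. Returns None for other kinds or missing parts.
pub fn identifier_from_a_tag(value: &str) -> Option<String> {
    // splitn(3) keeps any further colons inside the identifier itself.
    let mut parts = value.splitn(3, ':');
    let kind = parts.next()?;
    let _pubkey = parts.next()?;
    let identifier = parts.next()?;
    if kind != "30617" || identifier.is_empty() {
        return None;
    }
    Some(identifier.to_string())
}

/// Parse the identifier out of `{git_data_path}/{npub}/{identifier}.git`.
pub fn identifier_from_repo_path(repo_path: &Path, git_data_path: &Path) -> Option<String> {
    let relative = repo_path.strip_prefix(git_data_path).ok()?;
    let mut components = relative.components();
    let _npub = components.next()?;
    let repo_dir = components.next()?.as_os_str().to_str()?;
    // Anything nested deeper than {npub}/{identifier}.git is not a repo root.
    if components.next().is_some() {
        return None;
    }
    repo_dir
        .strip_suffix(".git")
        .filter(|name| !name.is_empty())
        .map(str::to_string)
}
```

Both helpers return `None` rather than an error, matching the `Option<String>` signatures in the deliverables.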
1759---
1760
1761### Phase 10: Update `handle_receive_pack` to Use Unified Function
1762
1763**Goal**: Refactor the push authorization handler to use `process_newly_available_git_data`.
1764
1765This replaces ~100 lines of duplicated post-push processing with a single call to the unified function.
1766
1767**Files**:
1768- `src/git/handlers.rs` (modify)
1769
1770**Before** (current code):
1771```rust
1772// After git receive-pack succeeds:
1773// - try_set_head_if_available()
1774// - database.save_event()
1775// - remove_state_event() / remove_pr()
1776// - relay.notify_event()
1777// - sync_to_owner_repos()
1778// - sync_pr_refs_to_tagged_owner_repos()
1779// ... ~100 lines of processing
1780```
1781
1782**After** (simplified):
```rust
// After git receive-pack succeeds:
// Collect the new tip of every pushed ref, skipping deletions (all-zero OID)
let new_oids: HashSet<String> = pushed_refs
    .iter()
    .filter(|(_, new_oid, _)| new_oid != "0000000000000000000000000000000000000000")
    .map(|(_, new_oid, _)| new_oid.clone())
    .collect();

// The unified function returns `Result<ProcessResult>`, so propagate errors
// before reading the counters
let result = process_newly_available_git_data(
    &repo_path,
    &new_oids,
    &database,
    Some(&relay),
    &purgatory,
    Path::new(git_data_path),
).await?;

info!(
    "Processed push: {} states, {} PRs released, {} repos synced",
    result.states_released,
    result.prs_released,
    result.repos_synced
);
```
1807
1808**Unit Tests** (0 tests):
1809- Behavior tested via existing integration tests which should continue to pass
1810
1811**Success Criteria**:
1812- [ ] `handle_receive_pack` uses `process_newly_available_git_data`
1813- [ ] Duplicate code removed (~100 lines)
1814- [ ] All existing push-related tests still pass
1815- [ ] Push behavior unchanged (same events saved, same refs created)
1816
1817---
1818
1819### Phase 11: RealSyncContext Implementation
1820
1821**Goal**: Implement the production `SyncContext` that connects to real systems.
1822
1823**Files**:
1824- `src/purgatory/sync/context.rs` (extend)
1825
1826**Deliverables**:
```rust
pub struct RealSyncContext {
    purgatory: Purgatory,
    database: SharedDatabase,
    git_data_path: PathBuf,
    our_domain: Option<String>,
    local_relay: Option<LocalRelay>,
}

// The trait is declared with #[async_trait], so the impl needs it too
#[async_trait]
impl SyncContext for RealSyncContext { ... }
```
1838
1839**Unit Tests** (0 tests):
1840- `RealSyncContext` is tested via integration tests
1841
1842**Success Criteria**:
1843- [ ] All `SyncContext` methods implemented
1844- [ ] Connects to real database, git, and relay
1845- [ ] `process_newly_available_git_data` method delegates to unified function from Phase 9
1846
1847---
1848
1849### Phase 12: Integration Tests
1850
1851**Goal**: Verify end-to-end sync behavior with real relay instances.
1852
1853**Files**:
1854- `tests/purgatory_sync.rs` (new)
1855
1856**Integration Tests** (5 tests):
1857```rust
1858#[tokio::test]
1859async fn state_event_syncs_from_remote() {
1860 // State event enters purgatory, git data fetched, event released
1861}
1862
1863#[tokio::test]
1864async fn pr_event_syncs_from_remote() {
1865 // PR event enters purgatory, commit fetched, event released
1866}
1867
1868#[tokio::test]
1869async fn concurrent_state_and_pr_sync() {
1870 // Both event types sync correctly when arriving together
1871}
1872
1873#[tokio::test]
1874async fn partial_oid_aggregation_from_multiple_servers() {
1875 // OIDs aggregated when no single server has all
1876}
1877
1878#[tokio::test]
1879async fn push_triggers_unified_processing() {
1880 // Git push triggers process_newly_available_git_data
1881 // Verifies Phase 10 integration
1882}
1883```
1884
1885**Success Criteria**:
1886- [ ] All 5 integration tests pass
1887- [ ] State events release after git sync
1888- [ ] PR events release after commit sync
1889- [ ] Partial OID scenarios handled correctly
1890- [ ] Push path uses unified function (same behavior as purgatory sync)
1891
1892---
1893
1894### Phase 13: Cleanup
1895
1896**Goal**: Remove old `start_state_sync` code and wire up new system.
1897
1898**Files**:
1899- `src/purgatory/mod.rs` (modify)
1900- `src/main.rs` (modify)
1901
1902**Deliverables**:
1903- Remove `start_state_sync` method
1904- Wire `start_sync_loop` into application startup
1905- Update `add_state` to call `enqueue_sync`
1906
1907**Success Criteria**:
1908- [ ] Old sync code removed
1909- [ ] New sync loop starts on application boot
1910- [ ] All existing tests still pass
1911- [ ] All new tests pass
1912
1913---
1914
1915## Test Summary
1916
1917| Phase | Unit Tests | Integration Tests | Total |
1918|-------|------------|-------------------|-------|
1919| 1. SyncQueueEntry | 2 | - | 2 |
1920| 2. DomainThrottle | 4 | - | 4 |
1921| 3. ThrottleManager (rate limiting) | 1 | - | 1 |
1922| 4. SyncContext | 0 | - | 0 |
1923| 5. Core Functions | 3 | - | 3 |
1924| 6. sync_identifier + queue triggers | 2 | - | 2 |
1925| 7. Queue Integration | 1 | - | 1 |
1926| 8. Sync Loop | 0 | - | 0 |
1927| 9. Unified Function | 3 | - | 3 |
1928| 10. Push Handler Update | 0 | - | 0 |
1929| 11. RealSyncContext | 0 | - | 0 |
1930| 12. Integration | - | 5 | 5 |
1931| 13. Cleanup | 0 | - | 0 |
1932| **Total** | **16** | **5** | **21** |
1933
1934## Configuration
1935
1936| Option | CLI Flag | Environment Variable | Default |
1937|--------|----------|---------------------|---------|
1938| Domain concurrent limit | `--sync-domain-concurrent` | `NGIT_SYNC_DOMAIN_CONCURRENT` | `5` |
1939| Domain rate limit | `--sync-domain-rate-limit` | `NGIT_SYNC_DOMAIN_RATE_LIMIT` | `30` |
1940| Default sync delay | `--sync-default-delay-secs` | `NGIT_SYNC_DEFAULT_DELAY_SECS` | `180` |
1941| Immediate sync delay | `--sync-immediate-delay-ms` | `NGIT_SYNC_IMMEDIATE_DELAY_MS` | `500` |
1942
1943**Note**: Sync loop interval is hardcoded to 1 second (not configurable).
1944
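The defaults in the table could be carried by a small config struct, sketched here under assumptions: `SyncConfig`, `from_lookup`, and `parse_or` are hypothetical names, and falling back to the default on a missing or unparsable value is a choice made for this sketch, not documented behaviour. CLI flag handling is left out.

```rust
use std::str::FromStr;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
pub struct SyncConfig {
    pub domain_concurrent: u32,
    pub domain_rate_limit: u32,
    pub default_delay: Duration,
    pub immediate_delay: Duration,
}

impl Default for SyncConfig {
    fn default() -> Self {
        Self {
            domain_concurrent: 5,
            domain_rate_limit: 30,
            default_delay: Duration::from_secs(180),
            immediate_delay: Duration::from_millis(500),
        }
    }
}

/// Parse a raw value, falling back when it is absent or malformed (assumed policy).
fn parse_or<T: FromStr>(raw: Option<&str>, fallback: T) -> T {
    raw.and_then(|v| v.trim().parse().ok()).unwrap_or(fallback)
}

impl SyncConfig {
    /// Build from any key -> value lookup, which keeps tests independent of the process env.
    pub fn from_lookup(lookup: impl Fn(&str) -> Option<String>) -> Self {
        let d = Self::default();
        Self {
            domain_concurrent: parse_or(
                lookup("NGIT_SYNC_DOMAIN_CONCURRENT").as_deref(),
                d.domain_concurrent,
            ),
            domain_rate_limit: parse_or(
                lookup("NGIT_SYNC_DOMAIN_RATE_LIMIT").as_deref(),
                d.domain_rate_limit,
            ),
            default_delay: Duration::from_secs(parse_or(
                lookup("NGIT_SYNC_DEFAULT_DELAY_SECS").as_deref(),
                d.default_delay.as_secs(),
            )),
            immediate_delay: Duration::from_millis(parse_or(
                lookup("NGIT_SYNC_IMMEDIATE_DELAY_MS").as_deref(),
                d.immediate_delay.as_millis() as u64,
            )),
        }
    }

    /// Read the environment variables listed in the table.
    pub fn from_env() -> Self {
        Self::from_lookup(|key| std::env::var(key).ok())
    }
}
```

Keeping the delays as `Duration` values means the seconds/milliseconds distinction in the variable names is resolved once, at parse time.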
1945## Observability
1946
1947### Metrics
1948
1949- `purgatory_sync_queue_size` - Identifiers pending sync
1950- `purgatory_sync_attempts_total` - Sync attempts per identifier
1951- `purgatory_sync_oids_fetched_total` - OIDs successfully fetched
1952- `purgatory_domain_in_flight` - In-flight requests per domain
1953- `purgatory_domain_requests_total` - Total requests per domain
1954
1955### Logging
1956
1957- `INFO`: Successful sync completion, OIDs fetched
1958- `DEBUG`: URL attempts, throttle decisions, backoff applied
1959- `WARN`: Fetch failures, processing errors