upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs/explanation/purgatory-sync-redesign.md
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-06 14:27:59 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-06 14:27:59 +0000
commit71914d92a1bc296b3a585b60989cf0f1bf70dd4b (patch)
tree693eb746b25a2c4547297efcf2102ba4dba2e171 /docs/explanation/purgatory-sync-redesign.md
parent96203000f81a46976834971dd26e1a79465e6303 (diff)
docs: purgatory git sync design
Diffstat (limited to 'docs/explanation/purgatory-sync-redesign.md')
-rw-r--r--docs/explanation/purgatory-sync-redesign.md1328
1 files changed, 1328 insertions, 0 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
new file mode 100644
index 0000000..ec02e75
--- /dev/null
+++ b/docs/explanation/purgatory-sync-redesign.md
@@ -0,0 +1,1328 @@
1# Purgatory Sync Redesign
2
3## Status
4
5**Proposed** - January 2026
6
7## Context
8
9The current purgatory sync implementation (`start_state_sync` at `src/purgatory/mod.rs:510`) has several limitations:
10
111. **Per-event syncing**: Each state event triggers its own independent sync operation
122. **No PR event syncing**: PR events enter purgatory but don't trigger git data fetching
133. **No batching**: Multiple events for the same repository cause redundant fetch requests
144. **No rate limiting**: Can overwhelm remote git servers or get rate-limited
155. **No coordination**: Multiple concurrent syncs may fetch the same OIDs
16
17When syncing a new repository, we often receive multiple state and PR events in a burst. The current approach creates unnecessary load on remote servers and doesn't handle this common case efficiently.
18
19## Decision
20
21Redesign purgatory sync to be **identifier-based** rather than **event-based**, with:
22
231. A background sync loop that processes identifiers, not individual events
242. Batched OID fetching across all purgatory events for an identifier
253. Domain-based throttling (30 requests/minute per domain)
264. Exponential backoff per identifier (20s → 2m, then 2m intervals)
275. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default)
28
29## Architecture
30
31### Overview
32
33```
34┌──────────────────────────────────────────────────────────────────────────────┐
35│ Purgatory │
36│ │
37│ ┌─────────────────┐ ┌─────────────────┐ │
38│ │ State Events │ │ PR Events │ │
39│ │ (by identifier)│ │ (by event_id) │ │
40│ └────────┬────────┘ └────────┬────────┘ │
41│ │ │ │
42│ └──────────┬─────────┘ │
43│ │ add_state() / add_pr() / trigger_immediate_sync() │
44│ ▼ │
45│ ┌──────────────────────────┐ │
46│ │ Sync Queue │ │
47│ │ DashMap<id, Entry> │ │
48│ │ │ │
49│ │ Entry { │ │
50│ │ next_attempt, │ ← delay/backoff timer │
51│ │ attempt_count, │ ← for backoff calculation │
52│ │ in_progress, │ ← prevents concurrent runs │
53│ │ } │ │
54│ └────────────┬─────────────┘ │
55│ │ │
56│ ┌─────────────────────┼─────────────────────────────────────────────────┐ │
57│ │ ▼ │ │
58│ │ ┌─────────────────────┐ │ │
59│ │ │ Main Sync Loop │ (every 1s) │ │
60│ │ │ │ │ │
61│ │ │ 1. Find ALL ready │ │ │
62│ │ │ identifiers │ │ │
63│ │ │ 2. Spawn parallel │ │ │
64│ │ │ tasks for each │───────┐ │ │
65│ │ │ 3. Apply backoff │ │ (parallel tasks) │ │
66│ │ │ when done │ │ │ │
67│ │ └─────────────────────┘ │ │ │
68│ │ ▼ │ │
69│ │ ┌─────────────────────────────────────┐ │ │
70│ │ │ sync_identifier() │ │ │
71│ │ │ │ │ │
72│ │ │ while (events && oids && urls): │ │ │
73│ │ │ 1. Recalc URLs/OIDs (fresh) │ │ │
74│ │ │ 2. Get untried URLs per domain │ │ │
75│ │ │ 3. Skip throttled domains │ │ │
76│ │ │ 4. Try available URLs │ │ │
77│ │ │ (respecting domain limits) │ │ │
78│ │ │ 5. Process satisfiable events │ │ │
79│ │ │ 6. Loop catches new events/URLs │ │ │
80│ │ └─────────────────────────────────────┘ │ │
81│ │ │ │ │
82│ │ ▼ │ │
83│ │ ┌─────────────────────────────────────┐ │ │
84│ │ │ Domain Throttle │ │ │
85│ │ │ │ │ │
86│ │ │ Per-domain state: │ │ │
87│ │ │ - 5 concurrent requests max │ │ │
88│ │ │ - 30 requests/min sliding window │ │ │
89│ │ │ │ │ │
90│ │ │ Per (domain, identifier) state: │ │ │
91│ │ │ - in_progress: bool │ │ │
92│ │ │ - urls_tried: HashSet<String> │ │ │
93│ │ │ │ │ │
94│ │ │ Identifier removed when all URLs │ │ │
95│ │ │ tried. Re-added on next sync. │ │ │
96│ │ └─────────────────────────────────────┘ │ │
97│ │ │ │
98│ └───────────────────────────────────────────────────────────────────────┘ │
99└──────────────────────────────────────────────────────────────────────────────┘
100```
101
102### Flow Summary
103
1041. **Event arrives** → added to state_events/pr_events + sync_queue with delay
105 - User-submitted: 3 minute delay (expect git push to follow)
106 - Sync-triggered: 500ms delay (batch burst arrivals)
107 - `enqueue_sync()` resets `attempt_count` to 0 and updates `next_attempt` if needed
108
1092. **Main sync loop** (every 1s):
110 - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`)
111 - Spawns parallel tasks for each (marks `in_progress = true`)
112 - Each `sync_identifier()` task runs a while loop:
113 - Recalculates URLs and OIDs fresh each iteration (catches new events/announcements)
114 - For each domain, gets untried URLs (via `domain_throttle`)
115 - Skips domains that are fully throttled
116 - Tries non-throttled domains first, then throttled if still need OIDs
117 - Respects per-domain concurrency (5 max) and rate limits (30/min)
118 - Processes satisfiable events
119 - Continues while: events remain AND OIDs missing AND untried URLs exist
120 - When task completes (regardless of outcome):
121 - If `next_attempt` is in the future (new event arrived): just clear `in_progress`
122 - Otherwise: increment `attempt_count`, set `next_attempt = now + backoff`, clear `in_progress`
123 - If no pending events remain: remove identifier from sync_queue
124
1253. **Domain throttle state management**:
126 - Tracks `(in_progress, urls_tried)` per `(domain, identifier)` pair
127 - `in_progress` prevents parallel fetches to same domain for same identifier
128 - `urls_tried` tracks which URLs have been attempted
129 - When all URLs for a domain+identifier are tried, that entry is removed
130 - Entry may be re-added on next sync attempt if new URLs available
131
132### New Data Structures
133
134#### SyncQueueEntry
135
136Tracks sync state for each identifier in the main sync queue:
137
138```rust
139/// Entry in the sync queue tracking when/how to sync an identifier
140#[derive(Debug, Clone)]
141pub struct SyncQueueEntry {
142 /// Don't attempt sync before this time
143 /// Set for: initial delay (3min user / 500ms sync), backoff after attempts
144 pub next_attempt: Instant,
145
146 /// Number of sync attempts (for backoff calculation)
147 /// Reset to 0 when new event arrives for this identifier
148 pub attempt_count: u32,
149
150 /// Whether a sync is currently in progress for this identifier
151 /// Prevents concurrent sync runs for the same identifier
152 pub in_progress: bool,
153}
154
155impl SyncQueueEntry {
156 /// Create new entry with specified delay
157 pub fn new(delay: Duration) -> Self {
158 Self {
159 next_attempt: Instant::now() + delay,
160 attempt_count: 0,
161 in_progress: false,
162 }
163 }
164
165 /// Calculate backoff duration: 20s, 40s, 80s, 120s (max 2min)
166 pub fn backoff(&self) -> Duration {
167 let base = Duration::from_secs(20);
168 let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3));
169 let backoff = base * multiplier;
170 backoff.min(Duration::from_secs(120)) // Cap at 2 minutes
171 }
172
173 /// Check if this entry is ready to sync (not in progress, delay passed)
174 pub fn is_ready(&self) -> bool {
175 !self.in_progress && Instant::now() >= self.next_attempt
176 }
177
178 /// Called when new event arrives - resets attempt_count, may update next_attempt
179 pub fn on_new_event(&mut self, delay: Duration) {
180 self.attempt_count = 0;
181 let new_attempt = Instant::now() + delay;
182 // Only bring forward if new time is sooner
183 if new_attempt < self.next_attempt {
184 self.next_attempt = new_attempt;
185 }
186 }
187
188 /// Called when sync attempt completes
189 /// If next_attempt is in the future (new event arrived during sync), just clear in_progress
190 /// Otherwise, apply backoff
191 pub fn on_sync_complete(&mut self) {
192 self.in_progress = false;
193 let now = Instant::now();
194 if self.next_attempt <= now {
195 // No new event arrived during sync - apply backoff
196 self.attempt_count += 1;
197 self.next_attempt = now + self.backoff();
198 }
199 // else: new event arrived during sync, next_attempt already set, don't apply backoff
200 }
201}
202```
203
204#### DomainThrottle
205
206Tracks per-domain rate limiting and per-(domain, identifier) fetch state:
207
208```rust
209/// Tracks domain-level rate limiting and per-identifier fetch state
210///
211/// Rate limits: 5 concurrent requests, 30 requests/minute per domain
212///
213/// Per (domain, identifier): tracks which URLs have been tried and whether
214/// a fetch is currently in progress. When all URLs are tried, the identifier
215/// is removed from that domain's tracking. It may be re-added on the next
216/// sync attempt if new URLs become available.
217pub struct DomainThrottle {
218 /// Per-domain, per-identifier state: (in_progress, urls_tried)
219 /// domain -> identifier -> (in_progress, urls_tried)
220 state: DashMap<String, DashMap<String, (bool, HashSet<String>)>>,
221
222 /// Count of currently in-flight requests per domain
223 in_flight: DashMap<String, u32>,
224
225 /// Request timestamps per domain (sliding window for rate limiting)
226 request_times: DashMap<String, VecDeque<Instant>>,
227
228 /// Maximum concurrent requests per domain
229 max_concurrent: u32,
230
231 /// Maximum requests per minute per domain
232 max_per_minute: u32,
233}
234
235impl DomainThrottle {
236 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
237 Self {
238 state: DashMap::new(),
239 in_flight: DashMap::new(),
240 request_times: DashMap::new(),
241 max_concurrent,
242 max_per_minute,
243 }
244 }
245
246 /// Check if domain has capacity for another request
247 /// (both concurrent limit and rate limit)
248 pub fn has_capacity(&self, domain: &str) -> bool {
249 // Check concurrent limit
250 let current_in_flight = self.in_flight.get(domain).map_or(0, |v| *v);
251 if current_in_flight >= self.max_concurrent {
252 return false;
253 }
254
255 // Check rate limit (sliding window)
256 let now = Instant::now();
257 let window = Duration::from_secs(60);
258 self.request_times
259 .get(domain)
260 .map_or(true, |times| {
261 times.iter().filter(|t| now.duration_since(**t) < window).count()
262 < self.max_per_minute as usize
263 })
264 }
265
266 /// Check if identifier is currently fetching from this domain
267 pub fn is_in_progress(&self, domain: &str, identifier: &str) -> bool {
268 self.state
269 .get(domain)
270 .and_then(|domain_state| domain_state.get(identifier))
271 .map_or(false, |entry| entry.0)
272 }
273
274 /// Get untried URLs for an identifier from a specific domain
275 /// Returns URLs from `available_urls` that haven't been tried yet
276 pub fn get_untried_urls(
277 &self,
278 domain: &str,
279 identifier: &str,
280 available_urls: &[String]
281 ) -> Vec<String> {
282 let tried = self.state
283 .get(domain)
284 .and_then(|domain_state| domain_state.get(identifier))
285 .map(|entry| entry.1.clone())
286 .unwrap_or_default();
287
288 available_urls
289 .iter()
290 .filter(|url| !tried.contains(*url))
291 .cloned()
292 .collect()
293 }
294
295 /// Mark a URL as being fetched (in progress)
296 /// Returns false if domain has no capacity or identifier already in progress for this domain
297 pub fn start_fetch(&self, domain: &str, identifier: &str, url: &str) -> bool {
298 // Check capacity
299 if !self.has_capacity(domain) {
300 return false;
301 }
302
303 // Check if already in progress for this domain+identifier
304 if self.is_in_progress(domain, identifier) {
305 return false;
306 }
307
308 // Increment in-flight counter
309 *self.in_flight.entry(domain.to_string()).or_insert(0) += 1;
310
311 // Record request time
312 self.request_times
313 .entry(domain.to_string())
314 .or_default()
315 .push_back(Instant::now());
316
317 // Mark in progress and add URL to tried set
318 self.state
319 .entry(domain.to_string())
320 .or_default()
321 .entry(identifier.to_string())
322 .and_modify(|(in_progress, tried)| {
323 *in_progress = true;
324 tried.insert(url.to_string());
325 })
326 .or_insert_with(|| {
327 let mut tried = HashSet::new();
328 tried.insert(url.to_string());
329 (true, tried)
330 });
331
332 true
333 }
334
335 /// Mark a fetch as complete
336 pub fn complete_fetch(&self, domain: &str, identifier: &str) {
337 // Decrement in-flight counter
338 if let Some(mut count) = self.in_flight.get_mut(domain) {
339 *count = count.saturating_sub(1);
340 }
341
342 // Clear in_progress flag
343 if let Some(domain_state) = self.state.get(domain) {
344 if let Some(mut entry) = domain_state.get_mut(identifier) {
345 entry.0 = false;
346 }
347 }
348
349 // Clean old request times
350 let now = Instant::now();
351 let window = Duration::from_secs(60);
352 if let Some(mut times) = self.request_times.get_mut(domain) {
353 while times.front().map_or(false, |t| now.duration_since(*t) >= window) {
354 times.pop_front();
355 }
356 }
357 }
358
359 /// Check if all URLs have been tried for this domain+identifier
360 pub fn all_urls_tried(
361 &self,
362 domain: &str,
363 identifier: &str,
364 available_urls: &[String]
365 ) -> bool {
366 self.get_untried_urls(domain, identifier, available_urls).is_empty()
367 }
368
369 /// Remove identifier from a domain's tracking (called when all URLs tried)
370 pub fn remove_identifier_from_domain(&self, domain: &str, identifier: &str) {
371 if let Some(domain_state) = self.state.get(domain) {
372 domain_state.remove(identifier);
373 }
374 }
375
376 /// Remove identifier from all domains (called when sync complete or events expired)
377 pub fn remove_identifier(&self, identifier: &str) {
378 for domain_entry in self.state.iter() {
379 domain_entry.value().remove(identifier);
380 }
381 }
382
383 /// Get all domains where this identifier has untried URLs
384 pub fn domains_with_untried_urls(&self, identifier: &str) -> Vec<String> {
385 self.state
386 .iter()
387 .filter(|entry| entry.value().contains_key(identifier))
388 .map(|entry| entry.key().clone())
389 .collect()
390 }
391}
392```
393
394### Modified Purgatory Structure
395
396```rust
397pub struct Purgatory {
398 /// State events (kind 30618) indexed by repository identifier
399 state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>,
400
401 /// PR events (kind 1617/1618) indexed by event ID
402 pr_events: Arc<DashMap<String, PrPurgatoryEntry>>,
403
404 /// NEW: Sync queue - identifiers pending git data sync
405 sync_queue: Arc<DashMap<String, SyncQueueEntry>>,
406
407 /// NEW: Domain-level throttling and per-identifier fetch state
408 domain_throttle: Arc<DomainThrottle>,
409
410 git_data_path: PathBuf,
411}
412```
413
414### Key Methods
415
416#### Adding Events to Purgatory
417
418```rust
419impl Purgatory {
420 /// Add a state event to purgatory (user-submitted, 3min delay)
421 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) {
422 // ... existing logic to add to state_events ...
423
424 // Add to sync queue with 3 minute delay
425 self.enqueue_sync(&identifier, Duration::from_secs(180));
426 }
427
428 /// Add a PR event to purgatory (user-submitted, 3min delay)
429 pub fn add_pr(&self, event: Event, event_id: String, commit: String) {
430 // ... existing logic to add to pr_events ...
431
432 // Extract identifier from event's `a` tag and enqueue sync
433 if let Some(identifier) = extract_identifier_from_pr(&event) {
434 self.enqueue_sync(&identifier, Duration::from_secs(180));
435 }
436 }
437
438 /// Trigger immediate sync for an identifier (called from negentropy sync)
439 /// Still applies 500ms debounce for batching burst arrivals
440 pub fn trigger_immediate_sync(&self, identifier: &str) {
441 self.enqueue_sync(identifier, Duration::from_millis(500));
442 }
443
444 /// Internal: Add identifier to sync queue with specified delay
445 fn enqueue_sync(&self, identifier: &str, delay: Duration) {
446 self.sync_queue
447 .entry(identifier.to_string())
448 .and_modify(|entry| {
449 // New event arrived - reset backoff, potentially update timing
450 entry.reset(delay);
451 })
452 .or_insert_with(|| SyncQueueEntry::new(delay));
453 }
454}
455```
456
457#### Extracting Identifier from PR Events
458
459```rust
460/// Extract repository identifier from PR event's `a` tag
461fn extract_identifier_from_pr(event: &Event) -> Option<String> {
462 event.tags.iter().find_map(|tag| {
463 let tag_vec = tag.clone().to_vec();
464 if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") {
465 // Format: 30617:<owner>:<identifier>
466 let parts: Vec<&str> = tag_vec[1].split(':').collect();
467 if parts.len() >= 3 {
468 return Some(parts[2].to_string());
469 }
470 }
471 None
472 })
473}
474```
475
476### Sync Loop
477
478A single background loop handles syncing. The `DomainThrottle` is just state tracking, not a separate processing queue.
479
480```rust
481impl Purgatory {
482 /// Start the background sync loop
483 pub fn start_sync_loop(
484 self: Arc<Self>,
485 database: SharedDatabase,
486 our_domain: Option<String>,
487 local_relay: Option<nostr_relay_builder::LocalRelay>,
488 ) -> tokio::task::JoinHandle<()> {
489 tokio::spawn(async move {
490 let mut interval = tokio::time::interval(Duration::from_secs(1));
491
492 loop {
493 interval.tick().await;
494
495 // Find ALL ready identifiers
496 let ready_identifiers: Vec<String> = self.sync_queue
497 .iter()
498 .filter(|entry| entry.value().is_ready())
499 .map(|entry| entry.key().clone())
500 .collect();
501
502 // Spawn sync tasks in parallel for each ready identifier
503 for identifier in ready_identifiers {
504 // Check if there are still events in purgatory for this identifier
505 if !self.has_pending_events(&identifier) {
506 self.sync_queue.remove(&identifier);
507 self.domain_throttle.remove_identifier(&identifier);
508 continue;
509 }
510
511 // Mark as in progress (prevents re-spawning on next tick)
512 if let Some(mut entry) = self.sync_queue.get_mut(&identifier) {
513 if entry.in_progress {
514 continue; // Already running
515 }
516 entry.in_progress = true;
517 }
518
519 // Spawn task for this identifier
520 let purgatory = self.clone();
521 let db = database.clone();
522 let domain = our_domain.clone();
523 let relay = local_relay.clone();
524 let id = identifier.clone();
525
526 tokio::spawn(async move {
527 purgatory.sync_identifier(
528 &id,
529 &db,
530 domain.as_deref(),
531 relay.as_ref(),
532 ).await;
533
534 // Check if events remain
535 if !purgatory.has_pending_events(&id) {
536 purgatory.sync_queue.remove(&id);
537 purgatory.domain_throttle.remove_identifier(&id);
538 tracing::info!(identifier = %id, "Sync complete, removed from queues");
539 } else {
540 // Apply backoff (or not, if new event arrived during sync)
541 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) {
542 entry.on_sync_complete();
543 tracing::debug!(
544 identifier = %id,
545 attempt = entry.attempt_count,
546 next_attempt_secs = entry.next_attempt.duration_since(Instant::now()).as_secs(),
547 "Sync attempt complete, scheduled next attempt"
548 );
549 }
550 }
551 });
552 }
553 }
554 })
555 }
556
557 /// Check if there are pending events for an identifier
558 fn has_pending_events(&self, identifier: &str) -> bool {
559 // Check state events
560 if self.state_events.get(identifier).map_or(false, |v| !v.is_empty()) {
561 return true;
562 }
563
564 // Check PR events (need to scan for matching identifier)
565 for entry in self.pr_events.iter() {
566 if let Some(ref event) = entry.value().event {
567 if extract_identifier_from_pr(event).as_deref() == Some(identifier) {
568 return true;
569 }
570 }
571 }
572
573 false
574 }
575}
576```
577
578### Core Sync Logic
579
580```rust
581impl Purgatory {
582 /// Sync git data for all purgatory events with this identifier
583 ///
584 /// Uses a while loop that:
585 /// 1. Recalculates URLs and OIDs fresh each iteration (catches new events/announcements)
586 /// 2. Gets untried URLs per domain from DomainThrottle
587 /// 3. Skips domains that are fully throttled, tries available ones
588 /// 4. Continues while: events remain AND OIDs missing AND untried URLs exist
589 async fn sync_identifier(
590 &self,
591 identifier: &str,
592 database: &SharedDatabase,
593 our_domain: Option<&str>,
594 local_relay: Option<&nostr_relay_builder::LocalRelay>,
595 ) {
596 loop {
597 // 1. Check if any events remain (may have expired or been processed)
598 if !self.has_pending_events(identifier) {
599 return;
600 }
601
602 // 2. Collect all OIDs needed (fresh calculation each iteration)
603 let needed_oids = self.collect_needed_oids(identifier);
604
605 if needed_oids.is_empty() {
606 // No OIDs needed - try to process events and exit
607 self.try_process_events(identifier, database, our_domain, local_relay).await;
608 return;
609 }
610
611 // 3. Get repository data and clone URLs (fresh calculation each iteration)
612 let db_repo_data = match fetch_repository_data(database, identifier).await {
613 Ok(data) => data,
614 Err(e) => {
615 tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data");
616 return;
617 }
618 };
619
620 // 4. Collect clone URLs, excluding our domain
621 let all_clone_urls: Vec<String> = db_repo_data
622 .announcements
623 .iter()
624 .flat_map(|a| a.clone_urls.iter().cloned())
625 .filter(|url| our_domain.map_or(true, |d| !url.contains(d)))
626 .collect::<HashSet<_>>()
627 .into_iter()
628 .collect();
629
630 if all_clone_urls.is_empty() {
631 tracing::debug!(identifier = %identifier, "No external clone URLs available");
632 return;
633 }
634
635 // 5. Group URLs by domain
636 let urls_by_domain: HashMap<String, Vec<String>> = all_clone_urls
637 .iter()
638 .fold(HashMap::new(), |mut acc, url| {
639 let domain = extract_domain(url);
640 acc.entry(domain).or_default().push(url.clone());
641 acc
642 });
643
644 // 6. Find best local repo to fetch into
645 let target_repo = match self.find_target_repo(&db_repo_data, identifier) {
646 Some(path) => path,
647 None => {
648 tracing::debug!(identifier = %identifier, "No local repository found");
649 return;
650 }
651 };
652
653 // 7. Partition domains: available (have capacity) vs throttled
654 let (available_domains, throttled_domains): (Vec<_>, Vec<_>) = urls_by_domain
655 .keys()
656 .cloned()
657 .partition(|domain| self.domain_throttle.has_capacity(domain));
658
659 let mut remaining_oids: HashSet<String> = needed_oids.clone();
660 let mut any_fetch_started = false;
661
662 // 8. Try available domains first
663 for domain in &available_domains {
664 if remaining_oids.is_empty() {
665 break;
666 }
667
668 let domain_urls = urls_by_domain.get(domain).unwrap();
669 let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls);
670
671 for url in untried_urls {
672 if remaining_oids.is_empty() {
673 break;
674 }
675
676 // Skip if already in progress for this domain+identifier
677 if self.domain_throttle.is_in_progress(domain, identifier) {
678 break; // Wait for current fetch to complete
679 }
680
681 // Try to start fetch (checks capacity again, marks in_progress)
682 if !self.domain_throttle.start_fetch(domain, identifier, &url) {
683 break; // Domain at capacity
684 }
685
686 any_fetch_started = true;
687
688 // Fetch OIDs
689 let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect();
690 let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await;
691
692 // Mark fetch complete
693 self.domain_throttle.complete_fetch(domain, identifier);
694
695 match fetch_result {
696 Ok(fetched_oids) => {
697 if !fetched_oids.is_empty() {
698 let fetched_count = fetched_oids.len();
699 for oid in fetched_oids {
700 remaining_oids.remove(&oid);
701 }
702 tracing::info!(
703 identifier = %identifier,
704 url = %url,
705 fetched = fetched_count,
706 remaining = remaining_oids.len(),
707 "Fetched OIDs from server"
708 );
709 }
710 }
711 Err(e) => {
712 tracing::debug!(
713 identifier = %identifier,
714 url = %url,
715 error = %e,
716 "Failed to fetch from server"
717 );
718 }
719 }
720 }
721
722 // Clean up if all URLs tried for this domain
723 if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) {
724 self.domain_throttle.remove_identifier_from_domain(domain, identifier);
725 }
726 }
727
728 // 9. If still need OIDs, try throttled domains (they might have capacity now)
729 if !remaining_oids.is_empty() {
730 for domain in &throttled_domains {
731 if remaining_oids.is_empty() {
732 break;
733 }
734
735 // Re-check capacity (might have freed up)
736 if !self.domain_throttle.has_capacity(domain) {
737 continue;
738 }
739
740 let domain_urls = urls_by_domain.get(domain).unwrap();
741 let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls);
742
743 for url in untried_urls {
744 if remaining_oids.is_empty() {
745 break;
746 }
747
748 if self.domain_throttle.is_in_progress(domain, identifier) {
749 break;
750 }
751
752 if !self.domain_throttle.start_fetch(domain, identifier, &url) {
753 break;
754 }
755
756 any_fetch_started = true;
757
758 let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect();
759 let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await;
760
761 self.domain_throttle.complete_fetch(domain, identifier);
762
763 if let Ok(fetched_oids) = fetch_result {
764 for oid in fetched_oids {
765 remaining_oids.remove(&oid);
766 }
767 }
768 }
769
770 if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) {
771 self.domain_throttle.remove_identifier_from_domain(domain, identifier);
772 }
773 }
774 }
775
776 // 10. Try to process events that can now be satisfied
777 self.try_process_events(identifier, database, our_domain, local_relay).await;
778
779 // 11. Decide whether to continue looping
780 let still_have_events = self.has_pending_events(identifier);
781 if !still_have_events {
782 return;
783 }
784
785 let still_need_oids = !self.collect_needed_oids(identifier).is_empty();
786 if !still_need_oids {
787 // Events remain but no OIDs needed - loop to try processing again
788 continue;
789 }
790
791 // Check if there are any untried URLs left across all domains
792 let have_untried_urls = urls_by_domain.iter().any(|(domain, urls)| {
793 !self.domain_throttle.get_untried_urls(domain, identifier, urls).is_empty()
794 });
795
796 if !have_untried_urls {
797 // No more URLs to try - exit and let backoff handle retry
798 return;
799 }
800
801 // If no fetch was started this iteration (all throttled), yield briefly
802 if !any_fetch_started {
803 tokio::time::sleep(Duration::from_millis(100)).await;
804 }
805 }
806 }
807
808 /// Collect all OIDs needed for purgatory events with this identifier
809 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> {
810 let mut oids = HashSet::new();
811
812 // Collect from state events
813 if let Some(entries) = self.state_events.get(identifier) {
814 for entry in entries.iter() {
815 if let Ok(state) = RepositoryState::from_event(entry.event.clone()) {
816 for branch in &state.branches {
817 if !branch.commit.starts_with("ref: ") {
818 oids.insert(branch.commit.clone());
819 }
820 }
821 for tag in &state.tags {
822 if !tag.commit.starts_with("ref: ") {
823 oids.insert(tag.commit.clone());
824 }
825 }
826 }
827 }
828 }
829
830 // Collect from PR events
831 for entry in self.pr_events.iter() {
832 if let Some(ref event) = entry.value().event {
833 if extract_identifier_from_pr(event).as_deref() == Some(identifier) {
834 if let Some(commit) = extract_commit_from_pr(event) {
835 oids.insert(commit);
836 }
837 }
838 }
839 }
840
841 oids
842 }
843
844 /// Try to process events that can now be satisfied
845 async fn try_process_events(
846 &self,
847 identifier: &str,
848 database: &SharedDatabase,
849 our_domain: Option<&str>,
850 local_relay: Option<&nostr_relay_builder::LocalRelay>,
851 ) {
852 if !self.has_pending_events(identifier) {
853 return;
854 }
855
856 let db_repo_data = match fetch_repository_data(database, identifier).await {
857 Ok(data) => data,
858 Err(e) => {
859 tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data for processing");
860 return;
861 }
862 };
863
864 // Process state events (oldest first)
865 if let Some(mut entries) = self.state_events.get_mut(identifier) {
866 entries.sort_by_key(|e| e.event.created_at);
867
868 let mut to_remove = Vec::new();
869
870 for entry in entries.iter() {
871 if let Ok(state) = RepositoryState::from_event(entry.event.clone()) {
872 if self.can_satisfy_state(&state, &db_repo_data) {
873 match self.process_state_event(&state, &db_repo_data, database, local_relay).await {
874 Ok(()) => {
875 to_remove.push(entry.event.id);
876 }
877 Err(e) => {
878 tracing::warn!(
879 event_id = %entry.event.id,
880 error = %e,
881 "Failed to process state event"
882 );
883 }
884 }
885 }
886 }
887 }
888
889 entries.retain(|e| !to_remove.contains(&e.event.id));
890 }
891
892 // Process PR events (oldest first)
893 let mut pr_to_remove = Vec::new();
894 let mut pr_entries: Vec<_> = self.pr_events
895 .iter()
896 .filter_map(|entry| {
897 entry.value().event.as_ref().and_then(|event| {
898 if extract_identifier_from_pr(event).as_deref() == Some(identifier) {
899 Some((entry.key().clone(), event.clone(), entry.value().commit.clone()))
900 } else {
901 None
902 }
903 })
904 })
905 .collect();
906
907 pr_entries.sort_by_key(|(_, event, _)| event.created_at);
908
909 for (event_id, event, commit) in pr_entries {
910 if self.can_satisfy_pr(&commit, &db_repo_data) {
911 match self.process_pr_event(&event, &commit, &db_repo_data, database, local_relay).await {
912 Ok(()) => {
913 pr_to_remove.push(event_id);
914 }
915 Err(e) => {
916 tracing::warn!(
917 event_id = %event.id,
918 error = %e,
919 "Failed to process PR event"
920 );
921 }
922 }
923 }
924 }
925
926 for event_id in pr_to_remove {
927 self.pr_events.remove(&event_id);
928 }
929 }
930
931 /// Check if a state event can be satisfied (all OIDs available locally)
932 fn can_satisfy_state(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> bool {
933 for announcement in &db_repo_data.announcements {
934 let repo_path = self.git_data_path.join(announcement.repo_path());
935 if !repo_path.exists() {
936 continue;
937 }
938
939 let all_present = state.branches.iter().all(|b| {
940 b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit)
941 }) && state.tags.iter().all(|t| {
942 oid_exists(&repo_path, &t.commit)
943 });
944
945 if all_present {
946 return true;
947 }
948 }
949 false
950 }
951
952 /// Check if a PR event can be satisfied (commit available locally)
953 fn can_satisfy_pr(&self, commit: &str, db_repo_data: &RepositoryData) -> bool {
954 for announcement in &db_repo_data.announcements {
955 let repo_path = self.git_data_path.join(announcement.repo_path());
956 if repo_path.exists() && oid_exists(&repo_path, commit) {
957 return true;
958 }
959 }
960 false
961 }
962}
963
964/// Extract commit hash from PR event's `c` tag
965fn extract_commit_from_pr(event: &Event) -> Option<String> {
966 event.tags.iter().find_map(|tag| {
967 let tag_vec = tag.clone().to_vec();
968 if tag_vec.len() >= 2 && tag_vec[0] == "c" {
969 Some(tag_vec[1].clone())
970 } else {
971 None
972 }
973 })
974}
975```
976
977### Fetching OIDs
978
979```rust
980/// Fetch specific OIDs from a remote git server
981///
982/// Returns the list of OIDs that were successfully fetched (now exist locally).
983/// Git fetch may partially succeed, so we check which OIDs are available after.
984async fn fetch_oids_from_server(
985 repo_path: &Path,
986 server_url: &str,
987 oids: &[String],
988) -> Result<Vec<String>> {
989 if oids.is_empty() {
990 return Ok(Vec::new());
991 }
992
993 let repo_path = repo_path.to_path_buf();
994 let server_url = server_url.to_string();
995 let oids = oids.to_vec();
996
997 tokio::task::spawn_blocking(move || {
998 // Build git fetch command with all OIDs
999 let mut args = vec!["fetch", "--depth=1", &server_url];
1000 args.extend(oids.iter().map(|s| s.as_str()));
1001
1002 tracing::debug!(
1003 oids_count = oids.len(),
1004 server = %server_url,
1005 "Fetching OIDs"
1006 );
1007
1008 let output = Command::new("git")
1009 .args(&args)
1010 .current_dir(&repo_path)
1011 .output();
1012
1013 match output {
1014 Ok(result) => {
1015 // Check which OIDs we now have (regardless of command success)
1016 // git fetch may partially succeed
1017 let fetched: Vec<String> = oids
1018 .iter()
1019 .filter(|oid| oid_exists(&repo_path, oid))
1020 .cloned()
1021 .collect();
1022
1023 if !result.status.success() {
1024 let stderr = String::from_utf8_lossy(&result.stderr);
1025 tracing::debug!(
1026 server = %server_url,
1027 stderr = %stderr,
1028 fetched_count = fetched.len(),
1029 "git fetch returned non-zero but may have fetched some OIDs"
1030 );
1031 }
1032
1033 Ok(fetched)
1034 }
1035 Err(e) => {
1036 bail!("git fetch command error: {}", e)
1037 }
1038 }
1039 })
1040 .await?
1041}
1042
1043/// Extract domain from a URL for throttling
1044fn extract_domain(url: &str) -> String {
1045 url::Url::parse(url)
1046 .ok()
1047 .and_then(|u| u.host_str().map(|s| s.to_string()))
1048 .unwrap_or_else(|| url.to_string())
1049}
1050```
1051
1052### Helper Methods
1053
1054```rust
1055impl Purgatory {
1056 /// Find the best local repository to fetch OIDs into
1057 ///
1058 /// Prefers the repo with the most recent commit on its default branch,
1059 /// as it's most likely to have related git history.
1060 fn find_target_repo(&self, db_repo_data: &RepositoryData, identifier: &str) -> Option<PathBuf> {
1061 let mut best: Option<(Timestamp, PathBuf)> = None;
1062
1063 for announcement in &db_repo_data.announcements {
1064 let repo_path = self.git_data_path.join(announcement.repo_path());
1065 if !repo_path.exists() {
1066 continue;
1067 }
1068
1069 let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path)
1070 .unwrap_or(Timestamp::zero());
1071
1072 if best.as_ref().map_or(true, |(d, _)| commit_date > *d) {
1073 best = Some((commit_date, repo_path));
1074 }
1075 }
1076
1077 best.map(|(_, path)| path)
1078 }
1079
1080 /// Process a state event that can now be satisfied
1081 ///
1082 /// Syncs OIDs to all owner repos, aligns refs, saves to DB, notifies subscribers.
1083 async fn process_state_event(
1084 &self,
1085 state: &RepositoryState,
1086 db_repo_data: &RepositoryData,
1087 database: &SharedDatabase,
1088 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1089 ) -> Result<()> {
1090 // Find source repo (one that has all OIDs)
1091 let source_repo = self.find_repo_with_all_oids(state, db_repo_data)
1092 .ok_or_else(|| anyhow::anyhow!("No repo has all required OIDs"))?;
1093
1094 // Sync to other owner repos and align refs
1095 let sync_result = sync_to_owner_repos(&source_repo, state, db_repo_data, &self.git_data_path);
1096
1097 tracing::info!(
1098 identifier = %state.identifier,
1099 event_id = %state.event.id,
1100 repos_synced = sync_result.repos_synced,
1101 "Synced state from purgatory"
1102 );
1103
1104 // Save to database
1105 database.save_event(&state.event).await?;
1106
1107 // Notify subscribers
1108 if let Some(relay) = local_relay {
1109 relay.notify_event(state.event.clone());
1110 }
1111
1112 Ok(())
1113 }
1114
1115 /// Process a PR event that can now be satisfied
1116 async fn process_pr_event(
1117 &self,
1118 event: &Event,
1119 commit: &str,
1120 db_repo_data: &RepositoryData,
1121 database: &SharedDatabase,
1122 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1123 ) -> Result<()> {
1124 // Save to database
1125 database.save_event(event).await?;
1126
1127 // Notify subscribers
1128 if let Some(relay) = local_relay {
1129 relay.notify_event(event.clone());
1130 }
1131
1132 tracing::info!(
1133 event_id = %event.id,
1134 commit = %commit,
1135 "Processed PR event from purgatory"
1136 );
1137
1138 Ok(())
1139 }
1140
1141 /// Find a repository that has all OIDs required by a state event
1142 fn find_repo_with_all_oids(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> Option<PathBuf> {
1143 for announcement in &db_repo_data.announcements {
1144 let repo_path = self.git_data_path.join(announcement.repo_path());
1145 if !repo_path.exists() {
1146 continue;
1147 }
1148
1149 let all_present = state.branches.iter().all(|b| {
1150 b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit)
1151 }) && state.tags.iter().all(|t| {
1152 oid_exists(&repo_path, &t.commit)
1153 });
1154
1155 if all_present {
1156 return Some(repo_path);
1157 }
1158 }
1159 None
1160 }
1161}
1162```
1163```
1164
1165## Integration Points
1166
1167### 1. Negentropy Sync (src/purgatory/mod.rs:105-107)
1168
1169When events are added via negentropy sync, call `trigger_immediate_sync`:
1170
1171```rust
1172// In negentropy sync handler
1173purgatory.add_state(event, identifier.clone(), author);
1174purgatory.trigger_immediate_sync(&identifier); // NEW: triggers 500ms debounced sync
1175```
1176
1177### 2. Relay Startup
1178
1179Start the sync loop when the relay starts:
1180
1181```rust
1182// In main.rs or server setup
1183let domain_throttle = Arc::new(DomainThrottle::new(5, 30)); // 5 concurrent, 30/min
1184let purgatory = Arc::new(Purgatory::new(git_data_path, domain_throttle));
1185
1186let sync_handle = purgatory.clone().start_sync_loop(
1187 database.clone(),
1188 Some(domain.clone()),
1189 Some(local_relay.clone()),
1190);
1191```
1192
1193### 3. Shutdown
1194
1195The sync loop will naturally stop when the purgatory is dropped. No special shutdown handling needed since all state is in-memory.
1196
1197## Testing Strategy
1198
1199### Unit Tests
1200
12011. **SyncQueueEntry**
1202 - Verify backoff calculation: 20s → 40s → 80s → 120s → 120s
1203 - Verify `on_new_event()` resets attempt_count and updates next_attempt if sooner
1204 - Verify `on_sync_complete()` applies backoff only if next_attempt is in the past
1205 - Verify `is_ready()` respects both `next_attempt` and `in_progress`
1206
12072. **DomainThrottle**
1208 - Verify concurrent limit: 6th request to same domain blocked
1209 - Verify rate limit: 31st request in a minute blocked
1210 - Verify `has_capacity()` checks both limits
1211 - Verify `get_untried_urls()` returns only URLs not in urls_tried
1212 - Verify `start_fetch()` fails if already in_progress for domain+identifier
1213 - Verify `start_fetch()` adds URL to urls_tried
1214 - Verify `complete_fetch()` decrements in_flight and clears in_progress
1215 - Verify `all_urls_tried()` correctly identifies when done
1216 - Verify `remove_identifier_from_domain()` cleans up state
1217 - Verify `remove_identifier()` removes from all domains
1218
12193. **OID collection**
1220 - Verify OIDs extracted from state events correctly
1221 - Verify OIDs extracted from PR events correctly
1222 - Verify deduplication works across state and PR events
1223
12244. **Identifier extraction**
1225 - Verify `extract_identifier_from_pr()` handles various `a` tag formats
1226 - Verify `extract_commit_from_pr()` extracts `c` tag correctly
1227
1228### Integration Tests
1229
12301. **Sync against own implementation**
1231 - Start two ngit-grasp instances
1232 - Push to one, verify other can sync via purgatory
1233 - Verify partial OID availability handled correctly (some OIDs fetched, others missing)
1234
12352. **Burst handling**
1236 - Submit 10 events for same identifier within 100ms
1237 - Verify debounce: sync doesn't start until 500ms after last event
1238 - Verify only one sync operation runs (not 10)
1239
12403. **Backoff behavior**
1241 - Configure with unreachable clone URLs
1242 - Verify backoff timing: 20s, 40s, 80s, 120s, then stays at 120s
1243 - Verify new event arriving resets attempt_count to 0
1244 - Verify new event during sync prevents backoff (next_attempt already in future)
1245
12464. **Rate limiting**
1247 - Configure with single domain having multiple URLs
1248 - Trigger many sync operations
1249 - Verify only 30 requests made in first minute
1250 - Verify only 5 concurrent requests per domain
1251
12525. **Concurrent limit per domain+identifier**
1253 - Start fetch for domain+identifier
1254 - Verify second fetch attempt for same domain+identifier blocked
1255 - Verify fetch for different identifier on same domain allowed (up to 5)
1256
12576. **Parallel identifier processing**
1258 - Add events for 5 different identifiers
1259 - Verify all 5 sync tasks start in parallel (not serial)
1260 - Verify `in_progress` flag prevents duplicate tasks for same identifier
1261
12627. **Dynamic URL/OID recalculation**
1263 - Start sync for identifier
1264 - While sync is running, add new announcement with additional clone URL
1265 - Verify sync picks up new URL in next while loop iteration
1266 - Similarly for new events adding new OIDs
1267
12688. **urls_tried cleanup**
1269 - Sync identifier, exhaust all URLs for a domain
1270 - Verify identifier removed from that domain's state
1271 - Add new announcement with new URL for same domain
1272 - Verify new URL is tried on next sync attempt
1273
12749. **Mixed state and PR events**
1275 - Add state event and PR event for same identifier
1276 - Verify both OID sets collected
1277 - Verify both events processed when OIDs arrive
1278
127910. **Available domains first**
1280 - Have 10 URLs: 8 from available domains, 2 from throttled domain
1281 - Verify available domains tried first
1282 - Verify throttled domain only contacted if available didn't satisfy all OIDs
1283
1284## Migration Path
1285
12861. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle)
12872. **Phase 2**: Implement sync loop alongside existing `start_state_sync`
12883. **Phase 3**: Migrate state events to use new sync loop
12894. **Phase 4**: Add PR event syncing
12905. **Phase 5**: Remove old `start_state_sync` code
1291
1292## Configuration
1293
1294New configuration options:
1295
1296| Option | CLI Flag | Environment Variable | Default |
1297|--------|----------|---------------------|---------|
1298| Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` |
1299| Domain concurrent limit | `--sync-domain-concurrent` | `NGIT_SYNC_DOMAIN_CONCURRENT` | `5` |
1300| Domain rate limit | `--sync-domain-rate-limit` | `NGIT_SYNC_DOMAIN_RATE_LIMIT` | `30` |
1301| Default sync delay | `--sync-default-delay-secs` | `NGIT_SYNC_DEFAULT_DELAY_SECS` | `180` |
1302| Immediate sync delay | `--sync-immediate-delay-ms` | `NGIT_SYNC_IMMEDIATE_DELAY_MS` | `500` |
1303
1304## Observability
1305
1306### Metrics
1307
1308- `purgatory_sync_queue_size` - Number of identifiers pending sync
1309- `purgatory_sync_attempts_total` - Counter of sync attempts per identifier
1310- `purgatory_sync_oids_fetched_total` - Counter of OIDs successfully fetched
1311- `purgatory_domain_in_flight` - Gauge of in-flight requests per domain
1312- `purgatory_domain_requests_total` - Counter of requests per domain
1313- `purgatory_sync_backoff_seconds` - Histogram of backoff durations applied
1314
1315### Logging
1316
1317Key log points:
1318- `INFO`: Successful sync completion, OIDs fetched
1319- `DEBUG`: Domain capacity checks, backoff applied, urls_tried state
1320- `WARN`: Fetch failures, processing errors
1321
1322## Open Questions
1323
13241. **PR placeholder handling**: Current code has `add_pr_placeholder()` for git-data-first scenario. How should this interact with the new sync system? (Probably: placeholders don't need syncing since git data already exists)
1325
13262. **Memory bounds**: Should we limit sync queue size? What happens if thousands of identifiers are pending?
1327
13283. **Persistence**: Currently all purgatory state is in-memory. Should sync queue state survive restarts? (Probably no - events will be re-synced via negentropy on restart)