upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

-rw-r--r-- docs/explanation/purgatory-sync-redesign.md 1487
1 files changed, 529 insertions, 958 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
index ec02e75..76d01b8 100644
--- a/docs/explanation/purgatory-sync-redesign.md
+++ b/docs/explanation/purgatory-sync-redesign.md
@@ -25,6 +25,7 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**,
253. Domain-based throttling (30 requests/minute per domain) 253. Domain-based throttling (30 requests/minute per domain)
264. Exponential backoff per identifier (20s → 2m, then 2m intervals) 264. Exponential backoff per identifier (20s → 2m, then 2m intervals)
275. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default) 275. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default)
286. **Clean separation of concerns**: Domain throttle handles rate limiting only; sync logic tracks its own tried URLs
28 29
29## Architecture 30## Architecture
30 31
@@ -69,36 +70,49 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**,
69│ │ ┌─────────────────────────────────────┐ │ │ 70│ │ ┌─────────────────────────────────────┐ │ │
70│ │ │ sync_identifier() │ │ │ 71│ │ │ sync_identifier() │ │ │
71│ │ │ │ │ │ 72│ │ │ │ │ │
72│ │ │ while (events && oids && urls): │ │ │ 73│ │ │ Owns its own tried_urls: HashSet │ │ │
73│ │ │ 1. Recalc URLs/OIDs (fresh) │ │ │ 74│ │ │ │ │ │
74│ │ │ 2. Get untried URLs per domain │ │ │ 75│ │ │ loop: │ │ │
75│ │ │ 3. Skip throttled domains │ │ │ 76│ │ │ sync_identifier_step() │ │ │
76│ │ │ 4. Try available URLs │ │ │ 77│ │ │ → (url_tried, complete) │ │ │
77│ │ │ (respecting domain limits) │ │ │ 78│ │ │ if complete: break │ │ │
78│ │ │ 5. Process satisfiable events │ │ │ 79│ │ │ tried_urls.insert(url_tried) │ │ │
79│ │ │ 6. Loop catches new events/URLs │ │ │
80│ │ └─────────────────────────────────────┘ │ │ 80│ │ └─────────────────────────────────────┘ │ │
81│ │ │ │ │ 81│ │ │ │ │
82│ │ ▼ │ │ 82│ │ ▼ │ │
83│ │ ┌─────────────────────────────────────┐ │ │ 83│ │ ┌─────────────────────────────────────┐ │ │
84│ │ │ Domain Throttle │ │ │ 84│ │ │ Domain Throttle │ │ │
85│ │ │ (rate limiting only) │ │ │
85│ │ │ │ │ │ 86│ │ │ │ │ │
86│ │ │ Per-domain state: │ │ │ 87│ │ │ Per-domain state: │ │ │
87│ │ │ - 5 concurrent requests max │ │ │ 88│ │ │ - in_flight: u32 │ │ │
88│ │ │ - 30 requests/min sliding window │ │ │ 89│ │ │ - request_times: VecDeque │ │ │
89│ │ │ │ │ │
90│ │ │ Per (domain, identifier) state: │ │ │
91│ │ │ - in_progress: bool │ │ │
92│ │ │ - urls_tried: HashSet<String> │ │ │
93│ │ │ │ │ │ 90│ │ │ │ │ │
94│ │ │ Identifier removed when all URLs │ │ │ 91│ │ │ Round-robin via: │ │ │
95│ │ │ tried. Re-added on next sync. │ │ │ 92│ │ │ - last_used_index per domain │ │ │
93│ │ │ - Caller passes tried_urls │ │ │
96│ │ └─────────────────────────────────────┘ │ │ 94│ │ └─────────────────────────────────────┘ │ │
97│ │ │ │ 95│ │ │ │
98│ └───────────────────────────────────────────────────────────────────────┘ │ 96│ └───────────────────────────────────────────────────────────────────────┘ │
99└──────────────────────────────────────────────────────────────────────────────┘ 97└──────────────────────────────────────────────────────────────────────────────┘
100``` 98```
101 99
100### Key Design Principle: Separation of Concerns
101
102The previous design conflated two concerns in `DomainThrottle`:
1031. **Rate limiting** (per-domain): How many requests can we make to a domain?
1042. **URL tracking** (per-identifier): Which URLs have we tried for this sync?
105
106The new design cleanly separates these:
107
108- **`DomainThrottle`**: Only handles rate limiting. Tracks in-flight requests and request timestamps per domain. Uses round-robin internally to distribute load across URLs.
109- **`sync_identifier`**: Owns its `tried_urls: HashSet<String>`. Passes this to the throttle when requesting a URL to try.
110
111This separation enables:
112- **Unit testing** of sync logic with a mock throttle
113- **Simpler state management** - throttle doesn't need cleanup when identifiers complete
114- **Clearer reasoning** about each component's responsibility
115
102### Flow Summary 116### Flow Summary
103 117
1041. **Event arrives** → added to state_events/pr_events + sync_queue with delay 1181. **Event arrives** → added to state_events/pr_events + sync_queue with delay
@@ -109,29 +123,21 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**,
1092. **Main sync loop** (every 1s): 1232. **Main sync loop** (every 1s):
110 - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) 124 - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`)
111 - Spawns parallel tasks for each (marks `in_progress = true`) 125 - Spawns parallel tasks for each (marks `in_progress = true`)
112 - Each `sync_identifier()` task runs a while loop: 126 - Each `sync_identifier()` task:
113 - Recalculates URLs and OIDs fresh each iteration (catches new events/announcements) 127 - Creates fresh `tried_urls: HashSet<String>`
114 - For each domain, gets untried URLs (via `domain_throttle`) 128 - Loops calling `sync_identifier_step()` until complete
115 - Skips domains that are fully throttled 129 - Step returns `(Option<url_tried>, complete)` - clean testable interface
116 - Tries non-throttled domains first, then throttled if still need OIDs 130 - When task completes: apply backoff or remove from queue
117 - Respects per-domain concurrency (5 max) and rate limits (30/min)
118 - Processes satisfiable events
119 - Continues while: events remain AND OIDs missing AND untried URLs exist
120 - When task completes (regardless of outcome):
121 - If `next_attempt` is in the future (new event arrived): just clear `in_progress`
122 - Otherwise: increment `attempt_count`, set `next_attempt = now + backoff`, clear `in_progress`
123 - If no pending events remain: remove identifier from sync_queue
124 131
1253. **Domain throttle state management**: 1323. **Domain throttle**:
126 - Tracks `(in_progress, urls_tried)` per `(domain, identifier)` pair 133 - Pure rate limiting: tracks in_flight count and request timestamps per domain
127 - `in_progress` prevents parallel fetches to same domain for same identifier 134 - `get_next_url()` takes `available_urls` and `tried_urls`, returns next URL to try
128 - `urls_tried` tracks which URLs have been attempted 135 - Uses round-robin internally to distribute load
129 - When all URLs for a domain+identifier are tried, that entry is removed 136 - No per-identifier state needed
130 - Entry may be re-added on next sync attempt if new URLs available
131 137
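The round-robin selection described for `get_next_url()` can be sketched in isolation. This is a minimal single-threaded sketch, not the design's implementation: `RoundRobinPicker` and `next_url` are hypothetical names, a plain `HashMap` stands in for the `DashMap` used in the design, and the capacity check is left out so that only the selection logic is shown.

```rust
use std::collections::{HashMap, HashSet};

/// Single-threaded stand-in for the round-robin part of `DomainThrottle`.
/// (Hypothetical type; the design keeps the per-domain index in a `DashMap`.)
#[derive(Default)]
pub struct RoundRobinPicker {
    next_index: HashMap<String, usize>,
}

impl RoundRobinPicker {
    /// Return the next untried URL for `domain`, or `None` when every
    /// available URL is already in the caller-owned `tried_urls` set.
    pub fn next_url(
        &mut self,
        domain: &str,
        available_urls: &[String],
        tried_urls: &HashSet<String>,
    ) -> Option<String> {
        // The picker never stores tried URLs; it only filters with the
        // set the caller passes in.
        let untried: Vec<&String> = available_urls
            .iter()
            .filter(|url| !tried_urls.contains(*url))
            .collect();
        if untried.is_empty() {
            return None;
        }

        // The index is relative to the filtered list, so it is wrapped
        // against the current number of untried URLs.
        let index = self.next_index.entry(domain.to_string()).or_insert(0);
        let selected = *index % untried.len();
        *index = (*index + 1) % untried.len();
        Some(untried[selected].clone())
    }
}
```

Because the index is applied to the filtered list, the URL it points at shifts as the caller's `tried_urls` set grows. Within a single sync that is harmless, since each call still returns some untried URL until none remain.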
132### New Data Structures 138## Data Structures
133 139
134#### SyncQueueEntry 140### SyncQueueEntry
135 141
136Tracks sync state for each identifier in the main sync queue: 142Tracks sync state for each identifier in the main sync queue:
137 143
@@ -140,7 +146,6 @@ Tracks sync state for each identifier in the main sync queue:
140#[derive(Debug, Clone)] 146#[derive(Debug, Clone)]
141pub struct SyncQueueEntry { 147pub struct SyncQueueEntry {
142 /// Don't attempt sync before this time 148 /// Don't attempt sync before this time
143 /// Set for: initial delay (3min user / 500ms sync), backoff after attempts
144 pub next_attempt: Instant, 149 pub next_attempt: Instant,
145 150
146 /// Number of sync attempts (for backoff calculation) 151 /// Number of sync attempts (for backoff calculation)
@@ -148,12 +153,10 @@ pub struct SyncQueueEntry {
148 pub attempt_count: u32, 153 pub attempt_count: u32,
149 154
150 /// Whether a sync is currently in progress for this identifier 155 /// Whether a sync is currently in progress for this identifier
151 /// Prevents concurrent sync runs for the same identifier
152 pub in_progress: bool, 156 pub in_progress: bool,
153} 157}
154 158
155impl SyncQueueEntry { 159impl SyncQueueEntry {
156 /// Create new entry with specified delay
157 pub fn new(delay: Duration) -> Self { 160 pub fn new(delay: Duration) -> Self {
158 Self { 161 Self {
159 next_attempt: Instant::now() + delay, 162 next_attempt: Instant::now() + delay,
@@ -162,97 +165,78 @@ impl SyncQueueEntry {
162 } 165 }
163 } 166 }
164 167
165 /// Calculate backoff duration: 20s, 40s, 80s, 120s (max 2min) 168 /// Calculate backoff: 20s, 40s, 80s, 120s (capped at 2min)
166 pub fn backoff(&self) -> Duration { 169 pub fn backoff(&self) -> Duration {
167 let base = Duration::from_secs(20); 170 let base = Duration::from_secs(20);
168 let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3)); 171 let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3));
169 let backoff = base * multiplier; 172 (base * multiplier).min(Duration::from_secs(120))
170 backoff.min(Duration::from_secs(120)) // Cap at 2 minutes
171 } 173 }
172 174
173 /// Check if this entry is ready to sync (not in progress, delay passed)
174 pub fn is_ready(&self) -> bool { 175 pub fn is_ready(&self) -> bool {
175 !self.in_progress && Instant::now() >= self.next_attempt 176 !self.in_progress && Instant::now() >= self.next_attempt
176 } 177 }
177 178
178 /// Called when new event arrives - resets attempt_count, may update next_attempt 179 /// Called when new event arrives - resets attempt_count
179 pub fn on_new_event(&mut self, delay: Duration) { 180 pub fn on_new_event(&mut self, delay: Duration) {
180 self.attempt_count = 0; 181 self.attempt_count = 0;
181 let new_attempt = Instant::now() + delay; 182 let new_attempt = Instant::now() + delay;
182 // Only bring forward if new time is sooner
183 if new_attempt < self.next_attempt { 183 if new_attempt < self.next_attempt {
184 self.next_attempt = new_attempt; 184 self.next_attempt = new_attempt;
185 } 185 }
186 } 186 }
187 187
188 /// Called when sync attempt completes 188 /// Called when sync attempt completes
189 /// If next_attempt is in the future (new event arrived during sync), just clear in_progress
190 /// Otherwise, apply backoff
191 pub fn on_sync_complete(&mut self) { 189 pub fn on_sync_complete(&mut self) {
192 self.in_progress = false; 190 self.in_progress = false;
193 let now = Instant::now(); 191 if self.next_attempt <= Instant::now() {
194 if self.next_attempt <= now {
195 // No new event arrived during sync - apply backoff
196 self.attempt_count += 1; 192 self.attempt_count += 1;
197 self.next_attempt = now + self.backoff(); 193 self.next_attempt = Instant::now() + self.backoff();
198 } 194 }
199 // else: new event arrived during sync, next_attempt already set, don't apply backoff
200 } 195 }
201} 196}
202``` 197```
203 198
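To make the backoff schedule concrete, the formula from `SyncQueueEntry::backoff` can be lifted into a free function and evaluated for successive attempt counts. This is a sketch for illustration only: `backoff_for` and `backoff_schedule_secs` are hypothetical helpers that do not appear in the design.

```rust
use std::time::Duration;

/// Backoff for a given attempt count, mirroring `SyncQueueEntry::backoff`.
/// (Hypothetical free function; the design implements this as a method.)
pub fn backoff_for(attempt_count: u32) -> Duration {
    let base = Duration::from_secs(20);
    // Exponent is attempt_count - 1, clamped to the range 0..=3.
    let exponent = attempt_count.saturating_sub(1).min(3);
    let multiplier = 2u32.saturating_pow(exponent);
    // 20s * 8 = 160s would exceed the cap, so it is clamped to 120s.
    (base * multiplier).min(Duration::from_secs(120))
}

/// Backoff durations, in seconds, for attempts 1 through `n`.
pub fn backoff_schedule_secs(n: u32) -> Vec<u64> {
    (1..=n).map(|attempt| backoff_for(attempt).as_secs()).collect()
}
```

Since `on_sync_complete` increments `attempt_count` before calling `backoff()`, the first completed attempt waits 20s, and the cap is reached on the fourth attempt.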
204#### DomainThrottle 199### DomainThrottle (Rate Limiting Only)
205
206Tracks per-domain rate limiting and per-(domain, identifier) fetch state:
207 200
208```rust 201```rust
209/// Tracks domain-level rate limiting and per-identifier fetch state 202/// Domain-level rate limiting with round-robin URL selection.
210/// 203///
211/// Rate limits: 5 concurrent requests, 30 requests/minute per domain 204/// This struct ONLY handles rate limiting. It does not track which URLs
205/// have been tried - that's the caller's responsibility.
212/// 206///
213/// Per (domain, identifier): tracks which URLs have been tried and whether 207/// Rate limits: 5 concurrent requests, 30 requests/minute per domain
214/// a fetch is currently in progress. When all URLs are tried, the identifier
215/// is removed from that domain's tracking. It may be re-added on the next
216/// sync attempt if new URLs become available.
217pub struct DomainThrottle { 208pub struct DomainThrottle {
218 /// Per-domain, per-identifier state: (in_progress, urls_tried) 209 /// In-flight request count per domain
219 /// domain -> identifier -> (in_progress, urls_tried)
220 state: DashMap<String, DashMap<String, (bool, HashSet<String>)>>,
221
222 /// Count of currently in-flight requests per domain
223 in_flight: DashMap<String, u32>, 210 in_flight: DashMap<String, u32>,
224 211
225 /// Request timestamps per domain (sliding window for rate limiting) 212 /// Request timestamps per domain (sliding window)
226 request_times: DashMap<String, VecDeque<Instant>>, 213 request_times: DashMap<String, VecDeque<Instant>>,
227 214
228 /// Maximum concurrent requests per domain 215 /// Round-robin index per domain (for fair URL distribution)
229 max_concurrent: u32, 216 round_robin_index: DashMap<String, usize>,
230 217
231 /// Maximum requests per minute per domain 218 max_concurrent: u32,
232 max_per_minute: u32, 219 max_per_minute: u32,
233} 220}
234 221
235impl DomainThrottle { 222impl DomainThrottle {
236 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { 223 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self {
237 Self { 224 Self {
238 state: DashMap::new(),
239 in_flight: DashMap::new(), 225 in_flight: DashMap::new(),
240 request_times: DashMap::new(), 226 request_times: DashMap::new(),
227 round_robin_index: DashMap::new(),
241 max_concurrent, 228 max_concurrent,
242 max_per_minute, 229 max_per_minute,
243 } 230 }
244 } 231 }
245 232
246 /// Check if domain has capacity for another request 233 /// Check if domain has capacity for another request
247 /// (both concurrent limit and rate limit)
248 pub fn has_capacity(&self, domain: &str) -> bool { 234 pub fn has_capacity(&self, domain: &str) -> bool {
249 // Check concurrent limit 235 let in_flight = self.in_flight.get(domain).map_or(0, |v| *v);
250 let current_in_flight = self.in_flight.get(domain).map_or(0, |v| *v); 236 if in_flight >= self.max_concurrent {
251 if current_in_flight >= self.max_concurrent {
252 return false; 237 return false;
253 } 238 }
254 239
255 // Check rate limit (sliding window)
256 let now = Instant::now(); 240 let now = Instant::now();
257 let window = Duration::from_secs(60); 241 let window = Duration::from_secs(60);
258 self.request_times 242 self.request_times
@@ -263,90 +247,56 @@ impl DomainThrottle {
263 }) 247 })
264 } 248 }
265 249
266 /// Check if identifier is currently fetching from this domain 250 /// Get next URL to try from available URLs, excluding already-tried URLs.
267 pub fn is_in_progress(&self, domain: &str, identifier: &str) -> bool { 251 /// Uses round-robin to distribute load across URLs for a domain.
268 self.state 252 ///
269 .get(domain) 253 /// Returns None if:
270 .and_then(|domain_state| domain_state.get(identifier)) 254 /// - Domain is at capacity (rate limited)
271 .map_or(false, |entry| entry.0) 255 /// - All available URLs have been tried
272 } 256 pub fn get_next_url(
273 257 &self,
274 /// Get untried URLs for an identifier from a specific domain 258 domain: &str,
275 /// Returns URLs from `available_urls` that haven't been tried yet 259 available_urls: &[String],
276 pub fn get_untried_urls( 260 tried_urls: &HashSet<String>,
277 &self, 261 ) -> Option<String> {
278 domain: &str,
279 identifier: &str,
280 available_urls: &[String]
281 ) -> Vec<String> {
282 let tried = self.state
283 .get(domain)
284 .and_then(|domain_state| domain_state.get(identifier))
285 .map(|entry| entry.1.clone())
286 .unwrap_or_default();
287
288 available_urls
289 .iter()
290 .filter(|url| !tried.contains(*url))
291 .cloned()
292 .collect()
293 }
294
295 /// Mark a URL as being fetched (in progress)
296 /// Returns false if domain has no capacity or identifier already in progress for this domain
297 pub fn start_fetch(&self, domain: &str, identifier: &str, url: &str) -> bool {
298 // Check capacity
299 if !self.has_capacity(domain) { 262 if !self.has_capacity(domain) {
300 return false; 263 return None;
301 } 264 }
302 265
303 // Check if already in progress for this domain+identifier 266 // Filter to untried URLs
304 if self.is_in_progress(domain, identifier) { 267 let untried: Vec<_> = available_urls
305 return false; 268 .iter()
269 .filter(|url| !tried_urls.contains(*url))
270 .collect();
271
272 if untried.is_empty() {
273 return None;
306 } 274 }
307 275
308 // Increment in-flight counter 276 // Round-robin selection
309 *self.in_flight.entry(domain.to_string()).or_insert(0) += 1; 277 let mut index = self.round_robin_index.entry(domain.to_string()).or_insert(0);
278 let selected_index = *index % untried.len();
279 *index = (*index + 1) % untried.len();
310 280
311 // Record request time 281 Some(untried[selected_index].clone())
282 }
283
284 /// Record that a request is starting
285 pub fn start_request(&self, domain: &str) {
286 *self.in_flight.entry(domain.to_string()).or_insert(0) += 1;
312 self.request_times 287 self.request_times
313 .entry(domain.to_string()) 288 .entry(domain.to_string())
314 .or_default() 289 .or_default()
315 .push_back(Instant::now()); 290 .push_back(Instant::now());
316
317 // Mark in progress and add URL to tried set
318 self.state
319 .entry(domain.to_string())
320 .or_default()
321 .entry(identifier.to_string())
322 .and_modify(|(in_progress, tried)| {
323 *in_progress = true;
324 tried.insert(url.to_string());
325 })
326 .or_insert_with(|| {
327 let mut tried = HashSet::new();
328 tried.insert(url.to_string());
329 (true, tried)
330 });
331
332 true
333 } 291 }
334 292
335 /// Mark a fetch as complete 293 /// Record that a request completed
336 pub fn complete_fetch(&self, domain: &str, identifier: &str) { 294 pub fn complete_request(&self, domain: &str) {
337 // Decrement in-flight counter
338 if let Some(mut count) = self.in_flight.get_mut(domain) { 295 if let Some(mut count) = self.in_flight.get_mut(domain) {
339 *count = count.saturating_sub(1); 296 *count = count.saturating_sub(1);
340 } 297 }
341 298
342 // Clear in_progress flag 299 // Clean old timestamps
343 if let Some(domain_state) = self.state.get(domain) {
344 if let Some(mut entry) = domain_state.get_mut(identifier) {
345 entry.0 = false;
346 }
347 }
348
349 // Clean old request times
350 let now = Instant::now(); 300 let now = Instant::now();
351 let window = Duration::from_secs(60); 301 let window = Duration::from_secs(60);
352 if let Some(mut times) = self.request_times.get_mut(domain) { 302 if let Some(mut times) = self.request_times.get_mut(domain) {
@@ -356,135 +306,277 @@ impl DomainThrottle {
356 } 306 }
357 } 307 }
358 308
359 /// Check if all URLs have been tried for this domain+identifier 309 /// Get time until domain has capacity (for scheduling retries)
360 pub fn all_urls_tried( 310 pub fn time_until_capacity(&self, domain: &str) -> Option<Duration> {
361 &self, 311 // Check concurrent limit first
362 domain: &str, 312 let in_flight = self.in_flight.get(domain).map_or(0, |v| *v);
363 identifier: &str, 313 if in_flight >= self.max_concurrent {
364 available_urls: &[String] 314 // Can't predict when a request will complete
365 ) -> bool { 315 return Some(Duration::from_millis(100));
366 self.get_untried_urls(domain, identifier, available_urls).is_empty()
367 }
368
369 /// Remove identifier from a domain's tracking (called when all URLs tried)
370 pub fn remove_identifier_from_domain(&self, domain: &str, identifier: &str) {
371 if let Some(domain_state) = self.state.get(domain) {
372 domain_state.remove(identifier);
373 } 316 }
374 } 317
375 318 // Check rate limit
376 /// Remove identifier from all domains (called when sync complete or events expired) 319 let now = Instant::now();
377 pub fn remove_identifier(&self, identifier: &str) { 320 let window = Duration::from_secs(60);
378 for domain_entry in self.state.iter() { 321 if let Some(times) = self.request_times.get(domain) {
379 domain_entry.value().remove(identifier); 322 let recent_count = times.iter().filter(|t| now.duration_since(**t) < window).count();
323 if recent_count >= self.max_per_minute as usize {
324 // Find oldest request in window, wait until it expires
325 if let Some(oldest) = times.front() {
326 let age = now.duration_since(*oldest);
327 if age < window {
328 return Some(window - age);
329 }
330 }
331 }
380 } 332 }
381 } 333
382 334 None // Has capacity now
383 /// Get all domains where this identifier has untried URLs
384 pub fn domains_with_untried_urls(&self, identifier: &str) -> Vec<String> {
385 self.state
386 .iter()
387 .filter(|entry| entry.value().contains_key(identifier))
388 .map(|entry| entry.key().clone())
389 .collect()
390 } 335 }
391} 336}
392``` 337```
393 338
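The sliding-window part of the throttle can be sketched for a single domain with the clock passed in as a parameter, which makes the behaviour deterministic under test. This is a minimal sketch under stated assumptions: `SlidingWindow`, `try_record`, and `prune` are hypothetical names, there is no concurrency limit, and the design itself calls `Instant::now()` internally rather than taking `now` as an argument.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Single-domain sliding-window limiter (hypothetical, for illustration).
pub struct SlidingWindow {
    window: Duration,
    max_requests: usize,
    request_times: VecDeque<Instant>,
}

impl SlidingWindow {
    pub fn new(max_requests: usize, window: Duration) -> Self {
        Self { window, max_requests, request_times: VecDeque::new() }
    }

    /// Drop timestamps that have aged out of the window.
    fn prune(&mut self, now: Instant) {
        while let Some(&oldest) = self.request_times.front() {
            if now.duration_since(oldest) >= self.window {
                self.request_times.pop_front();
            } else {
                break;
            }
        }
    }

    /// Record a request at `now` if the window has room; returns whether
    /// the request was admitted.
    pub fn try_record(&mut self, now: Instant) -> bool {
        self.prune(now);
        if self.request_times.len() >= self.max_requests {
            return false;
        }
        self.request_times.push_back(now);
        true
    }

    /// Time until the oldest in-window request expires, or `None` if a
    /// request would be admitted right now.
    pub fn time_until_capacity(&mut self, now: Instant) -> Option<Duration> {
        self.prune(now);
        if self.request_times.len() < self.max_requests {
            return None;
        }
        let window = self.window;
        self.request_times
            .front()
            .map(|&oldest| window - now.duration_since(oldest))
    }
}
```

Pruning before counting is one possible design choice: it guarantees that the front of the queue is always inside the window, so the wait computed from it is never based on a stale timestamp.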
394### Modified Purgatory Structure 339### SyncContext Trait (For Testability)
395
396```rust
397pub struct Purgatory {
398 /// State events (kind 30618) indexed by repository identifier
399 state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>,
400 340
401 /// PR events (kind 1617/1618) indexed by event ID 341Abstract the external dependencies to enable unit testing:
402 pr_events: Arc<DashMap<String, PrPurgatoryEntry>>,
403 342
404 /// NEW: Sync queue - identifiers pending git data sync 343```rust
405 sync_queue: Arc<DashMap<String, SyncQueueEntry>>, 344/// Abstraction over external dependencies for sync operations.
345///
346/// This trait allows unit testing of sync logic by mocking:
347/// - Repository data fetching
348/// - OID existence checks
349/// - Git fetch operations
350/// - Event processing
351#[async_trait]
352pub trait SyncContext: Send + Sync {
353 /// Get repository data (announcements, clone URLs, etc.)
354 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData>;
406 355
407 /// NEW: Domain-level throttling and per-identifier fetch state 356 /// Get all OIDs needed for purgatory events with this identifier
408 domain_throttle: Arc<DomainThrottle>, 357 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>;
409 358
410 git_data_path: PathBuf, 359 /// Check if an OID exists locally
360 fn oid_exists(&self, repo_path: &Path, oid: &str) -> bool;
361
362 /// Fetch OIDs from a remote server
363 async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>;
364
365 /// Process events that can now be satisfied (save to DB, notify, remove from purgatory)
366 async fn process_satisfiable_events(&self, identifier: &str) -> Result<()>;
367
368 /// Check if there are still pending events for this identifier
369 fn has_pending_events(&self, identifier: &str) -> bool;
370
371 /// Find the best local repo to fetch into
372 fn find_target_repo(&self, db_repo_data: &RepositoryData) -> Option<PathBuf>;
373
374 /// Our domain (to exclude from clone URLs)
375 fn our_domain(&self) -> Option<&str>;
411} 376}
412``` 377```
413 378
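The testability argument is easier to see with a pared-down example. The sketch below is an assumption-laden simplification, not the `SyncContext` trait itself: `OidSource`, `MockContext`, and `missing_oids` are hypothetical names, the trait is synchronous, and it keeps only two of the methods (with `oid_exists` taking no repository path).

```rust
use std::collections::HashSet;

/// Synchronous, pared-down stand-in for `SyncContext` (hypothetical).
pub trait OidSource {
    /// All OIDs needed by purgatory events for this identifier.
    fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>;
    /// Whether an OID is already available locally.
    fn oid_exists(&self, oid: &str) -> bool;
}

/// Logic under test: which needed OIDs are still missing locally.
/// It only talks to the trait, so it never touches git or a database.
pub fn missing_oids<C: OidSource>(ctx: &C, identifier: &str) -> Vec<String> {
    let mut missing: Vec<String> = ctx
        .collect_needed_oids(identifier)
        .into_iter()
        .filter(|oid| !ctx.oid_exists(oid))
        .collect();
    missing.sort(); // stable order for assertions
    missing
}

/// In-memory mock: the test decides what is needed and what exists.
pub struct MockContext {
    pub needed: HashSet<String>,
    pub local: HashSet<String>,
}

impl OidSource for MockContext {
    fn collect_needed_oids(&self, _identifier: &str) -> HashSet<String> {
        self.needed.clone()
    }

    fn oid_exists(&self, oid: &str) -> bool {
        self.local.contains(oid)
    }
}
```

A test then builds a `MockContext` with the desired sets and asserts on the result of `missing_oids`, with no repository on disk.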
414### Key Methods 379## Core Sync Logic
380
381### The Sync Step Function
415 382
416#### Adding Events to Purgatory 383This is the key abstraction that enables clean testing:
417 384
418```rust 385```rust
419impl Purgatory { 386/// Result of a single sync step
420 /// Add a state event to purgatory (user-submitted, 3min delay) 387#[derive(Debug, Clone, PartialEq)]
421 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { 388pub enum SyncStepResult {
422 // ... existing logic to add to state_events ... 389 /// Successfully tried a URL, may or may not have fetched OIDs
423 390 TriedUrl { url: String, oids_fetched: usize },
424 // Add to sync queue with 3 minute delay 391
425 self.enqueue_sync(&identifier, Duration::from_secs(180)); 392 /// All available URLs have been tried, need to wait for throttle
393 AllUrlsThrottled { wait_duration: Duration },
394
395 /// No more URLs to try (all exhausted)
396 NoMoreUrls,
397
398 /// All OIDs are now available, sync complete
399 Complete,
400
401 /// No pending events remain
402 NoPendingEvents,
403}
404
405/// Execute one step of the sync process.
406///
407/// This function is pure logic - all I/O goes through the SyncContext trait.
408/// This makes it trivially unit testable.
409pub async fn sync_identifier_step<C: SyncContext>(
410 ctx: &C,
411 identifier: &str,
412 tried_urls: &HashSet<String>,
413 throttle: &DomainThrottle,
414) -> Result<SyncStepResult> {
415 // 1. Check if we still have pending events
416 if !ctx.has_pending_events(identifier) {
417 return Ok(SyncStepResult::NoPendingEvents);
426 } 418 }
427 419
428 /// Add a PR event to purgatory (user-submitted, 3min delay) 420 // 2. Collect needed OIDs (fresh each step - may have changed)
429 pub fn add_pr(&self, event: Event, event_id: String, commit: String) { 421 let needed_oids = ctx.collect_needed_oids(identifier);
430 // ... existing logic to add to pr_events ... 422 if needed_oids.is_empty() {
431 423 // No OIDs needed - try to process events
432 // Extract identifier from event's `a` tag and enqueue sync 424 ctx.process_satisfiable_events(identifier).await?;
433 if let Some(identifier) = extract_identifier_from_pr(&event) { 425 return Ok(SyncStepResult::Complete);
434 self.enqueue_sync(&identifier, Duration::from_secs(180));
435 }
436 } 426 }
437 427
438 /// Trigger immediate sync for an identifier (called from negentropy sync) 428 // 3. Get repository data (fresh each step - announcements may have arrived)
439 /// Still applies 500ms debounce for batching burst arrivals 429 let db_repo_data = ctx.fetch_repository_data(identifier).await?;
440 pub fn trigger_immediate_sync(&self, identifier: &str) { 430
441 self.enqueue_sync(identifier, Duration::from_millis(500)); 431 // 4. Collect clone URLs, excluding our domain
432 let all_urls: Vec<String> = db_repo_data
433 .announcements
434 .iter()
435 .flat_map(|a| a.clone_urls.iter().cloned())
436 .filter(|url| ctx.our_domain().map_or(true, |d| !url.contains(d)))
437 .collect::<HashSet<_>>()
438 .into_iter()
439 .collect();
440
441 if all_urls.is_empty() {
442 return Ok(SyncStepResult::NoMoreUrls);
442 } 443 }
443 444
444 /// Internal: Add identifier to sync queue with specified delay 445 // 5. Group by domain and find an available URL
445 fn enqueue_sync(&self, identifier: &str, delay: Duration) { 446 let urls_by_domain: HashMap<String, Vec<String>> = all_urls
446 self.sync_queue 447 .iter()
447 .entry(identifier.to_string()) 448 .fold(HashMap::new(), |mut acc, url| {
448 .and_modify(|entry| { 449 acc.entry(extract_domain(url)).or_default().push(url.clone());
449 // New event arrived - reset backoff, potentially update timing 450 acc
450 entry.reset(delay); 451 });
451 }) 452
452 .or_insert_with(|| SyncQueueEntry::new(delay)); 453 // 6. Try to get a URL from any domain that has capacity
454 let mut min_wait: Option<Duration> = None;
455
456 for (domain, domain_urls) in &urls_by_domain {
457 if let Some(url) = throttle.get_next_url(domain, domain_urls, tried_urls) {
458 // Found a URL to try!
459 let target_repo = match ctx.find_target_repo(&db_repo_data) {
460 Some(path) => path,
461 None => return Ok(SyncStepResult::NoMoreUrls),
462 };
463
464 // Start the fetch
465 throttle.start_request(domain);
466 let oids_to_fetch: Vec<String> = needed_oids.iter().cloned().collect();
467 let fetch_result = ctx.fetch_oids(&target_repo, &url, &oids_to_fetch).await;
468 throttle.complete_request(domain);
469
470 let oids_fetched = match fetch_result {
471 Ok(fetched) => fetched.len(),
472 Err(e) => {
473 tracing::debug!(url = %url, error = %e, "Fetch failed");
474 0
475 }
476 };
477
478 // Try to process any events that can now be satisfied
479 if oids_fetched > 0 {
480 let _ = ctx.process_satisfiable_events(identifier).await;
481 }
482
483 return Ok(SyncStepResult::TriedUrl { url, oids_fetched });
484 } else {
485 // Domain throttled or all URLs tried
486 let untried_exist = domain_urls.iter().any(|u| !tried_urls.contains(u));
487 if untried_exist {
488 // URLs exist but domain is throttled
489 if let Some(wait) = throttle.time_until_capacity(domain) {
490 min_wait = Some(min_wait.map_or(wait, |m| m.min(wait)));
491 }
492 }
493 }
494 }
495
496 // Check if all URLs have been tried
497 let all_tried = all_urls.iter().all(|url| tried_urls.contains(url));
498 if all_tried {
499 return Ok(SyncStepResult::NoMoreUrls);
453 } 500 }
501
502 // Some URLs exist but all domains are throttled
503 Ok(SyncStepResult::AllUrlsThrottled {
504 wait_duration: min_wait.unwrap_or(Duration::from_millis(100)),
505 })
454} 506}
455``` 507```
456 508
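The step function relies on an `extract_domain` helper that is not shown in this diff. The sketch below is one possible std-only version, together with the grouping from step 5. Everything here is an assumption for illustration: the real helper may use a URL parser, and `group_by_domain` and `is_own_domain` are hypothetical names. IPv6 literal hosts are not handled.

```rust
use std::collections::HashMap;

/// Hypothetical `extract_domain`: the lowercased host of a URL, without
/// scheme, userinfo, port, or path. Does not handle IPv6 literals.
pub fn extract_domain(url: &str) -> String {
    // Drop "scheme://" if present.
    let without_scheme = url.split_once("://").map_or(url, |(_, rest)| rest);
    // Keep only the authority part (up to the first '/', '?', or '#').
    let authority = without_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");
    // Drop "user@" or "user:pass@" if present.
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
    // Drop ":port" if present.
    let host = host_port.split(':').next().unwrap_or("");
    host.to_ascii_lowercase()
}

/// Group clone URLs by domain, as in step 5 of the sync step.
pub fn group_by_domain(urls: &[String]) -> HashMap<String, Vec<String>> {
    let mut by_domain: HashMap<String, Vec<String>> = HashMap::new();
    for url in urls {
        by_domain
            .entry(extract_domain(url))
            .or_default()
            .push(url.clone());
    }
    by_domain
}

/// Exact-host check for excluding our own domain from clone URLs.
pub fn is_own_domain(url: &str, our_domain: &str) -> bool {
    extract_domain(url) == our_domain.to_ascii_lowercase()
}
```

Comparing the extracted host exactly is one alternative to the substring test `url.contains(d)` in step 4, which would also match URLs that merely contain the domain somewhere in their host or path.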
457#### Extracting Identifier from PR Events 509### The Sync Identifier Loop
458 510
459```rust 511```rust
460/// Extract repository identifier from PR event's `a` tag 512/// Sync git data for an identifier.
461fn extract_identifier_from_pr(event: &Event) -> Option<String> { 513///
462 event.tags.iter().find_map(|tag| { 514/// Returns true if sync completed successfully (no more pending events),
463 let tag_vec = tag.clone().to_vec(); 515/// false if we exhausted all options but events remain.
464 if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { 516pub async fn sync_identifier<C: SyncContext>(
465 // Format: 30617:<owner>:<identifier> 517 ctx: &C,
466 let parts: Vec<&str> = tag_vec[1].split(':').collect(); 518 identifier: &str,
467 if parts.len() >= 3 { 519 throttle: &DomainThrottle,
468 return Some(parts[2].to_string()); 520) -> bool {
521 let mut tried_urls: HashSet<String> = HashSet::new();
522
523 loop {
524 match sync_identifier_step(ctx, identifier, &tried_urls, throttle).await {
525 Ok(SyncStepResult::TriedUrl { url, oids_fetched }) => {
526 tried_urls.insert(url.clone());
527 tracing::debug!(
528 identifier = %identifier,
529 url = %url,
530 oids_fetched = oids_fetched,
531 "Tried URL"
532 );
533 // Continue looping
534 }
535
536 Ok(SyncStepResult::AllUrlsThrottled { wait_duration }) => {
537 tracing::debug!(
538 identifier = %identifier,
539 wait_ms = wait_duration.as_millis(),
540 "All domains throttled, waiting"
541 );
542 tokio::time::sleep(wait_duration).await;
543 // Continue looping
544 }
545
546 Ok(SyncStepResult::NoMoreUrls) => {
547 tracing::debug!(identifier = %identifier, "No more URLs to try");
548 return false; // Events remain but no URLs left
549 }
550
551 Ok(SyncStepResult::Complete) => {
552 tracing::info!(identifier = %identifier, "Sync complete");
553 return true;
554 }
555
556 Ok(SyncStepResult::NoPendingEvents) => {
557 tracing::debug!(identifier = %identifier, "No pending events");
558 return true;
559 }
560
561 Err(e) => {
562 tracing::warn!(identifier = %identifier, error = %e, "Sync step error");
563 return false;
469 } 564 }
470 } 565 }
471 None 566 }
472 })
473} 567}
474``` 568```
475 569
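The shape of this loop (caller-owned `tried_urls`, a step function that reports one outcome at a time) can be exercised without async or networking. The sketch below is a synchronous simplification under stated assumptions: `Step`, `drive`, and `fake_step` are hypothetical names, the throttled and error outcomes are omitted, and the step is a closure rather than `sync_identifier_step`.

```rust
use std::collections::HashSet;

/// Simplified step outcome (hypothetical subset of `SyncStepResult`).
#[derive(Debug, Clone, PartialEq)]
pub enum Step {
    TriedUrl(String),
    NoMoreUrls,
    Complete,
}

/// Drive `step` until it reports a terminal result. The loop owns the
/// tried set and only lends it to the step, as `sync_identifier` does.
pub fn drive<F>(mut step: F) -> (bool, HashSet<String>)
where
    F: FnMut(&HashSet<String>) -> Step,
{
    let mut tried_urls = HashSet::new();
    loop {
        match step(&tried_urls) {
            Step::TriedUrl(url) => {
                tried_urls.insert(url);
            }
            Step::NoMoreUrls => return (false, tried_urls),
            Step::Complete => return (true, tried_urls),
        }
    }
}

/// Fake step for tests: hands out the next untried URL and reports
/// completion once `good_url` has been tried.
pub fn fake_step(urls: &[&str], good_url: &str, tried: &HashSet<String>) -> Step {
    if tried.contains(good_url) {
        return Step::Complete;
    }
    match urls.iter().find(|u| !tried.contains(**u)) {
        Some(u) => Step::TriedUrl((*u).to_string()),
        None => Step::NoMoreUrls,
    }
}
```

Termination depends on the step honouring `tried_urls`: a step that kept returning an already-tried URL would loop forever, which is why the step only ever selects from URLs outside the set.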
476### Sync Loop 570### The Main Sync Loop
477
478A single background loop handles syncing. The `DomainThrottle` is just state tracking, not a separate processing queue.
479 571
480```rust 572```rust
481impl Purgatory { 573impl Purgatory {
482 /// Start the background sync loop
483 pub fn start_sync_loop( 574 pub fn start_sync_loop(
484 self: Arc<Self>, 575 self: Arc<Self>,
485 database: SharedDatabase, 576 database: SharedDatabase,
486 our_domain: Option<String>, 577 our_domain: Option<String>,
487 local_relay: Option<nostr_relay_builder::LocalRelay>, 578 local_relay: Option<nostr_relay_builder::LocalRelay>,
579 throttle: Arc<DomainThrottle>,
488 ) -> tokio::task::JoinHandle<()> { 580 ) -> tokio::task::JoinHandle<()> {
489 tokio::spawn(async move { 581 tokio::spawn(async move {
490 let mut interval = tokio::time::interval(Duration::from_secs(1)); 582 let mut interval = tokio::time::interval(Duration::from_secs(1));
@@ -492,60 +584,56 @@ impl Purgatory {
492 loop { 584 loop {
493 interval.tick().await; 585 interval.tick().await;
494 586
495 // Find ALL ready identifiers 587 // Find all ready identifiers
496 let ready_identifiers: Vec<String> = self.sync_queue 588 let ready: Vec<String> = self.sync_queue
497 .iter() 589 .iter()
498 .filter(|entry| entry.value().is_ready()) 590 .filter(|e| e.value().is_ready())
499 .map(|entry| entry.key().clone()) 591 .map(|e| e.key().clone())
500 .collect(); 592 .collect();
501 593
502 // Spawn sync tasks in parallel for each ready identifier 594 for identifier in ready {
503 for identifier in ready_identifiers { 595 // Check if events still exist
504 // Check if there are still events in purgatory for this identifier
505 if !self.has_pending_events(&identifier) { 596 if !self.has_pending_events(&identifier) {
506 self.sync_queue.remove(&identifier); 597 self.sync_queue.remove(&identifier);
507 self.domain_throttle.remove_identifier(&identifier);
508 continue; 598 continue;
509 } 599 }
510 600
511 // Mark as in progress (prevents re-spawning on next tick) 601 // Mark in progress
512 if let Some(mut entry) = self.sync_queue.get_mut(&identifier) { 602 if let Some(mut entry) = self.sync_queue.get_mut(&identifier) {
513 if entry.in_progress { 603 if entry.in_progress {
514 continue; // Already running 604 continue;
515 } 605 }
516 entry.in_progress = true; 606 entry.in_progress = true;
607 } else {
608 continue;
517 } 609 }
518 610
519 // Spawn task for this identifier 611 // Spawn sync task
520 let purgatory = self.clone(); 612 let purgatory = self.clone();
521 let db = database.clone(); 613 let db = database.clone();
522 let domain = our_domain.clone(); 614 let domain = our_domain.clone();
523 let relay = local_relay.clone(); 615 let relay = local_relay.clone();
616 let throttle = throttle.clone();
524 let id = identifier.clone(); 617 let id = identifier.clone();
525 618
526 tokio::spawn(async move { 619 tokio::spawn(async move {
527 purgatory.sync_identifier( 620 // Create the real SyncContext implementation
528 &id, 621 let ctx = RealSyncContext::new(
529 &db, 622 purgatory.clone(),
530 domain.as_deref(), 623 db,
531 relay.as_ref(), 624 domain,
532 ).await; 625 relay,
626 );
533 627
534 // Check if events remain 628 let complete = sync_identifier(&ctx, &id, &throttle).await;
535 if !purgatory.has_pending_events(&id) { 629
630 if complete || !purgatory.has_pending_events(&id) {
536 purgatory.sync_queue.remove(&id); 631 purgatory.sync_queue.remove(&id);
537 purgatory.domain_throttle.remove_identifier(&id); 632 tracing::info!(identifier = %id, "Removed from sync queue");
538 tracing::info!(identifier = %id, "Sync complete, removed from queues");
539 } else { 633 } else {
540 // Apply backoff (or not, if new event arrived during sync) 634 // Apply backoff
541 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { 635 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) {
542 entry.on_sync_complete(); 636 entry.on_sync_complete();
543 tracing::debug!(
544 identifier = %id,
545 attempt = entry.attempt_count,
546 next_attempt_secs = entry.next_attempt.duration_since(Instant::now()).as_secs(),
547 "Sync attempt complete, scheduled next attempt"
548 );
549 } 637 }
550 } 638 }
551 }); 639 });
@@ -553,746 +641,239 @@ impl Purgatory {
553 } 641 }
554 }) 642 })
555 } 643 }
556
557 /// Check if there are pending events for an identifier
558 fn has_pending_events(&self, identifier: &str) -> bool {
559 // Check state events
560 if self.state_events.get(identifier).map_or(false, |v| !v.is_empty()) {
561 return true;
562 }
563
564 // Check PR events (need to scan for matching identifier)
565 for entry in self.pr_events.iter() {
566 if let Some(ref event) = entry.value().event {
567 if extract_identifier_from_pr(event).as_deref() == Some(identifier) {
568 return true;
569 }
570 }
571 }
572
573 false
574 }
575} 644}
576``` 645```
577 646
578### Core Sync Logic 647## Testing Strategy
648
649### Unit Tests for Sync Logic
650
651The `SyncContext` trait enables pure unit tests without any I/O:
579 652
580```rust 653```rust
581impl Purgatory { 654#[cfg(test)]
582 /// Sync git data for all purgatory events with this identifier 655mod tests {
583 /// 656 use super::*;
584 /// Uses a while loop that: 657
585 /// 1. Recalculates URLs and OIDs fresh each iteration (catches new events/announcements) 658 /// Mock context for testing sync logic
586 /// 2. Gets untried URLs per domain from DomainThrottle 659 struct MockSyncContext {
587 /// 3. Skips domains that are fully throttled, tries available ones 660 pending_events: RefCell<bool>,
588 /// 4. Continues while: events remain AND OIDs missing AND untried URLs exist 661 needed_oids: RefCell<HashSet<String>>,
589 async fn sync_identifier( 662 available_urls: Vec<String>,
590 &self, 663 fetch_results: RefCell<HashMap<String, Vec<String>>>,
591 identifier: &str, 664 processed_count: RefCell<usize>,
592 database: &SharedDatabase,
593 our_domain: Option<&str>,
594 local_relay: Option<&nostr_relay_builder::LocalRelay>,
595 ) {
596 loop {
597 // 1. Check if any events remain (may have expired or been processed)
598 if !self.has_pending_events(identifier) {
599 return;
600 }
601
602 // 2. Collect all OIDs needed (fresh calculation each iteration)
603 let needed_oids = self.collect_needed_oids(identifier);
604
605 if needed_oids.is_empty() {
606 // No OIDs needed - try to process events and exit
607 self.try_process_events(identifier, database, our_domain, local_relay).await;
608 return;
609 }
610
611 // 3. Get repository data and clone URLs (fresh calculation each iteration)
612 let db_repo_data = match fetch_repository_data(database, identifier).await {
613 Ok(data) => data,
614 Err(e) => {
615 tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data");
616 return;
617 }
618 };
619
620 // 4. Collect clone URLs, excluding our domain
621 let all_clone_urls: Vec<String> = db_repo_data
622 .announcements
623 .iter()
624 .flat_map(|a| a.clone_urls.iter().cloned())
625 .filter(|url| our_domain.map_or(true, |d| !url.contains(d)))
626 .collect::<HashSet<_>>()
627 .into_iter()
628 .collect();
629
630 if all_clone_urls.is_empty() {
631 tracing::debug!(identifier = %identifier, "No external clone URLs available");
632 return;
633 }
634
635 // 5. Group URLs by domain
636 let urls_by_domain: HashMap<String, Vec<String>> = all_clone_urls
637 .iter()
638 .fold(HashMap::new(), |mut acc, url| {
639 let domain = extract_domain(url);
640 acc.entry(domain).or_default().push(url.clone());
641 acc
642 });
643
644 // 6. Find best local repo to fetch into
645 let target_repo = match self.find_target_repo(&db_repo_data, identifier) {
646 Some(path) => path,
647 None => {
648 tracing::debug!(identifier = %identifier, "No local repository found");
649 return;
650 }
651 };
652
653 // 7. Partition domains: available (have capacity) vs throttled
654 let (available_domains, throttled_domains): (Vec<_>, Vec<_>) = urls_by_domain
655 .keys()
656 .cloned()
657 .partition(|domain| self.domain_throttle.has_capacity(domain));
658
659 let mut remaining_oids: HashSet<String> = needed_oids.clone();
660 let mut any_fetch_started = false;
661
662 // 8. Try available domains first
663 for domain in &available_domains {
664 if remaining_oids.is_empty() {
665 break;
666 }
667
668 let domain_urls = urls_by_domain.get(domain).unwrap();
669 let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls);
670
671 for url in untried_urls {
672 if remaining_oids.is_empty() {
673 break;
674 }
675
676 // Skip if already in progress for this domain+identifier
677 if self.domain_throttle.is_in_progress(domain, identifier) {
678 break; // Wait for current fetch to complete
679 }
680
681 // Try to start fetch (checks capacity again, marks in_progress)
682 if !self.domain_throttle.start_fetch(domain, identifier, &url) {
683 break; // Domain at capacity
684 }
685
686 any_fetch_started = true;
687
688 // Fetch OIDs
689 let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect();
690 let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await;
691
692 // Mark fetch complete
693 self.domain_throttle.complete_fetch(domain, identifier);
694
695 match fetch_result {
696 Ok(fetched_oids) => {
697 if !fetched_oids.is_empty() {
698 let fetched_count = fetched_oids.len();
699 for oid in fetched_oids {
700 remaining_oids.remove(&oid);
701 }
702 tracing::info!(
703 identifier = %identifier,
704 url = %url,
705 fetched = fetched_count,
706 remaining = remaining_oids.len(),
707 "Fetched OIDs from server"
708 );
709 }
710 }
711 Err(e) => {
712 tracing::debug!(
713 identifier = %identifier,
714 url = %url,
715 error = %e,
716 "Failed to fetch from server"
717 );
718 }
719 }
720 }
721
722 // Clean up if all URLs tried for this domain
723 if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) {
724 self.domain_throttle.remove_identifier_from_domain(domain, identifier);
725 }
726 }
727
728 // 9. If still need OIDs, try throttled domains (they might have capacity now)
729 if !remaining_oids.is_empty() {
730 for domain in &throttled_domains {
731 if remaining_oids.is_empty() {
732 break;
733 }
734
735 // Re-check capacity (might have freed up)
736 if !self.domain_throttle.has_capacity(domain) {
737 continue;
738 }
739
740 let domain_urls = urls_by_domain.get(domain).unwrap();
741 let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls);
742
743 for url in untried_urls {
744 if remaining_oids.is_empty() {
745 break;
746 }
747
748 if self.domain_throttle.is_in_progress(domain, identifier) {
749 break;
750 }
751
752 if !self.domain_throttle.start_fetch(domain, identifier, &url) {
753 break;
754 }
755
756 any_fetch_started = true;
757
758 let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect();
759 let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await;
760
761 self.domain_throttle.complete_fetch(domain, identifier);
762
763 if let Ok(fetched_oids) = fetch_result {
764 for oid in fetched_oids {
765 remaining_oids.remove(&oid);
766 }
767 }
768 }
769
770 if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) {
771 self.domain_throttle.remove_identifier_from_domain(domain, identifier);
772 }
773 }
774 }
775
776 // 10. Try to process events that can now be satisfied
777 self.try_process_events(identifier, database, our_domain, local_relay).await;
778
779 // 11. Decide whether to continue looping
780 let still_have_events = self.has_pending_events(identifier);
781 if !still_have_events {
782 return;
783 }
784
785 let still_need_oids = !self.collect_needed_oids(identifier).is_empty();
786 if !still_need_oids {
787 // Events remain but no OIDs needed - loop to try processing again
788 continue;
789 }
790
791 // Check if there are any untried URLs left across all domains
792 let have_untried_urls = urls_by_domain.iter().any(|(domain, urls)| {
793 !self.domain_throttle.get_untried_urls(domain, identifier, urls).is_empty()
794 });
795
796 if !have_untried_urls {
797 // No more URLs to try - exit and let backoff handle retry
798 return;
799 }
800
801 // If no fetch was started this iteration (all throttled), yield briefly
802 if !any_fetch_started {
803 tokio::time::sleep(Duration::from_millis(100)).await;
804 }
805 }
806 } 665 }
807 666
808 /// Collect all OIDs needed for purgatory events with this identifier 667 #[async_trait]
809 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { 668 impl SyncContext for MockSyncContext {
810 let mut oids = HashSet::new(); 669 async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> {
811 670 // Return mock data with our available_urls
812 // Collect from state events 671 Ok(RepositoryData {
813 if let Some(entries) = self.state_events.get(identifier) { 672 announcements: vec![MockAnnouncement {
814 for entry in entries.iter() { 673 clone_urls: self.available_urls.clone(),
815 if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { 674 ..Default::default()
816 for branch in &state.branches { 675 }],
817 if !branch.commit.starts_with("ref: ") { 676 ..Default::default()
818 oids.insert(branch.commit.clone()); 677 })
819 }
820 }
821 for tag in &state.tags {
822 if !tag.commit.starts_with("ref: ") {
823 oids.insert(tag.commit.clone());
824 }
825 }
826 }
827 }
828 } 678 }
829 679
830 // Collect from PR events 680 fn collect_needed_oids(&self, _id: &str) -> HashSet<String> {
831 for entry in self.pr_events.iter() { 681 self.needed_oids.borrow().clone()
832 if let Some(ref event) = entry.value().event {
833 if extract_identifier_from_pr(event).as_deref() == Some(identifier) {
834 if let Some(commit) = extract_commit_from_pr(event) {
835 oids.insert(commit);
836 }
837 }
838 }
839 } 682 }
840 683
841 oids 684 fn oid_exists(&self, _path: &Path, oid: &str) -> bool {
842 } 685 !self.needed_oids.borrow().contains(oid)
843
844 /// Try to process events that can now be satisfied
845 async fn try_process_events(
846 &self,
847 identifier: &str,
848 database: &SharedDatabase,
849 our_domain: Option<&str>,
850 local_relay: Option<&nostr_relay_builder::LocalRelay>,
851 ) {
852 if !self.has_pending_events(identifier) {
853 return;
854 } 686 }
855 687
856 let db_repo_data = match fetch_repository_data(database, identifier).await { 688 async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> {
857 Ok(data) => data, 689 // Return pre-configured fetch result for this URL
858 Err(e) => { 690 Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default())
859 tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data for processing");
860 return;
861 }
862 };
863
864 // Process state events (oldest first)
865 if let Some(mut entries) = self.state_events.get_mut(identifier) {
866 entries.sort_by_key(|e| e.event.created_at);
867
868 let mut to_remove = Vec::new();
869
870 for entry in entries.iter() {
871 if let Ok(state) = RepositoryState::from_event(entry.event.clone()) {
872 if self.can_satisfy_state(&state, &db_repo_data) {
873 match self.process_state_event(&state, &db_repo_data, database, local_relay).await {
874 Ok(()) => {
875 to_remove.push(entry.event.id);
876 }
877 Err(e) => {
878 tracing::warn!(
879 event_id = %entry.event.id,
880 error = %e,
881 "Failed to process state event"
882 );
883 }
884 }
885 }
886 }
887 }
888
889 entries.retain(|e| !to_remove.contains(&e.event.id));
890 } 691 }
891 692
892 // Process PR events (oldest first) 693 async fn process_satisfiable_events(&self, _id: &str) -> Result<()> {
893 let mut pr_to_remove = Vec::new(); 694 *self.processed_count.borrow_mut() += 1;
894 let mut pr_entries: Vec<_> = self.pr_events 695 Ok(())
895 .iter() 696 }
896 .filter_map(|entry| {
897 entry.value().event.as_ref().and_then(|event| {
898 if extract_identifier_from_pr(event).as_deref() == Some(identifier) {
899 Some((entry.key().clone(), event.clone(), entry.value().commit.clone()))
900 } else {
901 None
902 }
903 })
904 })
905 .collect();
906 697
907 pr_entries.sort_by_key(|(_, event, _)| event.created_at); 698 fn has_pending_events(&self, _id: &str) -> bool {
699 *self.pending_events.borrow()
700 }
908 701
909 for (event_id, event, commit) in pr_entries { 702 fn find_target_repo(&self, _data: &RepositoryData) -> Option<PathBuf> {
910 if self.can_satisfy_pr(&commit, &db_repo_data) { 703 Some(PathBuf::from("/tmp/test-repo"))
911 match self.process_pr_event(&event, &commit, &db_repo_data, database, local_relay).await {
912 Ok(()) => {
913 pr_to_remove.push(event_id);
914 }
915 Err(e) => {
916 tracing::warn!(
917 event_id = %event.id,
918 error = %e,
919 "Failed to process PR event"
920 );
921 }
922 }
923 }
924 } 704 }
925 705
926 for event_id in pr_to_remove { 706 fn our_domain(&self) -> Option<&str> {
927 self.pr_events.remove(&event_id); 707 None
928 } 708 }
929 } 709 }
930 710
931 /// Check if a state event can be satisfied (all OIDs available locally) 711 #[tokio::test]
932 fn can_satisfy_state(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> bool { 712 async fn test_sync_step_no_pending_events() {
933 for announcement in &db_repo_data.announcements { 713 let ctx = MockSyncContext {
934 let repo_path = self.git_data_path.join(announcement.repo_path()); 714 pending_events: RefCell::new(false),
935 if !repo_path.exists() { 715 ..Default::default()
936 continue; 716 };
937 } 717 let throttle = DomainThrottle::new(5, 30);
938 718 let tried = HashSet::new();
939 let all_present = state.branches.iter().all(|b| { 719
940 b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit) 720 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap();
941 }) && state.tags.iter().all(|t| { 721 assert_eq!(result, SyncStepResult::NoPendingEvents);
942 oid_exists(&repo_path, &t.commit)
943 });
944
945 if all_present {
946 return true;
947 }
948 }
949 false
950 } 722 }
951 723
952 /// Check if a PR event can be satisfied (commit available locally) 724 #[tokio::test]
953 fn can_satisfy_pr(&self, commit: &str, db_repo_data: &RepositoryData) -> bool { 725 async fn test_sync_step_no_oids_needed() {
954 for announcement in &db_repo_data.announcements { 726 let ctx = MockSyncContext {
955 let repo_path = self.git_data_path.join(announcement.repo_path()); 727 pending_events: RefCell::new(true),
956 if repo_path.exists() && oid_exists(&repo_path, commit) { 728 needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed
957 return true; 729 ..Default::default()
958 } 730 };
959 } 731 let throttle = DomainThrottle::new(5, 30);
960 false 732 let tried = HashSet::new();
961 } 733
962} 734 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap();
963 735 assert_eq!(result, SyncStepResult::Complete);
964/// Extract commit hash from PR event's `c` tag 736 assert_eq!(*ctx.processed_count.borrow(), 1);
965fn extract_commit_from_pr(event: &Event) -> Option<String> {
966 event.tags.iter().find_map(|tag| {
967 let tag_vec = tag.clone().to_vec();
968 if tag_vec.len() >= 2 && tag_vec[0] == "c" {
969 Some(tag_vec[1].clone())
970 } else {
971 None
972 }
973 })
974}
975```
976
977### Fetching OIDs
978
979```rust
980/// Fetch specific OIDs from a remote git server
981///
982/// Returns the list of OIDs that were successfully fetched (now exist locally).
983/// Git fetch may partially succeed, so we check which OIDs are available after.
984async fn fetch_oids_from_server(
985 repo_path: &Path,
986 server_url: &str,
987 oids: &[String],
988) -> Result<Vec<String>> {
989 if oids.is_empty() {
990 return Ok(Vec::new());
991 } 737 }
992 738
993 let repo_path = repo_path.to_path_buf(); 739 #[tokio::test]
994 let server_url = server_url.to_string(); 740 async fn test_sync_step_tries_url() {
995 let oids = oids.to_vec(); 741 let mut needed = HashSet::new();
996 742 needed.insert("abc123".to_string());
997 tokio::task::spawn_blocking(move || {
998 // Build git fetch command with all OIDs
999 let mut args = vec!["fetch", "--depth=1", &server_url];
1000 args.extend(oids.iter().map(|s| s.as_str()));
1001 743
1002 tracing::debug!( 744 let mut fetch_results = HashMap::new();
1003 oids_count = oids.len(), 745 fetch_results.insert(
1004 server = %server_url, 746 "https://example.com/repo.git".to_string(),
1005 "Fetching OIDs" 747 vec!["abc123".to_string()],
1006 ); 748 );
1007 749
1008 let output = Command::new("git") 750 let ctx = MockSyncContext {
1009 .args(&args) 751 pending_events: RefCell::new(true),
1010 .current_dir(&repo_path) 752 needed_oids: RefCell::new(needed),
1011 .output(); 753 available_urls: vec!["https://example.com/repo.git".to_string()],
754 fetch_results: RefCell::new(fetch_results),
755 processed_count: RefCell::new(0),
756 };
757 let throttle = DomainThrottle::new(5, 30);
758 let tried = HashSet::new();
1012 759
1013 match output { 760 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap();
1014 Ok(result) => {
1015 // Check which OIDs we now have (regardless of command success)
1016 // git fetch may partially succeed
1017 let fetched: Vec<String> = oids
1018 .iter()
1019 .filter(|oid| oid_exists(&repo_path, oid))
1020 .cloned()
1021 .collect();
1022
1023 if !result.status.success() {
1024 let stderr = String::from_utf8_lossy(&result.stderr);
1025 tracing::debug!(
1026 server = %server_url,
1027 stderr = %stderr,
1028 fetched_count = fetched.len(),
1029 "git fetch returned non-zero but may have fetched some OIDs"
1030 );
1031 }
1032
1033 Ok(fetched)
1034 }
1035 Err(e) => {
1036 bail!("git fetch command error: {}", e)
1037 }
1038 }
1039 })
1040 .await?
1041}
1042
1043/// Extract domain from a URL for throttling
1044fn extract_domain(url: &str) -> String {
1045 url::Url::parse(url)
1046 .ok()
1047 .and_then(|u| u.host_str().map(|s| s.to_string()))
1048 .unwrap_or_else(|| url.to_string())
1049}
1050```
1051
1052### Helper Methods
1053
1054```rust
1055impl Purgatory {
1056 /// Find the best local repository to fetch OIDs into
1057 ///
1058 /// Prefers the repo with the most recent commit on its default branch,
1059 /// as it's most likely to have related git history.
1060 fn find_target_repo(&self, db_repo_data: &RepositoryData, identifier: &str) -> Option<PathBuf> {
1061 let mut best: Option<(Timestamp, PathBuf)> = None;
1062 761
1063 for announcement in &db_repo_data.announcements { 762 match result {
1064 let repo_path = self.git_data_path.join(announcement.repo_path()); 763 SyncStepResult::TriedUrl { url, oids_fetched } => {
1065 if !repo_path.exists() { 764 assert_eq!(url, "https://example.com/repo.git");
1066 continue; 765 assert_eq!(oids_fetched, 1);
1067 }
1068
1069 let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path)
1070 .unwrap_or(Timestamp::zero());
1071
1072 if best.as_ref().map_or(true, |(d, _)| commit_date > *d) {
1073 best = Some((commit_date, repo_path));
1074 } 766 }
767 _ => panic!("Expected TriedUrl, got {:?}", result),
1075 } 768 }
1076
1077 best.map(|(_, path)| path)
1078 } 769 }
1079 770
1080 /// Process a state event that can now be satisfied 771 #[tokio::test]
1081 /// 772 async fn test_sync_step_all_urls_tried() {
1082 /// Syncs OIDs to all owner repos, aligns refs, saves to DB, notifies subscribers. 773 let mut needed = HashSet::new();
1083 async fn process_state_event( 774 needed.insert("abc123".to_string());
1084 &self, 775
1085 state: &RepositoryState, 776 let ctx = MockSyncContext {
1086 db_repo_data: &RepositoryData, 777 pending_events: RefCell::new(true),
1087 database: &SharedDatabase, 778 needed_oids: RefCell::new(needed),
1088 local_relay: Option<&nostr_relay_builder::LocalRelay>, 779 available_urls: vec!["https://example.com/repo.git".to_string()],
1089 ) -> Result<()> { 780 fetch_results: RefCell::new(HashMap::new()),
1090 // Find source repo (one that has all OIDs) 781 processed_count: RefCell::new(0),
1091 let source_repo = self.find_repo_with_all_oids(state, db_repo_data) 782 };
1092 .ok_or_else(|| anyhow::anyhow!("No repo has all required OIDs"))?; 783 let throttle = DomainThrottle::new(5, 30);
1093 784
1094 // Sync to other owner repos and align refs 785 // Mark the only URL as tried
1095 let sync_result = sync_to_owner_repos(&source_repo, state, db_repo_data, &self.git_data_path); 786 let mut tried = HashSet::new();
787 tried.insert("https://example.com/repo.git".to_string());
1096 788
1097 tracing::info!( 789 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap();
1098 identifier = %state.identifier, 790 assert_eq!(result, SyncStepResult::NoMoreUrls);
1099 event_id = %state.event.id, 791 }
1100 repos_synced = sync_result.repos_synced, 792
1101 "Synced state from purgatory" 793 #[tokio::test]
1102 ); 794 async fn test_sync_step_domain_throttled() {
795 let mut needed = HashSet::new();
796 needed.insert("abc123".to_string());
797
798 let ctx = MockSyncContext {
799 pending_events: RefCell::new(true),
800 needed_oids: RefCell::new(needed),
801 available_urls: vec!["https://example.com/repo.git".to_string()],
802 fetch_results: RefCell::new(HashMap::new()),
803 processed_count: RefCell::new(0),
804 };
1103 805
1104 // Save to database 806 // Create throttle with 0 concurrent limit
1105 database.save_event(&state.event).await?; 807 let throttle = DomainThrottle::new(0, 30);
808 let tried = HashSet::new();
1106 809
1107 // Notify subscribers 810 let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap();
1108 if let Some(relay) = local_relay {
1109 relay.notify_event(state.event.clone());
1110 }
1111 811
1112 Ok(()) 812 match result {
813 SyncStepResult::AllUrlsThrottled { .. } => {}
814 _ => panic!("Expected AllUrlsThrottled, got {:?}", result),
815 }
1113 } 816 }
1114 817
1115 /// Process a PR event that can now be satisfied 818 #[tokio::test]
1116 async fn process_pr_event( 819 async fn test_full_sync_loop() {
1117 &self, 820 let mut needed = HashSet::new();
1118 event: &Event, 821 needed.insert("abc123".to_string());
1119 commit: &str, 822 needed.insert("def456".to_string());
1120 db_repo_data: &RepositoryData, 823
1121 database: &SharedDatabase, 824 let mut fetch_results = HashMap::new();
1122 local_relay: Option<&nostr_relay_builder::LocalRelay>, 825 // First URL returns one OID
1123 ) -> Result<()> { 826 fetch_results.insert(
1124 // Save to database 827 "https://server1.com/repo.git".to_string(),
1125 database.save_event(event).await?; 828 vec!["abc123".to_string()],
829 );
830 // Second URL returns the other
831 fetch_results.insert(
832 "https://server2.com/repo.git".to_string(),
833 vec!["def456".to_string()],
834 );
1126 835
1127 // Notify subscribers 836 let ctx = MockSyncContext {
1128 if let Some(relay) = local_relay { 837 pending_events: RefCell::new(true),
1129 relay.notify_event(event.clone()); 838 needed_oids: RefCell::new(needed.clone()),
1130 } 839 available_urls: vec![
840 "https://server1.com/repo.git".to_string(),
841 "https://server2.com/repo.git".to_string(),
842 ],
843 fetch_results: RefCell::new(fetch_results),
844 processed_count: RefCell::new(0),
845 };
1131 846
1132 tracing::info!( 847 // Simulate OIDs being removed as they're fetched
1133 event_id = %event.id, 848 // (In real code, collect_needed_oids would return fewer OIDs)
1134 commit = %commit,
1135 "Processed PR event from purgatory"
1136 );
1137 849
1138 Ok(()) 850 let throttle = DomainThrottle::new(5, 30);
1139 } 851 let complete = sync_identifier(&ctx, "test", &throttle).await;
1140 852
1141 /// Find a repository that has all OIDs required by a state event 853 // Should have tried both URLs
1142 fn find_repo_with_all_oids(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> Option<PathBuf> { 854 assert!(*ctx.processed_count.borrow() >= 1);
1143 for announcement in &db_repo_data.announcements {
1144 let repo_path = self.git_data_path.join(announcement.repo_path());
1145 if !repo_path.exists() {
1146 continue;
1147 }
1148
1149 let all_present = state.branches.iter().all(|b| {
1150 b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit)
1151 }) && state.tags.iter().all(|t| {
1152 oid_exists(&repo_path, &t.commit)
1153 });
1154
1155 if all_present {
1156 return Some(repo_path);
1157 }
1158 }
1159 None
1160 } 855 }
1161} 856}
1162``` 857```
1163```
1164
1165## Integration Points
1166
1167### 1. Negentropy Sync (src/purgatory/mod.rs:105-107)
1168
1169When events are added via negentropy sync, call `trigger_immediate_sync`:
1170
1171```rust
1172// In negentropy sync handler
1173purgatory.add_state(event, identifier.clone(), author);
1174purgatory.trigger_immediate_sync(&identifier); // NEW: triggers 500ms debounced sync
1175```
1176
1177### 2. Relay Startup
1178
1179Start the sync loop when the relay starts:
1180
1181```rust
1182// In main.rs or server setup
1183let domain_throttle = Arc::new(DomainThrottle::new(5, 30)); // 5 concurrent, 30/min
1184let purgatory = Arc::new(Purgatory::new(git_data_path, domain_throttle));
1185
1186let sync_handle = purgatory.clone().start_sync_loop(
1187 database.clone(),
1188 Some(domain.clone()),
1189 Some(local_relay.clone()),
1190);
1191```
1192
1193### 3. Shutdown
1194
1195The sync loop will naturally stop when the purgatory is dropped. No special shutdown handling needed since all state is in-memory.
1196
1197## Testing Strategy
1198
1199### Unit Tests
1200
12011. **SyncQueueEntry**
1202 - Verify backoff calculation: 20s → 40s → 80s → 120s → 120s
1203 - Verify `on_new_event()` resets attempt_count and updates next_attempt if sooner
1204 - Verify `on_sync_complete()` applies backoff only if next_attempt is in the past
1205 - Verify `is_ready()` respects both `next_attempt` and `in_progress`
1206
12072. **DomainThrottle**
1208 - Verify concurrent limit: 6th request to same domain blocked
1209 - Verify rate limit: 31st request in a minute blocked
1210 - Verify `has_capacity()` checks both limits
1211 - Verify `get_untried_urls()` returns only URLs not in urls_tried
1212 - Verify `start_fetch()` fails if already in_progress for domain+identifier
1213 - Verify `start_fetch()` adds URL to urls_tried
1214 - Verify `complete_fetch()` decrements in_flight and clears in_progress
1215 - Verify `all_urls_tried()` correctly identifies when done
1216 - Verify `remove_identifier_from_domain()` cleans up state
1217 - Verify `remove_identifier()` removes from all domains
1218
12193. **OID collection**
1220 - Verify OIDs extracted from state events correctly
1221 - Verify OIDs extracted from PR events correctly
1222 - Verify deduplication works across state and PR events
1223
12244. **Identifier extraction**
1225 - Verify `extract_identifier_from_pr()` handles various `a` tag formats
1226 - Verify `extract_commit_from_pr()` extracts `c` tag correctly
1227 858
1228### Integration Tests 859### Integration Tests
1229 860
12301. **Sync against own implementation** 8611. **Sync against own implementation**: Two ngit-grasp instances syncing
1231 - Start two ngit-grasp instances 8622. **Burst handling**: 10 events in 100ms, verify debounce
1232 - Push to one, verify other can sync via purgatory 8633. **Backoff behavior**: Unreachable URLs, verify timing
1233 - Verify partial OID availability handled correctly (some OIDs fetched, others missing) 8644. **Rate limiting**: Verify 30 req/min and 5 concurrent limits
1234 8655. **Parallel identifiers**: 5 identifiers sync in parallel
12352. **Burst handling**
1236 - Submit 10 events for same identifier within 100ms
1237 - Verify debounce: sync doesn't start until 500ms after last event
1238 - Verify only one sync operation runs (not 10)
1239
12403. **Backoff behavior**
1241 - Configure with unreachable clone URLs
1242 - Verify backoff timing: 20s, 40s, 80s, 120s, then stays at 120s
1243 - Verify new event arriving resets attempt_count to 0
1244 - Verify new event during sync prevents backoff (next_attempt already in future)
1245
12464. **Rate limiting**
1247 - Configure with single domain having multiple URLs
1248 - Trigger many sync operations
1249 - Verify only 30 requests made in first minute
1250 - Verify only 5 concurrent requests per domain
1251
12525. **Concurrent limit per domain+identifier**
1253 - Start fetch for domain+identifier
1254 - Verify second fetch attempt for same domain+identifier blocked
1255 - Verify fetch for different identifier on same domain allowed (up to 5)
1256
12576. **Parallel identifier processing**
1258 - Add events for 5 different identifiers
1259 - Verify all 5 sync tasks start in parallel (not serial)
1260 - Verify `in_progress` flag prevents duplicate tasks for same identifier
1261
12627. **Dynamic URL/OID recalculation**
1263 - Start sync for identifier
1264 - While sync is running, add new announcement with additional clone URL
1265 - Verify sync picks up new URL in next while loop iteration
1266 - Similarly for new events adding new OIDs
1267
12688. **urls_tried cleanup**
1269 - Sync identifier, exhaust all URLs for a domain
1270 - Verify identifier removed from that domain's state
1271 - Add new announcement with new URL for same domain
1272 - Verify new URL is tried on next sync attempt
1273
12749. **Mixed state and PR events**
1275 - Add state event and PR event for same identifier
1276 - Verify both OID sets collected
1277 - Verify both events processed when OIDs arrive
1278
127910. **Available domains first**
1280 - Have 10 URLs: 8 from available domains, 2 from throttled domain
1281 - Verify available domains tried first
1282 - Verify throttled domain only contacted if available didn't satisfy all OIDs
1283 866
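The backoff timing that the tests above verify (20s, 40s, 80s, then 120s) can be sketched as below. This is a minimal illustration, not the project's implementation. The field names `attempt_count`, `next_attempt`, and `in_progress` and the method names come from this document, but the explicit `now` and `debounce` parameters, the `new` constructor, and the `backoff_delay` helper are assumptions added so that the logic can be checked deterministically.

```rust
use std::time::{Duration, Instant};

/// First retry delay and the cap, following the 20s -> 2m schedule.
const BASE_DELAY: Duration = Duration::from_secs(20);
const MAX_DELAY: Duration = Duration::from_secs(120);

/// Hypothetical helper: delay before retry number `attempt` (0-based).
/// Yields 20s, 40s, 80s, then stays at 120s.
pub fn backoff_delay(attempt: u32) -> Duration {
    // Clamp the exponent so the multiplication cannot overflow.
    let factor = 1u32 << attempt.min(8);
    (BASE_DELAY * factor).min(MAX_DELAY)
}

/// Sketch of a per-identifier queue entry.
#[derive(Debug)]
pub struct SyncQueueEntry {
    pub attempt_count: u32,
    pub next_attempt: Instant,
    pub in_progress: bool,
}

impl SyncQueueEntry {
    /// Assumed constructor: a fresh entry is due immediately.
    pub fn new(now: Instant) -> Self {
        Self { attempt_count: 0, next_attempt: now, in_progress: false }
    }

    /// Ready only when no sync task is running and the scheduled time has passed.
    pub fn is_ready(&self, now: Instant) -> bool {
        !self.in_progress && self.next_attempt <= now
    }

    /// A new event resets the backoff and pulls the next attempt earlier
    /// if `now + debounce` is sooner than what is already scheduled.
    pub fn on_new_event(&mut self, now: Instant, debounce: Duration) {
        self.attempt_count = 0;
        let candidate = now + debounce;
        if candidate < self.next_attempt {
            self.next_attempt = candidate;
        }
    }

    /// Apply backoff only if `next_attempt` is not already in the future
    /// (a new event during the sync may have rescheduled it).
    pub fn on_sync_complete(&mut self, now: Instant) {
        self.in_progress = false;
        if self.next_attempt <= now {
            self.next_attempt = now + backoff_delay(self.attempt_count);
            self.attempt_count += 1;
        }
    }
}
```

Passing `now` explicitly keeps the timing logic testable without sleeping. A production version would more likely call `Instant::now()` internally, as the parameterless calls elsewhere in this document suggest.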
1284## Migration Path 867## Migration Path
1285 868
12861. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle) 8691. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle, SyncContext trait)
12872. **Phase 2**: Implement sync loop alongside existing `start_state_sync` 8702. **Phase 2**: Implement `sync_identifier_step` with unit tests
12883. **Phase 3**: Migrate state events to use new sync loop 8713. **Phase 3**: Implement main sync loop alongside existing `start_state_sync`
12894. **Phase 4**: Add PR event syncing 8724. **Phase 4**: Add PR event syncing
12905. **Phase 5**: Remove old `start_state_sync` code 8735. **Phase 5**: Remove old `start_state_sync` code
1291 874
1292## Configuration 875## Configuration
1293 876
1294New configuration options:
1295
1296| Option | CLI Flag | Environment Variable | Default | 877| Option | CLI Flag | Environment Variable | Default |
1297|--------|----------|---------------------|---------| 878|--------|----------|---------------------|---------|
1298| Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` | 879| Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` |
@@ -1305,24 +886,14 @@ New configuration options:
1305 886
1306### Metrics 887### Metrics
1307 888
1308- `purgatory_sync_queue_size` - Number of identifiers pending sync 889- `purgatory_sync_queue_size` - Identifiers pending sync
1309- `purgatory_sync_attempts_total` - Counter of sync attempts per identifier 890- `purgatory_sync_attempts_total` - Sync attempts per identifier
1310- `purgatory_sync_oids_fetched_total` - Counter of OIDs successfully fetched 891- `purgatory_sync_oids_fetched_total` - OIDs successfully fetched
1311- `purgatory_domain_in_flight` - Gauge of in-flight requests per domain 892- `purgatory_domain_in_flight` - In-flight requests per domain
1312- `purgatory_domain_requests_total` - Counter of requests per domain 893- `purgatory_domain_requests_total` - Total requests per domain
1313- `purgatory_sync_backoff_seconds` - Histogram of backoff durations applied
1314 894
1315### Logging 895### Logging
1316 896
1317Key log points:
1318- `INFO`: Successful sync completion, OIDs fetched 897- `INFO`: Successful sync completion, OIDs fetched
1319- `DEBUG`: Domain capacity checks, backoff applied, urls_tried state 898- `DEBUG`: URL attempts, throttle decisions, backoff applied
1320- `WARN`: Fetch failures, processing errors 899- `WARN`: Fetch failures, processing errors
1321
1322## Open Questions
1323
13241. **PR placeholder handling**: Current code has `add_pr_placeholder()` for git-data-first scenario. How should this interact with the new sync system? (Probably: placeholders don't need syncing since git data already exists)
1325
13262. **Memory bounds**: Should we limit sync queue size? What happens if thousands of identifiers are pending?
1327
13283. **Persistence**: Currently all purgatory state is in-memory. Should sync queue state survive restarts? (Probably no - events will be re-synced via negentropy on restart)