diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-06 14:27:59 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-06 14:27:59 +0000 |
| commit | 71914d92a1bc296b3a585b60989cf0f1bf70dd4b (patch) | |
| tree | 693eb746b25a2c4547297efcf2102ba4dba2e171 /docs/explanation/purgatory-sync-redesign.md | |
| parent | 96203000f81a46976834971dd26e1a79465e6303 (diff) | |
docs: purgatory git sync design
Diffstat (limited to 'docs/explanation/purgatory-sync-redesign.md')
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 1328 |
1 files changed, 1328 insertions, 0 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md new file mode 100644 index 0000000..ec02e75 --- /dev/null +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -0,0 +1,1328 @@ | |||
| 1 | # Purgatory Sync Redesign | ||
| 2 | |||
| 3 | ## Status | ||
| 4 | |||
| 5 | **Proposed** - January 2026 | ||
| 6 | |||
| 7 | ## Context | ||
| 8 | |||
| 9 | The current purgatory sync implementation (`start_state_sync` at `src/purgatory/mod.rs:510`) has several limitations: | ||
| 10 | |||
| 11 | 1. **Per-event syncing**: Each state event triggers its own independent sync operation | ||
| 12 | 2. **No PR event syncing**: PR events enter purgatory but don't trigger git data fetching | ||
| 13 | 3. **No batching**: Multiple events for the same repository cause redundant fetch requests | ||
| 14 | 4. **No rate limiting**: Can overwhelm remote git servers or get rate-limited | ||
| 15 | 5. **No coordination**: Multiple concurrent syncs may fetch the same OIDs | ||
| 16 | |||
| 17 | When syncing a new repository, we often receive multiple state and PR events in a burst. The current approach creates unnecessary load on remote servers and doesn't handle this common case efficiently. | ||
| 18 | |||
| 19 | ## Decision | ||
| 20 | |||
| 21 | Redesign purgatory sync to be **identifier-based** rather than **event-based**, with: | ||
| 22 | |||
| 23 | 1. A background sync loop that processes identifiers, not individual events | ||
| 24 | 2. Batched OID fetching across all purgatory events for an identifier | ||
| 25 | 3. Domain-based throttling (30 requests/minute per domain) | ||
| 26 | 4. Exponential backoff per identifier (20s → 2m, then 2m intervals) | ||
| 27 | 5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default) | ||
| 28 | |||
| 29 | ## Architecture | ||
| 30 | |||
| 31 | ### Overview | ||
| 32 | |||
| 33 | ``` | ||
| 34 | ┌──────────────────────────────────────────────────────────────────────────────┐ | ||
| 35 | │ Purgatory │ | ||
| 36 | │ │ | ||
| 37 | │ ┌─────────────────┐ ┌─────────────────┐ │ | ||
| 38 | │ │ State Events │ │ PR Events │ │ | ||
| 39 | │ │ (by identifier)│ │ (by event_id) │ │ | ||
| 40 | │ └────────┬────────┘ └────────┬────────┘ │ | ||
| 41 | │ │ │ │ | ||
| 42 | │ └──────────┬─────────┘ │ | ||
| 43 | │ │ add_state() / add_pr() / trigger_immediate_sync() │ | ||
| 44 | │ ▼ │ | ||
| 45 | │ ┌──────────────────────────┐ │ | ||
| 46 | │ │ Sync Queue │ │ | ||
| 47 | │ │ DashMap<id, Entry> │ │ | ||
| 48 | │ │ │ │ | ||
| 49 | │ │ Entry { │ │ | ||
| 50 | │ │ next_attempt, │ ← delay/backoff timer │ | ||
| 51 | │ │ attempt_count, │ ← for backoff calculation │ | ||
| 52 | │ │ in_progress, │ ← prevents concurrent runs │ | ||
| 53 | │ │ } │ │ | ||
| 54 | │ └────────────┬─────────────┘ │ | ||
| 55 | │ │ │ | ||
| 56 | │ ┌─────────────────────┼─────────────────────────────────────────────────┐ │ | ||
| 57 | │ │ ▼ │ │ | ||
| 58 | │ │ ┌─────────────────────┐ │ │ | ||
| 59 | │ │ │ Main Sync Loop │ (every 1s) │ │ | ||
| 60 | │ │ │ │ │ │ | ||
| 61 | │ │ │ 1. Find ALL ready │ │ │ | ||
| 62 | │ │ │ identifiers │ │ │ | ||
| 63 | │ │ │ 2. Spawn parallel │ │ │ | ||
| 64 | │ │ │ tasks for each │───────┐ │ │ | ||
| 65 | │ │ │ 3. Apply backoff │ │ (parallel tasks) │ │ | ||
| 66 | │ │ │ when done │ │ │ │ | ||
| 67 | │ │ └─────────────────────┘ │ │ │ | ||
| 68 | │ │ ▼ │ │ | ||
| 69 | │ │ ┌─────────────────────────────────────┐ │ │ | ||
| 70 | │ │ │ sync_identifier() │ │ │ | ||
| 71 | │ │ │ │ │ │ | ||
| 72 | │ │ │ while (events && oids && urls): │ │ │ | ||
| 73 | │ │ │ 1. Recalc URLs/OIDs (fresh) │ │ │ | ||
| 74 | │ │ │ 2. Get untried URLs per domain │ │ │ | ||
| 75 | │ │ │ 3. Skip throttled domains │ │ │ | ||
| 76 | │ │ │ 4. Try available URLs │ │ │ | ||
| 77 | │ │ │ (respecting domain limits) │ │ │ | ||
| 78 | │ │ │ 5. Process satisfiable events │ │ │ | ||
| 79 | │ │ │ 6. Loop catches new events/URLs │ │ │ | ||
| 80 | │ │ └─────────────────────────────────────┘ │ │ | ||
| 81 | │ │ │ │ │ | ||
| 82 | │ │ ▼ │ │ | ||
| 83 | │ │ ┌─────────────────────────────────────┐ │ │ | ||
| 84 | │ │ │ Domain Throttle │ │ │ | ||
| 85 | │ │ │ │ │ │ | ||
| 86 | │ │ │ Per-domain state: │ │ │ | ||
| 87 | │ │ │ - 5 concurrent requests max │ │ │ | ||
| 88 | │ │ │ - 30 requests/min sliding window │ │ │ | ||
| 89 | │ │ │ │ │ │ | ||
| 90 | │ │ │ Per (domain, identifier) state: │ │ │ | ||
| 91 | │ │ │ - in_progress: bool │ │ │ | ||
| 92 | │ │ │ - urls_tried: HashSet<String> │ │ │ | ||
| 93 | │ │ │ │ │ │ | ||
| 94 | │ │ │ Identifier removed when all URLs │ │ │ | ||
| 95 | │ │ │ tried. Re-added on next sync. │ │ │ | ||
| 96 | │ │ └─────────────────────────────────────┘ │ │ | ||
| 97 | │ │ │ │ | ||
| 98 | │ └───────────────────────────────────────────────────────────────────────┘ │ | ||
| 99 | └──────────────────────────────────────────────────────────────────────────────┘ | ||
| 100 | ``` | ||
| 101 | |||
| 102 | ### Flow Summary | ||
| 103 | |||
| 104 | 1. **Event arrives** → added to state_events/pr_events + sync_queue with delay | ||
| 105 | - User-submitted: 3 minute delay (expect git push to follow) | ||
| 106 | - Sync-triggered: 500ms delay (batch burst arrivals) | ||
| 107 | - `enqueue_sync()` resets `attempt_count` to 0 and updates `next_attempt` if needed | ||
| 108 | |||
| 109 | 2. **Main sync loop** (every 1s): | ||
| 110 | - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) | ||
| 111 | - Spawns parallel tasks for each (marks `in_progress = true`) | ||
| 112 | - Each `sync_identifier()` task runs a while loop: | ||
| 113 | - Recalculates URLs and OIDs fresh each iteration (catches new events/announcements) | ||
| 114 | - For each domain, gets untried URLs (via `domain_throttle`) | ||
| 115 | - Skips domains that are fully throttled | ||
| 116 | - Tries non-throttled domains first, then throttled if still need OIDs | ||
| 117 | - Respects per-domain concurrency (5 max) and rate limits (30/min) | ||
| 118 | - Processes satisfiable events | ||
| 119 | - Continues while: events remain AND OIDs missing AND untried URLs exist | ||
| 120 | - When task completes (regardless of outcome): | ||
| 121 | - If `next_attempt` is in the future (new event arrived): just clear `in_progress` | ||
| 122 | - Otherwise: increment `attempt_count`, set `next_attempt = now + backoff`, clear `in_progress` | ||
| 123 | - If no pending events remain: remove identifier from sync_queue | ||
| 124 | |||
| 125 | 3. **Domain throttle state management**: | ||
| 126 | - Tracks `(in_progress, urls_tried)` per `(domain, identifier)` pair | ||
| 127 | - `in_progress` prevents parallel fetches to same domain for same identifier | ||
| 128 | - `urls_tried` tracks which URLs have been attempted | ||
| 129 | - When all URLs for a domain+identifier are tried, that entry is removed | ||
| 130 | - Entry may be re-added on next sync attempt if new URLs available | ||
| 131 | |||
| 132 | ### New Data Structures | ||
| 133 | |||
| 134 | #### SyncQueueEntry | ||
| 135 | |||
| 136 | Tracks sync state for each identifier in the main sync queue: | ||
| 137 | |||
| 138 | ```rust | ||
| 139 | /// Entry in the sync queue tracking when/how to sync an identifier | ||
| 140 | #[derive(Debug, Clone)] | ||
| 141 | pub struct SyncQueueEntry { | ||
| 142 | /// Don't attempt sync before this time | ||
| 143 | /// Set for: initial delay (3min user / 500ms sync), backoff after attempts | ||
| 144 | pub next_attempt: Instant, | ||
| 145 | |||
| 146 | /// Number of sync attempts (for backoff calculation) | ||
| 147 | /// Reset to 0 when new event arrives for this identifier | ||
| 148 | pub attempt_count: u32, | ||
| 149 | |||
| 150 | /// Whether a sync is currently in progress for this identifier | ||
| 151 | /// Prevents concurrent sync runs for the same identifier | ||
| 152 | pub in_progress: bool, | ||
| 153 | } | ||
| 154 | |||
| 155 | impl SyncQueueEntry { | ||
| 156 | /// Create new entry with specified delay | ||
| 157 | pub fn new(delay: Duration) -> Self { | ||
| 158 | Self { | ||
| 159 | next_attempt: Instant::now() + delay, | ||
| 160 | attempt_count: 0, | ||
| 161 | in_progress: false, | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | /// Calculate backoff duration: 20s, 40s, 80s, 120s (max 2min) | ||
| 166 | pub fn backoff(&self) -> Duration { | ||
| 167 | let base = Duration::from_secs(20); | ||
| 168 | let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3)); | ||
| 169 | let backoff = base * multiplier; | ||
| 170 | backoff.min(Duration::from_secs(120)) // Cap at 2 minutes | ||
| 171 | } | ||
| 172 | |||
| 173 | /// Check if this entry is ready to sync (not in progress, delay passed) | ||
| 174 | pub fn is_ready(&self) -> bool { | ||
| 175 | !self.in_progress && Instant::now() >= self.next_attempt | ||
| 176 | } | ||
| 177 | |||
| 178 | /// Called when new event arrives - resets attempt_count, may update next_attempt | ||
| 179 | pub fn on_new_event(&mut self, delay: Duration) { | ||
| 180 | self.attempt_count = 0; | ||
| 181 | let new_attempt = Instant::now() + delay; | ||
| 182 | // Only bring forward if new time is sooner | ||
| 183 | if new_attempt < self.next_attempt { | ||
| 184 | self.next_attempt = new_attempt; | ||
| 185 | } | ||
| 186 | } | ||
| 187 | |||
| 188 | /// Called when sync attempt completes | ||
| 189 | /// If next_attempt is in the future (new event arrived during sync), just clear in_progress | ||
| 190 | /// Otherwise, apply backoff | ||
| 191 | pub fn on_sync_complete(&mut self) { | ||
| 192 | self.in_progress = false; | ||
| 193 | let now = Instant::now(); | ||
| 194 | if self.next_attempt <= now { | ||
| 195 | // No new event arrived during sync - apply backoff | ||
| 196 | self.attempt_count += 1; | ||
| 197 | self.next_attempt = now + self.backoff(); | ||
| 198 | } | ||
| 199 | // else: new event arrived during sync, next_attempt already set, don't apply backoff | ||
| 200 | } | ||
| 201 | } | ||
| 202 | ``` | ||
| 203 | |||
| 204 | #### DomainThrottle | ||
| 205 | |||
| 206 | Tracks per-domain rate limiting and per-(domain, identifier) fetch state: | ||
| 207 | |||
| 208 | ```rust | ||
| 209 | /// Tracks domain-level rate limiting and per-identifier fetch state | ||
| 210 | /// | ||
| 211 | /// Rate limits: 5 concurrent requests, 30 requests/minute per domain | ||
| 212 | /// | ||
| 213 | /// Per (domain, identifier): tracks which URLs have been tried and whether | ||
| 214 | /// a fetch is currently in progress. When all URLs are tried, the identifier | ||
| 215 | /// is removed from that domain's tracking. It may be re-added on the next | ||
| 216 | /// sync attempt if new URLs become available. | ||
| 217 | pub struct DomainThrottle { | ||
| 218 | /// Per-domain, per-identifier state: (in_progress, urls_tried) | ||
| 219 | /// domain -> identifier -> (in_progress, urls_tried) | ||
| 220 | state: DashMap<String, DashMap<String, (bool, HashSet<String>)>>, | ||
| 221 | |||
| 222 | /// Count of currently in-flight requests per domain | ||
| 223 | in_flight: DashMap<String, u32>, | ||
| 224 | |||
| 225 | /// Request timestamps per domain (sliding window for rate limiting) | ||
| 226 | request_times: DashMap<String, VecDeque<Instant>>, | ||
| 227 | |||
| 228 | /// Maximum concurrent requests per domain | ||
| 229 | max_concurrent: u32, | ||
| 230 | |||
| 231 | /// Maximum requests per minute per domain | ||
| 232 | max_per_minute: u32, | ||
| 233 | } | ||
| 234 | |||
| 235 | impl DomainThrottle { | ||
| 236 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { | ||
| 237 | Self { | ||
| 238 | state: DashMap::new(), | ||
| 239 | in_flight: DashMap::new(), | ||
| 240 | request_times: DashMap::new(), | ||
| 241 | max_concurrent, | ||
| 242 | max_per_minute, | ||
| 243 | } | ||
| 244 | } | ||
| 245 | |||
| 246 | /// Check if domain has capacity for another request | ||
| 247 | /// (both concurrent limit and rate limit) | ||
| 248 | pub fn has_capacity(&self, domain: &str) -> bool { | ||
| 249 | // Check concurrent limit | ||
| 250 | let current_in_flight = self.in_flight.get(domain).map_or(0, |v| *v); | ||
| 251 | if current_in_flight >= self.max_concurrent { | ||
| 252 | return false; | ||
| 253 | } | ||
| 254 | |||
| 255 | // Check rate limit (sliding window) | ||
| 256 | let now = Instant::now(); | ||
| 257 | let window = Duration::from_secs(60); | ||
| 258 | self.request_times | ||
| 259 | .get(domain) | ||
| 260 | .map_or(true, |times| { | ||
| 261 | times.iter().filter(|t| now.duration_since(**t) < window).count() | ||
| 262 | < self.max_per_minute as usize | ||
| 263 | }) | ||
| 264 | } | ||
| 265 | |||
| 266 | /// Check if identifier is currently fetching from this domain | ||
| 267 | pub fn is_in_progress(&self, domain: &str, identifier: &str) -> bool { | ||
| 268 | self.state | ||
| 269 | .get(domain) | ||
| 270 | .and_then(|domain_state| domain_state.get(identifier)) | ||
| 271 | .map_or(false, |entry| entry.0) | ||
| 272 | } | ||
| 273 | |||
| 274 | /// Get untried URLs for an identifier from a specific domain | ||
| 275 | /// Returns URLs from `available_urls` that haven't been tried yet | ||
| 276 | pub fn get_untried_urls( | ||
| 277 | &self, | ||
| 278 | domain: &str, | ||
| 279 | identifier: &str, | ||
| 280 | available_urls: &[String] | ||
| 281 | ) -> Vec<String> { | ||
| 282 | let tried = self.state | ||
| 283 | .get(domain) | ||
| 284 | .and_then(|domain_state| domain_state.get(identifier)) | ||
| 285 | .map(|entry| entry.1.clone()) | ||
| 286 | .unwrap_or_default(); | ||
| 287 | |||
| 288 | available_urls | ||
| 289 | .iter() | ||
| 290 | .filter(|url| !tried.contains(*url)) | ||
| 291 | .cloned() | ||
| 292 | .collect() | ||
| 293 | } | ||
| 294 | |||
| 295 | /// Mark a URL as being fetched (in progress) | ||
| 296 | /// Returns false if domain has no capacity or identifier already in progress for this domain | ||
| 297 | pub fn start_fetch(&self, domain: &str, identifier: &str, url: &str) -> bool { | ||
| 298 | // Check capacity | ||
| 299 | if !self.has_capacity(domain) { | ||
| 300 | return false; | ||
| 301 | } | ||
| 302 | |||
| 303 | // Check if already in progress for this domain+identifier | ||
| 304 | if self.is_in_progress(domain, identifier) { | ||
| 305 | return false; | ||
| 306 | } | ||
| 307 | |||
| 308 | // Increment in-flight counter | ||
| 309 | *self.in_flight.entry(domain.to_string()).or_insert(0) += 1; | ||
| 310 | |||
| 311 | // Record request time | ||
| 312 | self.request_times | ||
| 313 | .entry(domain.to_string()) | ||
| 314 | .or_default() | ||
| 315 | .push_back(Instant::now()); | ||
| 316 | |||
| 317 | // Mark in progress and add URL to tried set | ||
| 318 | self.state | ||
| 319 | .entry(domain.to_string()) | ||
| 320 | .or_default() | ||
| 321 | .entry(identifier.to_string()) | ||
| 322 | .and_modify(|(in_progress, tried)| { | ||
| 323 | *in_progress = true; | ||
| 324 | tried.insert(url.to_string()); | ||
| 325 | }) | ||
| 326 | .or_insert_with(|| { | ||
| 327 | let mut tried = HashSet::new(); | ||
| 328 | tried.insert(url.to_string()); | ||
| 329 | (true, tried) | ||
| 330 | }); | ||
| 331 | |||
| 332 | true | ||
| 333 | } | ||
| 334 | |||
| 335 | /// Mark a fetch as complete | ||
| 336 | pub fn complete_fetch(&self, domain: &str, identifier: &str) { | ||
| 337 | // Decrement in-flight counter | ||
| 338 | if let Some(mut count) = self.in_flight.get_mut(domain) { | ||
| 339 | *count = count.saturating_sub(1); | ||
| 340 | } | ||
| 341 | |||
| 342 | // Clear in_progress flag | ||
| 343 | if let Some(domain_state) = self.state.get(domain) { | ||
| 344 | if let Some(mut entry) = domain_state.get_mut(identifier) { | ||
| 345 | entry.0 = false; | ||
| 346 | } | ||
| 347 | } | ||
| 348 | |||
| 349 | // Clean old request times | ||
| 350 | let now = Instant::now(); | ||
| 351 | let window = Duration::from_secs(60); | ||
| 352 | if let Some(mut times) = self.request_times.get_mut(domain) { | ||
| 353 | while times.front().map_or(false, |t| now.duration_since(*t) >= window) { | ||
| 354 | times.pop_front(); | ||
| 355 | } | ||
| 356 | } | ||
| 357 | } | ||
| 358 | |||
| 359 | /// Check if all URLs have been tried for this domain+identifier | ||
| 360 | pub fn all_urls_tried( | ||
| 361 | &self, | ||
| 362 | domain: &str, | ||
| 363 | identifier: &str, | ||
| 364 | available_urls: &[String] | ||
| 365 | ) -> bool { | ||
| 366 | self.get_untried_urls(domain, identifier, available_urls).is_empty() | ||
| 367 | } | ||
| 368 | |||
| 369 | /// Remove identifier from a domain's tracking (called when all URLs tried) | ||
| 370 | pub fn remove_identifier_from_domain(&self, domain: &str, identifier: &str) { | ||
| 371 | if let Some(domain_state) = self.state.get(domain) { | ||
| 372 | domain_state.remove(identifier); | ||
| 373 | } | ||
| 374 | } | ||
| 375 | |||
| 376 | /// Remove identifier from all domains (called when sync complete or events expired) | ||
| 377 | pub fn remove_identifier(&self, identifier: &str) { | ||
| 378 | for domain_entry in self.state.iter() { | ||
| 379 | domain_entry.value().remove(identifier); | ||
| 380 | } | ||
| 381 | } | ||
| 382 | |||
| 383 | /// Get all domains where this identifier has untried URLs | ||
| 384 | pub fn domains_with_untried_urls(&self, identifier: &str) -> Vec<String> { | ||
| 385 | self.state | ||
| 386 | .iter() | ||
| 387 | .filter(|entry| entry.value().contains_key(identifier)) | ||
| 388 | .map(|entry| entry.key().clone()) | ||
| 389 | .collect() | ||
| 390 | } | ||
| 391 | } | ||
| 392 | ``` | ||
| 393 | |||
| 394 | ### Modified Purgatory Structure | ||
| 395 | |||
| 396 | ```rust | ||
| 397 | pub struct Purgatory { | ||
| 398 | /// State events (kind 30618) indexed by repository identifier | ||
| 399 | state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>, | ||
| 400 | |||
| 401 | /// PR events (kind 1617/1618) indexed by event ID | ||
| 402 | pr_events: Arc<DashMap<String, PrPurgatoryEntry>>, | ||
| 403 | |||
| 404 | /// NEW: Sync queue - identifiers pending git data sync | ||
| 405 | sync_queue: Arc<DashMap<String, SyncQueueEntry>>, | ||
| 406 | |||
| 407 | /// NEW: Domain-level throttling and per-identifier fetch state | ||
| 408 | domain_throttle: Arc<DomainThrottle>, | ||
| 409 | |||
| 410 | git_data_path: PathBuf, | ||
| 411 | } | ||
| 412 | ``` | ||
| 413 | |||
| 414 | ### Key Methods | ||
| 415 | |||
| 416 | #### Adding Events to Purgatory | ||
| 417 | |||
| 418 | ```rust | ||
| 419 | impl Purgatory { | ||
| 420 | /// Add a state event to purgatory (user-submitted, 3min delay) | ||
| 421 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | ||
| 422 | // ... existing logic to add to state_events ... | ||
| 423 | |||
| 424 | // Add to sync queue with 3 minute delay | ||
| 425 | self.enqueue_sync(&identifier, Duration::from_secs(180)); | ||
| 426 | } | ||
| 427 | |||
| 428 | /// Add a PR event to purgatory (user-submitted, 3min delay) | ||
| 429 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | ||
| 430 | // ... existing logic to add to pr_events ... | ||
| 431 | |||
| 432 | // Extract identifier from event's `a` tag and enqueue sync | ||
| 433 | if let Some(identifier) = extract_identifier_from_pr(&event) { | ||
| 434 | self.enqueue_sync(&identifier, Duration::from_secs(180)); | ||
| 435 | } | ||
| 436 | } | ||
| 437 | |||
| 438 | /// Trigger immediate sync for an identifier (called from negentropy sync) | ||
| 439 | /// Still applies 500ms debounce for batching burst arrivals | ||
| 440 | pub fn trigger_immediate_sync(&self, identifier: &str) { | ||
| 441 | self.enqueue_sync(identifier, Duration::from_millis(500)); | ||
| 442 | } | ||
| 443 | |||
| 444 | /// Internal: Add identifier to sync queue with specified delay | ||
| 445 | fn enqueue_sync(&self, identifier: &str, delay: Duration) { | ||
| 446 | self.sync_queue | ||
| 447 | .entry(identifier.to_string()) | ||
| 448 | .and_modify(|entry| { | ||
| 449 | // New event arrived - reset backoff, potentially update timing | ||
| 450 | entry.reset(delay); | ||
| 451 | }) | ||
| 452 | .or_insert_with(|| SyncQueueEntry::new(delay)); | ||
| 453 | } | ||
| 454 | } | ||
| 455 | ``` | ||
| 456 | |||
| 457 | #### Extracting Identifier from PR Events | ||
| 458 | |||
| 459 | ```rust | ||
| 460 | /// Extract repository identifier from PR event's `a` tag | ||
| 461 | fn extract_identifier_from_pr(event: &Event) -> Option<String> { | ||
| 462 | event.tags.iter().find_map(|tag| { | ||
| 463 | let tag_vec = tag.clone().to_vec(); | ||
| 464 | if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { | ||
| 465 | // Format: 30617:<owner>:<identifier> | ||
| 466 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 467 | if parts.len() >= 3 { | ||
| 468 | return Some(parts[2].to_string()); | ||
| 469 | } | ||
| 470 | } | ||
| 471 | None | ||
| 472 | }) | ||
| 473 | } | ||
| 474 | ``` | ||
| 475 | |||
| 476 | ### Sync Loop | ||
| 477 | |||
| 478 | A single background loop handles syncing. The `DomainThrottle` is just state tracking, not a separate processing queue. | ||
| 479 | |||
| 480 | ```rust | ||
| 481 | impl Purgatory { | ||
| 482 | /// Start the background sync loop | ||
| 483 | pub fn start_sync_loop( | ||
| 484 | self: Arc<Self>, | ||
| 485 | database: SharedDatabase, | ||
| 486 | our_domain: Option<String>, | ||
| 487 | local_relay: Option<nostr_relay_builder::LocalRelay>, | ||
| 488 | ) -> tokio::task::JoinHandle<()> { | ||
| 489 | tokio::spawn(async move { | ||
| 490 | let mut interval = tokio::time::interval(Duration::from_secs(1)); | ||
| 491 | |||
| 492 | loop { | ||
| 493 | interval.tick().await; | ||
| 494 | |||
| 495 | // Find ALL ready identifiers | ||
| 496 | let ready_identifiers: Vec<String> = self.sync_queue | ||
| 497 | .iter() | ||
| 498 | .filter(|entry| entry.value().is_ready()) | ||
| 499 | .map(|entry| entry.key().clone()) | ||
| 500 | .collect(); | ||
| 501 | |||
| 502 | // Spawn sync tasks in parallel for each ready identifier | ||
| 503 | for identifier in ready_identifiers { | ||
| 504 | // Check if there are still events in purgatory for this identifier | ||
| 505 | if !self.has_pending_events(&identifier) { | ||
| 506 | self.sync_queue.remove(&identifier); | ||
| 507 | self.domain_throttle.remove_identifier(&identifier); | ||
| 508 | continue; | ||
| 509 | } | ||
| 510 | |||
| 511 | // Mark as in progress (prevents re-spawning on next tick) | ||
| 512 | if let Some(mut entry) = self.sync_queue.get_mut(&identifier) { | ||
| 513 | if entry.in_progress { | ||
| 514 | continue; // Already running | ||
| 515 | } | ||
| 516 | entry.in_progress = true; | ||
| 517 | } | ||
| 518 | |||
| 519 | // Spawn task for this identifier | ||
| 520 | let purgatory = self.clone(); | ||
| 521 | let db = database.clone(); | ||
| 522 | let domain = our_domain.clone(); | ||
| 523 | let relay = local_relay.clone(); | ||
| 524 | let id = identifier.clone(); | ||
| 525 | |||
| 526 | tokio::spawn(async move { | ||
| 527 | purgatory.sync_identifier( | ||
| 528 | &id, | ||
| 529 | &db, | ||
| 530 | domain.as_deref(), | ||
| 531 | relay.as_ref(), | ||
| 532 | ).await; | ||
| 533 | |||
| 534 | // Check if events remain | ||
| 535 | if !purgatory.has_pending_events(&id) { | ||
| 536 | purgatory.sync_queue.remove(&id); | ||
| 537 | purgatory.domain_throttle.remove_identifier(&id); | ||
| 538 | tracing::info!(identifier = %id, "Sync complete, removed from queues"); | ||
| 539 | } else { | ||
| 540 | // Apply backoff (or not, if new event arrived during sync) | ||
| 541 | if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { | ||
| 542 | entry.on_sync_complete(); | ||
| 543 | tracing::debug!( | ||
| 544 | identifier = %id, | ||
| 545 | attempt = entry.attempt_count, | ||
| 546 | next_attempt_secs = entry.next_attempt.duration_since(Instant::now()).as_secs(), | ||
| 547 | "Sync attempt complete, scheduled next attempt" | ||
| 548 | ); | ||
| 549 | } | ||
| 550 | } | ||
| 551 | }); | ||
| 552 | } | ||
| 553 | } | ||
| 554 | }) | ||
| 555 | } | ||
| 556 | |||
| 557 | /// Check if there are pending events for an identifier | ||
| 558 | fn has_pending_events(&self, identifier: &str) -> bool { | ||
| 559 | // Check state events | ||
| 560 | if self.state_events.get(identifier).map_or(false, |v| !v.is_empty()) { | ||
| 561 | return true; | ||
| 562 | } | ||
| 563 | |||
| 564 | // Check PR events (need to scan for matching identifier) | ||
| 565 | for entry in self.pr_events.iter() { | ||
| 566 | if let Some(ref event) = entry.value().event { | ||
| 567 | if extract_identifier_from_pr(event).as_deref() == Some(identifier) { | ||
| 568 | return true; | ||
| 569 | } | ||
| 570 | } | ||
| 571 | } | ||
| 572 | |||
| 573 | false | ||
| 574 | } | ||
| 575 | } | ||
| 576 | ``` | ||
| 577 | |||
| 578 | ### Core Sync Logic | ||
| 579 | |||
| 580 | ```rust | ||
| 581 | impl Purgatory { | ||
| 582 | /// Sync git data for all purgatory events with this identifier | ||
| 583 | /// | ||
| 584 | /// Uses a while loop that: | ||
| 585 | /// 1. Recalculates URLs and OIDs fresh each iteration (catches new events/announcements) | ||
| 586 | /// 2. Gets untried URLs per domain from DomainThrottle | ||
| 587 | /// 3. Skips domains that are fully throttled, tries available ones | ||
| 588 | /// 4. Continues while: events remain AND OIDs missing AND untried URLs exist | ||
| 589 | async fn sync_identifier( | ||
| 590 | &self, | ||
| 591 | identifier: &str, | ||
| 592 | database: &SharedDatabase, | ||
| 593 | our_domain: Option<&str>, | ||
| 594 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 595 | ) { | ||
| 596 | loop { | ||
| 597 | // 1. Check if any events remain (may have expired or been processed) | ||
| 598 | if !self.has_pending_events(identifier) { | ||
| 599 | return; | ||
| 600 | } | ||
| 601 | |||
| 602 | // 2. Collect all OIDs needed (fresh calculation each iteration) | ||
| 603 | let needed_oids = self.collect_needed_oids(identifier); | ||
| 604 | |||
| 605 | if needed_oids.is_empty() { | ||
| 606 | // No OIDs needed - try to process events and exit | ||
| 607 | self.try_process_events(identifier, database, our_domain, local_relay).await; | ||
| 608 | return; | ||
| 609 | } | ||
| 610 | |||
| 611 | // 3. Get repository data and clone URLs (fresh calculation each iteration) | ||
| 612 | let db_repo_data = match fetch_repository_data(database, identifier).await { | ||
| 613 | Ok(data) => data, | ||
| 614 | Err(e) => { | ||
| 615 | tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data"); | ||
| 616 | return; | ||
| 617 | } | ||
| 618 | }; | ||
| 619 | |||
| 620 | // 4. Collect clone URLs, excluding our domain | ||
| 621 | let all_clone_urls: Vec<String> = db_repo_data | ||
| 622 | .announcements | ||
| 623 | .iter() | ||
| 624 | .flat_map(|a| a.clone_urls.iter().cloned()) | ||
| 625 | .filter(|url| our_domain.map_or(true, |d| !url.contains(d))) | ||
| 626 | .collect::<HashSet<_>>() | ||
| 627 | .into_iter() | ||
| 628 | .collect(); | ||
| 629 | |||
| 630 | if all_clone_urls.is_empty() { | ||
| 631 | tracing::debug!(identifier = %identifier, "No external clone URLs available"); | ||
| 632 | return; | ||
| 633 | } | ||
| 634 | |||
| 635 | // 5. Group URLs by domain | ||
| 636 | let urls_by_domain: HashMap<String, Vec<String>> = all_clone_urls | ||
| 637 | .iter() | ||
| 638 | .fold(HashMap::new(), |mut acc, url| { | ||
| 639 | let domain = extract_domain(url); | ||
| 640 | acc.entry(domain).or_default().push(url.clone()); | ||
| 641 | acc | ||
| 642 | }); | ||
| 643 | |||
| 644 | // 6. Find best local repo to fetch into | ||
| 645 | let target_repo = match self.find_target_repo(&db_repo_data, identifier) { | ||
| 646 | Some(path) => path, | ||
| 647 | None => { | ||
| 648 | tracing::debug!(identifier = %identifier, "No local repository found"); | ||
| 649 | return; | ||
| 650 | } | ||
| 651 | }; | ||
| 652 | |||
| 653 | // 7. Partition domains: available (have capacity) vs throttled | ||
| 654 | let (available_domains, throttled_domains): (Vec<_>, Vec<_>) = urls_by_domain | ||
| 655 | .keys() | ||
| 656 | .cloned() | ||
| 657 | .partition(|domain| self.domain_throttle.has_capacity(domain)); | ||
| 658 | |||
| 659 | let mut remaining_oids: HashSet<String> = needed_oids.clone(); | ||
| 660 | let mut any_fetch_started = false; | ||
| 661 | |||
| 662 | // 8. Try available domains first | ||
| 663 | for domain in &available_domains { | ||
| 664 | if remaining_oids.is_empty() { | ||
| 665 | break; | ||
| 666 | } | ||
| 667 | |||
| 668 | let domain_urls = urls_by_domain.get(domain).unwrap(); | ||
| 669 | let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls); | ||
| 670 | |||
| 671 | for url in untried_urls { | ||
| 672 | if remaining_oids.is_empty() { | ||
| 673 | break; | ||
| 674 | } | ||
| 675 | |||
| 676 | // Skip if already in progress for this domain+identifier | ||
| 677 | if self.domain_throttle.is_in_progress(domain, identifier) { | ||
| 678 | break; // Wait for current fetch to complete | ||
| 679 | } | ||
| 680 | |||
| 681 | // Try to start fetch (checks capacity again, marks in_progress) | ||
| 682 | if !self.domain_throttle.start_fetch(domain, identifier, &url) { | ||
| 683 | break; // Domain at capacity | ||
| 684 | } | ||
| 685 | |||
| 686 | any_fetch_started = true; | ||
| 687 | |||
| 688 | // Fetch OIDs | ||
| 689 | let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect(); | ||
| 690 | let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await; | ||
| 691 | |||
| 692 | // Mark fetch complete | ||
| 693 | self.domain_throttle.complete_fetch(domain, identifier); | ||
| 694 | |||
| 695 | match fetch_result { | ||
| 696 | Ok(fetched_oids) => { | ||
| 697 | if !fetched_oids.is_empty() { | ||
| 698 | let fetched_count = fetched_oids.len(); | ||
| 699 | for oid in fetched_oids { | ||
| 700 | remaining_oids.remove(&oid); | ||
| 701 | } | ||
| 702 | tracing::info!( | ||
| 703 | identifier = %identifier, | ||
| 704 | url = %url, | ||
| 705 | fetched = fetched_count, | ||
| 706 | remaining = remaining_oids.len(), | ||
| 707 | "Fetched OIDs from server" | ||
| 708 | ); | ||
| 709 | } | ||
| 710 | } | ||
| 711 | Err(e) => { | ||
| 712 | tracing::debug!( | ||
| 713 | identifier = %identifier, | ||
| 714 | url = %url, | ||
| 715 | error = %e, | ||
| 716 | "Failed to fetch from server" | ||
| 717 | ); | ||
| 718 | } | ||
| 719 | } | ||
| 720 | } | ||
| 721 | |||
| 722 | // Clean up if all URLs tried for this domain | ||
| 723 | if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) { | ||
| 724 | self.domain_throttle.remove_identifier_from_domain(domain, identifier); | ||
| 725 | } | ||
| 726 | } | ||
| 727 | |||
| 728 | // 9. If still need OIDs, try throttled domains (they might have capacity now) | ||
| 729 | if !remaining_oids.is_empty() { | ||
| 730 | for domain in &throttled_domains { | ||
| 731 | if remaining_oids.is_empty() { | ||
| 732 | break; | ||
| 733 | } | ||
| 734 | |||
| 735 | // Re-check capacity (might have freed up) | ||
| 736 | if !self.domain_throttle.has_capacity(domain) { | ||
| 737 | continue; | ||
| 738 | } | ||
| 739 | |||
| 740 | let domain_urls = urls_by_domain.get(domain).unwrap(); | ||
| 741 | let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls); | ||
| 742 | |||
| 743 | for url in untried_urls { | ||
| 744 | if remaining_oids.is_empty() { | ||
| 745 | break; | ||
| 746 | } | ||
| 747 | |||
| 748 | if self.domain_throttle.is_in_progress(domain, identifier) { | ||
| 749 | break; | ||
| 750 | } | ||
| 751 | |||
| 752 | if !self.domain_throttle.start_fetch(domain, identifier, &url) { | ||
| 753 | break; | ||
| 754 | } | ||
| 755 | |||
| 756 | any_fetch_started = true; | ||
| 757 | |||
| 758 | let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect(); | ||
| 759 | let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await; | ||
| 760 | |||
| 761 | self.domain_throttle.complete_fetch(domain, identifier); | ||
| 762 | |||
| 763 | if let Ok(fetched_oids) = fetch_result { | ||
| 764 | for oid in fetched_oids { | ||
| 765 | remaining_oids.remove(&oid); | ||
| 766 | } | ||
| 767 | } | ||
| 768 | } | ||
| 769 | |||
| 770 | if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) { | ||
| 771 | self.domain_throttle.remove_identifier_from_domain(domain, identifier); | ||
| 772 | } | ||
| 773 | } | ||
| 774 | } | ||
| 775 | |||
| 776 | // 10. Try to process events that can now be satisfied | ||
| 777 | self.try_process_events(identifier, database, our_domain, local_relay).await; | ||
| 778 | |||
| 779 | // 11. Decide whether to continue looping | ||
| 780 | let still_have_events = self.has_pending_events(identifier); | ||
| 781 | if !still_have_events { | ||
| 782 | return; | ||
| 783 | } | ||
| 784 | |||
| 785 | let still_need_oids = !self.collect_needed_oids(identifier).is_empty(); | ||
| 786 | if !still_need_oids { | ||
| 787 | // Events remain but no OIDs needed - loop to try processing again | ||
| 788 | continue; | ||
| 789 | } | ||
| 790 | |||
| 791 | // Check if there are any untried URLs left across all domains | ||
| 792 | let have_untried_urls = urls_by_domain.iter().any(|(domain, urls)| { | ||
| 793 | !self.domain_throttle.get_untried_urls(domain, identifier, urls).is_empty() | ||
| 794 | }); | ||
| 795 | |||
| 796 | if !have_untried_urls { | ||
| 797 | // No more URLs to try - exit and let backoff handle retry | ||
| 798 | return; | ||
| 799 | } | ||
| 800 | |||
| 801 | // If no fetch was started this iteration (all throttled), yield briefly | ||
| 802 | if !any_fetch_started { | ||
| 803 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 804 | } | ||
| 805 | } | ||
| 806 | } | ||
| 807 | |||
| 808 | /// Collect all OIDs needed for purgatory events with this identifier | ||
| 809 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { | ||
| 810 | let mut oids = HashSet::new(); | ||
| 811 | |||
| 812 | // Collect from state events | ||
| 813 | if let Some(entries) = self.state_events.get(identifier) { | ||
| 814 | for entry in entries.iter() { | ||
| 815 | if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { | ||
| 816 | for branch in &state.branches { | ||
| 817 | if !branch.commit.starts_with("ref: ") { | ||
| 818 | oids.insert(branch.commit.clone()); | ||
| 819 | } | ||
| 820 | } | ||
| 821 | for tag in &state.tags { | ||
| 822 | if !tag.commit.starts_with("ref: ") { | ||
| 823 | oids.insert(tag.commit.clone()); | ||
| 824 | } | ||
| 825 | } | ||
| 826 | } | ||
| 827 | } | ||
| 828 | } | ||
| 829 | |||
| 830 | // Collect from PR events | ||
| 831 | for entry in self.pr_events.iter() { | ||
| 832 | if let Some(ref event) = entry.value().event { | ||
| 833 | if extract_identifier_from_pr(event).as_deref() == Some(identifier) { | ||
| 834 | if let Some(commit) = extract_commit_from_pr(event) { | ||
| 835 | oids.insert(commit); | ||
| 836 | } | ||
| 837 | } | ||
| 838 | } | ||
| 839 | } | ||
| 840 | |||
| 841 | oids | ||
| 842 | } | ||
| 843 | |||
| 844 | /// Try to process events that can now be satisfied | ||
| 845 | async fn try_process_events( | ||
| 846 | &self, | ||
| 847 | identifier: &str, | ||
| 848 | database: &SharedDatabase, | ||
| 849 | our_domain: Option<&str>, | ||
| 850 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 851 | ) { | ||
| 852 | if !self.has_pending_events(identifier) { | ||
| 853 | return; | ||
| 854 | } | ||
| 855 | |||
| 856 | let db_repo_data = match fetch_repository_data(database, identifier).await { | ||
| 857 | Ok(data) => data, | ||
| 858 | Err(e) => { | ||
| 859 | tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data for processing"); | ||
| 860 | return; | ||
| 861 | } | ||
| 862 | }; | ||
| 863 | |||
| 864 | // Process state events (oldest first) | ||
| 865 | if let Some(mut entries) = self.state_events.get_mut(identifier) { | ||
| 866 | entries.sort_by_key(|e| e.event.created_at); | ||
| 867 | |||
| 868 | let mut to_remove = Vec::new(); | ||
| 869 | |||
| 870 | for entry in entries.iter() { | ||
| 871 | if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { | ||
| 872 | if self.can_satisfy_state(&state, &db_repo_data) { | ||
| 873 | match self.process_state_event(&state, &db_repo_data, database, local_relay).await { | ||
| 874 | Ok(()) => { | ||
| 875 | to_remove.push(entry.event.id); | ||
| 876 | } | ||
| 877 | Err(e) => { | ||
| 878 | tracing::warn!( | ||
| 879 | event_id = %entry.event.id, | ||
| 880 | error = %e, | ||
| 881 | "Failed to process state event" | ||
| 882 | ); | ||
| 883 | } | ||
| 884 | } | ||
| 885 | } | ||
| 886 | } | ||
| 887 | } | ||
| 888 | |||
| 889 | entries.retain(|e| !to_remove.contains(&e.event.id)); | ||
| 890 | } | ||
| 891 | |||
| 892 | // Process PR events (oldest first) | ||
| 893 | let mut pr_to_remove = Vec::new(); | ||
| 894 | let mut pr_entries: Vec<_> = self.pr_events | ||
| 895 | .iter() | ||
| 896 | .filter_map(|entry| { | ||
| 897 | entry.value().event.as_ref().and_then(|event| { | ||
| 898 | if extract_identifier_from_pr(event).as_deref() == Some(identifier) { | ||
| 899 | Some((entry.key().clone(), event.clone(), entry.value().commit.clone())) | ||
| 900 | } else { | ||
| 901 | None | ||
| 902 | } | ||
| 903 | }) | ||
| 904 | }) | ||
| 905 | .collect(); | ||
| 906 | |||
| 907 | pr_entries.sort_by_key(|(_, event, _)| event.created_at); | ||
| 908 | |||
| 909 | for (event_id, event, commit) in pr_entries { | ||
| 910 | if self.can_satisfy_pr(&commit, &db_repo_data) { | ||
| 911 | match self.process_pr_event(&event, &commit, &db_repo_data, database, local_relay).await { | ||
| 912 | Ok(()) => { | ||
| 913 | pr_to_remove.push(event_id); | ||
| 914 | } | ||
| 915 | Err(e) => { | ||
| 916 | tracing::warn!( | ||
| 917 | event_id = %event.id, | ||
| 918 | error = %e, | ||
| 919 | "Failed to process PR event" | ||
| 920 | ); | ||
| 921 | } | ||
| 922 | } | ||
| 923 | } | ||
| 924 | } | ||
| 925 | |||
| 926 | for event_id in pr_to_remove { | ||
| 927 | self.pr_events.remove(&event_id); | ||
| 928 | } | ||
| 929 | } | ||
| 930 | |||
| 931 | /// Check if a state event can be satisfied (all OIDs available locally) | ||
| 932 | fn can_satisfy_state(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> bool { | ||
| 933 | for announcement in &db_repo_data.announcements { | ||
| 934 | let repo_path = self.git_data_path.join(announcement.repo_path()); | ||
| 935 | if !repo_path.exists() { | ||
| 936 | continue; | ||
| 937 | } | ||
| 938 | |||
| 939 | let all_present = state.branches.iter().all(|b| { | ||
| 940 | b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit) | ||
| 941 | }) && state.tags.iter().all(|t| { | ||
| 942 | oid_exists(&repo_path, &t.commit) | ||
| 943 | }); | ||
| 944 | |||
| 945 | if all_present { | ||
| 946 | return true; | ||
| 947 | } | ||
| 948 | } | ||
| 949 | false | ||
| 950 | } | ||
| 951 | |||
| 952 | /// Check if a PR event can be satisfied (commit available locally) | ||
| 953 | fn can_satisfy_pr(&self, commit: &str, db_repo_data: &RepositoryData) -> bool { | ||
| 954 | for announcement in &db_repo_data.announcements { | ||
| 955 | let repo_path = self.git_data_path.join(announcement.repo_path()); | ||
| 956 | if repo_path.exists() && oid_exists(&repo_path, commit) { | ||
| 957 | return true; | ||
| 958 | } | ||
| 959 | } | ||
| 960 | false | ||
| 961 | } | ||
| 962 | } | ||
| 963 | |||
| 964 | /// Extract commit hash from PR event's `c` tag | ||
| 965 | fn extract_commit_from_pr(event: &Event) -> Option<String> { | ||
| 966 | event.tags.iter().find_map(|tag| { | ||
| 967 | let tag_vec = tag.clone().to_vec(); | ||
| 968 | if tag_vec.len() >= 2 && tag_vec[0] == "c" { | ||
| 969 | Some(tag_vec[1].clone()) | ||
| 970 | } else { | ||
| 971 | None | ||
| 972 | } | ||
| 973 | }) | ||
| 974 | } | ||
| 975 | ``` | ||
| 976 | |||
| 977 | ### Fetching OIDs | ||
| 978 | |||
| 979 | ```rust | ||
| 980 | /// Fetch specific OIDs from a remote git server | ||
| 981 | /// | ||
| 982 | /// Returns the list of OIDs that were successfully fetched (now exist locally). | ||
| 983 | /// Git fetch may partially succeed, so we check which OIDs are available after. | ||
| 984 | async fn fetch_oids_from_server( | ||
| 985 | repo_path: &Path, | ||
| 986 | server_url: &str, | ||
| 987 | oids: &[String], | ||
| 988 | ) -> Result<Vec<String>> { | ||
| 989 | if oids.is_empty() { | ||
| 990 | return Ok(Vec::new()); | ||
| 991 | } | ||
| 992 | |||
| 993 | let repo_path = repo_path.to_path_buf(); | ||
| 994 | let server_url = server_url.to_string(); | ||
| 995 | let oids = oids.to_vec(); | ||
| 996 | |||
| 997 | tokio::task::spawn_blocking(move || { | ||
| 998 | // Build git fetch command with all OIDs | ||
| 999 | let mut args = vec!["fetch", "--depth=1", &server_url]; | ||
| 1000 | args.extend(oids.iter().map(|s| s.as_str())); | ||
| 1001 | |||
| 1002 | tracing::debug!( | ||
| 1003 | oids_count = oids.len(), | ||
| 1004 | server = %server_url, | ||
| 1005 | "Fetching OIDs" | ||
| 1006 | ); | ||
| 1007 | |||
| 1008 | let output = Command::new("git") | ||
| 1009 | .args(&args) | ||
| 1010 | .current_dir(&repo_path) | ||
| 1011 | .output(); | ||
| 1012 | |||
| 1013 | match output { | ||
| 1014 | Ok(result) => { | ||
| 1015 | // Check which OIDs we now have (regardless of command success) | ||
| 1016 | // git fetch may partially succeed | ||
| 1017 | let fetched: Vec<String> = oids | ||
| 1018 | .iter() | ||
| 1019 | .filter(|oid| oid_exists(&repo_path, oid)) | ||
| 1020 | .cloned() | ||
| 1021 | .collect(); | ||
| 1022 | |||
| 1023 | if !result.status.success() { | ||
| 1024 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 1025 | tracing::debug!( | ||
| 1026 | server = %server_url, | ||
| 1027 | stderr = %stderr, | ||
| 1028 | fetched_count = fetched.len(), | ||
| 1029 | "git fetch returned non-zero but may have fetched some OIDs" | ||
| 1030 | ); | ||
| 1031 | } | ||
| 1032 | |||
| 1033 | Ok(fetched) | ||
| 1034 | } | ||
| 1035 | Err(e) => { | ||
| 1036 | bail!("git fetch command error: {}", e) | ||
| 1037 | } | ||
| 1038 | } | ||
| 1039 | }) | ||
| 1040 | .await? | ||
| 1041 | } | ||
| 1042 | |||
| 1043 | /// Extract domain from a URL for throttling | ||
| 1044 | fn extract_domain(url: &str) -> String { | ||
| 1045 | url::Url::parse(url) | ||
| 1046 | .ok() | ||
| 1047 | .and_then(|u| u.host_str().map(|s| s.to_string())) | ||
| 1048 | .unwrap_or_else(|| url.to_string()) | ||
| 1049 | } | ||
| 1050 | ``` | ||
| 1051 | |||
| 1052 | ### Helper Methods | ||
| 1053 | |||
| 1054 | ```rust | ||
| 1055 | impl Purgatory { | ||
| 1056 | /// Find the best local repository to fetch OIDs into | ||
| 1057 | /// | ||
| 1058 | /// Prefers the repo with the most recent commit on its default branch, | ||
| 1059 | /// as it's most likely to have related git history. | ||
| 1060 | fn find_target_repo(&self, db_repo_data: &RepositoryData, identifier: &str) -> Option<PathBuf> { | ||
| 1061 | let mut best: Option<(Timestamp, PathBuf)> = None; | ||
| 1062 | |||
| 1063 | for announcement in &db_repo_data.announcements { | ||
| 1064 | let repo_path = self.git_data_path.join(announcement.repo_path()); | ||
| 1065 | if !repo_path.exists() { | ||
| 1066 | continue; | ||
| 1067 | } | ||
| 1068 | |||
| 1069 | let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path) | ||
| 1070 | .unwrap_or(Timestamp::zero()); | ||
| 1071 | |||
| 1072 | if best.as_ref().map_or(true, |(d, _)| commit_date > *d) { | ||
| 1073 | best = Some((commit_date, repo_path)); | ||
| 1074 | } | ||
| 1075 | } | ||
| 1076 | |||
| 1077 | best.map(|(_, path)| path) | ||
| 1078 | } | ||
| 1079 | |||
| 1080 | /// Process a state event that can now be satisfied | ||
| 1081 | /// | ||
| 1082 | /// Syncs OIDs to all owner repos, aligns refs, saves to DB, notifies subscribers. | ||
| 1083 | async fn process_state_event( | ||
| 1084 | &self, | ||
| 1085 | state: &RepositoryState, | ||
| 1086 | db_repo_data: &RepositoryData, | ||
| 1087 | database: &SharedDatabase, | ||
| 1088 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 1089 | ) -> Result<()> { | ||
| 1090 | // Find source repo (one that has all OIDs) | ||
| 1091 | let source_repo = self.find_repo_with_all_oids(state, db_repo_data) | ||
| 1092 | .ok_or_else(|| anyhow::anyhow!("No repo has all required OIDs"))?; | ||
| 1093 | |||
| 1094 | // Sync to other owner repos and align refs | ||
| 1095 | let sync_result = sync_to_owner_repos(&source_repo, state, db_repo_data, &self.git_data_path); | ||
| 1096 | |||
| 1097 | tracing::info!( | ||
| 1098 | identifier = %state.identifier, | ||
| 1099 | event_id = %state.event.id, | ||
| 1100 | repos_synced = sync_result.repos_synced, | ||
| 1101 | "Synced state from purgatory" | ||
| 1102 | ); | ||
| 1103 | |||
| 1104 | // Save to database | ||
| 1105 | database.save_event(&state.event).await?; | ||
| 1106 | |||
| 1107 | // Notify subscribers | ||
| 1108 | if let Some(relay) = local_relay { | ||
| 1109 | relay.notify_event(state.event.clone()); | ||
| 1110 | } | ||
| 1111 | |||
| 1112 | Ok(()) | ||
| 1113 | } | ||
| 1114 | |||
| 1115 | /// Process a PR event that can now be satisfied | ||
| 1116 | async fn process_pr_event( | ||
| 1117 | &self, | ||
| 1118 | event: &Event, | ||
| 1119 | commit: &str, | ||
| 1120 | db_repo_data: &RepositoryData, | ||
| 1121 | database: &SharedDatabase, | ||
| 1122 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 1123 | ) -> Result<()> { | ||
| 1124 | // Save to database | ||
| 1125 | database.save_event(event).await?; | ||
| 1126 | |||
| 1127 | // Notify subscribers | ||
| 1128 | if let Some(relay) = local_relay { | ||
| 1129 | relay.notify_event(event.clone()); | ||
| 1130 | } | ||
| 1131 | |||
| 1132 | tracing::info!( | ||
| 1133 | event_id = %event.id, | ||
| 1134 | commit = %commit, | ||
| 1135 | "Processed PR event from purgatory" | ||
| 1136 | ); | ||
| 1137 | |||
| 1138 | Ok(()) | ||
| 1139 | } | ||
| 1140 | |||
| 1141 | /// Find a repository that has all OIDs required by a state event | ||
| 1142 | fn find_repo_with_all_oids(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> Option<PathBuf> { | ||
| 1143 | for announcement in &db_repo_data.announcements { | ||
| 1144 | let repo_path = self.git_data_path.join(announcement.repo_path()); | ||
| 1145 | if !repo_path.exists() { | ||
| 1146 | continue; | ||
| 1147 | } | ||
| 1148 | |||
| 1149 | let all_present = state.branches.iter().all(|b| { | ||
| 1150 | b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit) | ||
| 1151 | }) && state.tags.iter().all(|t| { | ||
| 1152 | oid_exists(&repo_path, &t.commit) | ||
| 1153 | }); | ||
| 1154 | |||
| 1155 | if all_present { | ||
| 1156 | return Some(repo_path); | ||
| 1157 | } | ||
| 1158 | } | ||
| 1159 | None | ||
| 1160 | } | ||
| 1161 | } | ||
| 1162 | ``` | ||
| 1163 | ``` | ||
| 1164 | |||
| 1165 | ## Integration Points | ||
| 1166 | |||
| 1167 | ### 1. Negentropy Sync (src/purgatory/mod.rs:105-107) | ||
| 1168 | |||
| 1169 | When events are added via negentropy sync, call `trigger_immediate_sync`: | ||
| 1170 | |||
| 1171 | ```rust | ||
| 1172 | // In negentropy sync handler | ||
| 1173 | purgatory.add_state(event, identifier.clone(), author); | ||
| 1174 | purgatory.trigger_immediate_sync(&identifier); // NEW: triggers 500ms debounced sync | ||
| 1175 | ``` | ||
| 1176 | |||
| 1177 | ### 2. Relay Startup | ||
| 1178 | |||
| 1179 | Start the sync loop when the relay starts: | ||
| 1180 | |||
| 1181 | ```rust | ||
| 1182 | // In main.rs or server setup | ||
| 1183 | let domain_throttle = Arc::new(DomainThrottle::new(5, 30)); // 5 concurrent, 30/min | ||
| 1184 | let purgatory = Arc::new(Purgatory::new(git_data_path, domain_throttle)); | ||
| 1185 | |||
| 1186 | let sync_handle = purgatory.clone().start_sync_loop( | ||
| 1187 | database.clone(), | ||
| 1188 | Some(domain.clone()), | ||
| 1189 | Some(local_relay.clone()), | ||
| 1190 | ); | ||
| 1191 | ``` | ||
| 1192 | |||
| 1193 | ### 3. Shutdown | ||
| 1194 | |||
| 1195 | The sync loop will naturally stop when the purgatory is dropped. No special shutdown handling needed since all state is in-memory. | ||
| 1196 | |||
| 1197 | ## Testing Strategy | ||
| 1198 | |||
| 1199 | ### Unit Tests | ||
| 1200 | |||
| 1201 | 1. **SyncQueueEntry** | ||
| 1202 | - Verify backoff calculation: 20s → 40s → 80s → 120s → 120s | ||
| 1203 | - Verify `on_new_event()` resets attempt_count and updates next_attempt if sooner | ||
| 1204 | - Verify `on_sync_complete()` applies backoff only if next_attempt is in the past | ||
| 1205 | - Verify `is_ready()` respects both `next_attempt` and `in_progress` | ||
| 1206 | |||
| 1207 | 2. **DomainThrottle** | ||
| 1208 | - Verify concurrent limit: 6th request to same domain blocked | ||
| 1209 | - Verify rate limit: 31st request in a minute blocked | ||
| 1210 | - Verify `has_capacity()` checks both limits | ||
| 1211 | - Verify `get_untried_urls()` returns only URLs not in urls_tried | ||
| 1212 | - Verify `start_fetch()` fails if already in_progress for domain+identifier | ||
| 1213 | - Verify `start_fetch()` adds URL to urls_tried | ||
| 1214 | - Verify `complete_fetch()` decrements in_flight and clears in_progress | ||
| 1215 | - Verify `all_urls_tried()` correctly identifies when done | ||
| 1216 | - Verify `remove_identifier_from_domain()` cleans up state | ||
| 1217 | - Verify `remove_identifier()` removes from all domains | ||
| 1218 | |||
| 1219 | 3. **OID collection** | ||
| 1220 | - Verify OIDs extracted from state events correctly | ||
| 1221 | - Verify OIDs extracted from PR events correctly | ||
| 1222 | - Verify deduplication works across state and PR events | ||
| 1223 | |||
| 1224 | 4. **Identifier extraction** | ||
| 1225 | - Verify `extract_identifier_from_pr()` handles various `a` tag formats | ||
| 1226 | - Verify `extract_commit_from_pr()` extracts `c` tag correctly | ||
| 1227 | |||
| 1228 | ### Integration Tests | ||
| 1229 | |||
| 1230 | 1. **Sync against own implementation** | ||
| 1231 | - Start two ngit-grasp instances | ||
| 1232 | - Push to one, verify other can sync via purgatory | ||
| 1233 | - Verify partial OID availability handled correctly (some OIDs fetched, others missing) | ||
| 1234 | |||
| 1235 | 2. **Burst handling** | ||
| 1236 | - Submit 10 events for same identifier within 100ms | ||
| 1237 | - Verify debounce: sync doesn't start until 500ms after last event | ||
| 1238 | - Verify only one sync operation runs (not 10) | ||
| 1239 | |||
| 1240 | 3. **Backoff behavior** | ||
| 1241 | - Configure with unreachable clone URLs | ||
| 1242 | - Verify backoff timing: 20s, 40s, 80s, 120s, then stays at 120s | ||
| 1243 | - Verify new event arriving resets attempt_count to 0 | ||
| 1244 | - Verify new event during sync prevents backoff (next_attempt already in future) | ||
| 1245 | |||
| 1246 | 4. **Rate limiting** | ||
| 1247 | - Configure with single domain having multiple URLs | ||
| 1248 | - Trigger many sync operations | ||
| 1249 | - Verify only 30 requests made in first minute | ||
| 1250 | - Verify only 5 concurrent requests per domain | ||
| 1251 | |||
| 1252 | 5. **Concurrent limit per domain+identifier** | ||
| 1253 | - Start fetch for domain+identifier | ||
| 1254 | - Verify second fetch attempt for same domain+identifier blocked | ||
| 1255 | - Verify fetch for different identifier on same domain allowed (up to 5) | ||
| 1256 | |||
| 1257 | 6. **Parallel identifier processing** | ||
| 1258 | - Add events for 5 different identifiers | ||
| 1259 | - Verify all 5 sync tasks start in parallel (not serial) | ||
| 1260 | - Verify `in_progress` flag prevents duplicate tasks for same identifier | ||
| 1261 | |||
| 1262 | 7. **Dynamic URL/OID recalculation** | ||
| 1263 | - Start sync for identifier | ||
| 1264 | - While sync is running, add new announcement with additional clone URL | ||
| 1265 | - Verify sync picks up new URL in next while loop iteration | ||
| 1266 | - Similarly for new events adding new OIDs | ||
| 1267 | |||
| 1268 | 8. **urls_tried cleanup** | ||
| 1269 | - Sync identifier, exhaust all URLs for a domain | ||
| 1270 | - Verify identifier removed from that domain's state | ||
| 1271 | - Add new announcement with new URL for same domain | ||
| 1272 | - Verify new URL is tried on next sync attempt | ||
| 1273 | |||
| 1274 | 9. **Mixed state and PR events** | ||
| 1275 | - Add state event and PR event for same identifier | ||
| 1276 | - Verify both OID sets collected | ||
| 1277 | - Verify both events processed when OIDs arrive | ||
| 1278 | |||
| 1279 | 10. **Available domains first** | ||
| 1280 | - Have 10 URLs: 8 from available domains, 2 from throttled domain | ||
| 1281 | - Verify available domains tried first | ||
| 1282 | - Verify throttled domain only contacted if available didn't satisfy all OIDs | ||
| 1283 | |||
| 1284 | ## Migration Path | ||
| 1285 | |||
| 1286 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle) | ||
| 1287 | 2. **Phase 2**: Implement sync loop alongside existing `start_state_sync` | ||
| 1288 | 3. **Phase 3**: Migrate state events to use new sync loop | ||
| 1289 | 4. **Phase 4**: Add PR event syncing | ||
| 1290 | 5. **Phase 5**: Remove old `start_state_sync` code | ||
| 1291 | |||
| 1292 | ## Configuration | ||
| 1293 | |||
| 1294 | New configuration options: | ||
| 1295 | |||
| 1296 | | Option | CLI Flag | Environment Variable | Default | | ||
| 1297 | |--------|----------|---------------------|---------| | ||
| 1298 | | Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` | | ||
| 1299 | | Domain concurrent limit | `--sync-domain-concurrent` | `NGIT_SYNC_DOMAIN_CONCURRENT` | `5` | | ||
| 1300 | | Domain rate limit | `--sync-domain-rate-limit` | `NGIT_SYNC_DOMAIN_RATE_LIMIT` | `30` | | ||
| 1301 | | Default sync delay | `--sync-default-delay-secs` | `NGIT_SYNC_DEFAULT_DELAY_SECS` | `180` | | ||
| 1302 | | Immediate sync delay | `--sync-immediate-delay-ms` | `NGIT_SYNC_IMMEDIATE_DELAY_MS` | `500` | | ||
| 1303 | |||
| 1304 | ## Observability | ||
| 1305 | |||
| 1306 | ### Metrics | ||
| 1307 | |||
| 1308 | - `purgatory_sync_queue_size` - Number of identifiers pending sync | ||
| 1309 | - `purgatory_sync_attempts_total` - Counter of sync attempts per identifier | ||
| 1310 | - `purgatory_sync_oids_fetched_total` - Counter of OIDs successfully fetched | ||
| 1311 | - `purgatory_domain_in_flight` - Gauge of in-flight requests per domain | ||
| 1312 | - `purgatory_domain_requests_total` - Counter of requests per domain | ||
| 1313 | - `purgatory_sync_backoff_seconds` - Histogram of backoff durations applied | ||
| 1314 | |||
| 1315 | ### Logging | ||
| 1316 | |||
| 1317 | Key log points: | ||
| 1318 | - `INFO`: Successful sync completion, OIDs fetched | ||
| 1319 | - `DEBUG`: Domain capacity checks, backoff applied, urls_tried state | ||
| 1320 | - `WARN`: Fetch failures, processing errors | ||
| 1321 | |||
| 1322 | ## Open Questions | ||
| 1323 | |||
| 1324 | 1. **PR placeholder handling**: Current code has `add_pr_placeholder()` for git-data-first scenario. How should this interact with the new sync system? (Probably: placeholders don't need syncing since git data already exists) | ||
| 1325 | |||
| 1326 | 2. **Memory bounds**: Should we limit sync queue size? What happens if thousands of identifiers are pending? | ||
| 1327 | |||
| 1328 | 3. **Persistence**: Currently all purgatory state is in-memory. Should sync queue state survive restarts? (Probably no - events will be re-synced via negentropy on restart) | ||