| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-06 14:31:53 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-06 14:38:58 +0000 |
| commit | 534b9e23524a1f651590eccbfef3398fa7fbc495 (patch) | |
| tree | 3bbf1fdc0c0b9b08795cd205166a2ee680d8bfc7 | |
| parent | 71914d92a1bc296b3a585b60989cf0f1bf70dd4b (diff) | |
docs: purgatory git sync design simplify
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 1487 |
1 files changed, 529 insertions, 958 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index ec02e75..76d01b8 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -25,6 +25,7 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, | |||
| 25 | 3. Domain-based throttling (30 requests/minute per domain) | 25 | 3. Domain-based throttling (30 requests/minute per domain) |
| 26 | 4. Exponential backoff per identifier (20s → 2m, then 2m intervals) | 26 | 4. Exponential backoff per identifier (20s → 2m, then 2m intervals) |
| 27 | 5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default) | 27 | 5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default) |
| 28 | 6. **Clean separation of concerns**: Domain throttle handles rate limiting only; sync logic tracks its own tried URLs | ||
| 28 | 29 | ||
| 29 | ## Architecture | 30 | ## Architecture |
| 30 | 31 | ||
| @@ -69,36 +70,49 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, | |||
| 69 | │ │ ┌─────────────────────────────────────┐ │ │ | 70 | │ │ ┌─────────────────────────────────────┐ │ │ |
| 70 | │ │ │ sync_identifier() │ │ │ | 71 | │ │ │ sync_identifier() │ │ │ |
| 71 | │ │ │ │ │ │ | 72 | │ │ │ │ │ │ |
| 72 | │ │ │ while (events && oids && urls): │ │ │ | 73 | │ │ │ Owns its own tried_urls: HashSet │ │ │ |
| 73 | │ │ │ 1. Recalc URLs/OIDs (fresh) │ │ │ | 74 | │ │ │ │ │ │ |
| 74 | │ │ │ 2. Get untried URLs per domain │ │ │ | 75 | │ │ │ loop: │ │ │ |
| 75 | │ │ │ 3. Skip throttled domains │ │ │ | 76 | │ │ │ sync_identifier_step() │ │ │ |
| 76 | │ │ │ 4. Try available URLs │ │ │ | 77 | │ │ │ → (url_tried, complete) │ │ │ |
| 77 | │ │ │ (respecting domain limits) │ │ │ | 78 | │ │ │ if complete: break │ │ │ |
| 78 | │ │ │ 5. Process satisfiable events │ │ │ | 79 | │ │ │ tried_urls.insert(url_tried) │ │ │ |
| 79 | │ │ │ 6. Loop catches new events/URLs │ │ │ | ||
| 80 | │ │ └─────────────────────────────────────┘ │ │ | 80 | │ │ └─────────────────────────────────────┘ │ │ |
| 81 | │ │ │ │ │ | 81 | │ │ │ │ │ |
| 82 | │ │ ▼ │ │ | 82 | │ │ ▼ │ │ |
| 83 | │ │ ┌─────────────────────────────────────┐ │ │ | 83 | │ │ ┌─────────────────────────────────────┐ │ │ |
| 84 | │ │ │ Domain Throttle │ │ │ | 84 | │ │ │ Domain Throttle │ │ │ |
| 85 | │ │ │ (rate limiting only) │ │ │ | ||
| 85 | │ │ │ │ │ │ | 86 | │ │ │ │ │ │ |
| 86 | │ │ │ Per-domain state: │ │ │ | 87 | │ │ │ Per-domain state: │ │ │ |
| 87 | │ │ │ - 5 concurrent requests max │ │ │ | 88 | │ │ │ - in_flight: u32 │ │ │ |
| 88 | │ │ │ - 30 requests/min sliding window │ │ │ | 89 | │ │ │ - request_times: VecDeque │ │ │ |
| 89 | │ │ │ │ │ │ | ||
| 90 | │ │ │ Per (domain, identifier) state: │ │ │ | ||
| 91 | │ │ │ - in_progress: bool │ │ │ | ||
| 92 | │ │ │ - urls_tried: HashSet<String> │ │ │ | ||
| 93 | │ │ │ │ │ │ | 90 | │ │ │ │ │ │ |
| 94 | │ │ │ Identifier removed when all URLs │ │ │ | 91 | │ │ │ Round-robin via: │ │ │ |
| 95 | │ │ │ tried. Re-added on next sync. │ │ │ | 92 | │ │ │ - last_used_index per domain │ │ │ |
| 93 | │ │ │ - Caller passes tried_urls │ │ │ | ||
| 96 | │ │ └─────────────────────────────────────┘ │ │ | 94 | │ │ └─────────────────────────────────────┘ │ │ |
| 97 | │ │ │ │ | 95 | │ │ │ │ |
| 98 | │ └───────────────────────────────────────────────────────────────────────┘ │ | 96 | │ └───────────────────────────────────────────────────────────────────────┘ │ |
| 99 | └──────────────────────────────────────────────────────────────────────────────┘ | 97 | └──────────────────────────────────────────────────────────────────────────────┘ |
| 100 | ``` | 98 | ``` |
| 101 | 99 | ||
| 100 | ### Key Design Principle: Separation of Concerns | ||
| 101 | |||
| 102 | The previous design conflated two concerns in `DomainThrottle`: | ||
| 103 | 1. **Rate limiting** (per-domain): How many requests can we make to a domain? | ||
| 104 | 2. **URL tracking** (per-identifier): Which URLs have we tried for this sync? | ||
| 105 | |||
| 106 | The new design cleanly separates these: | ||
| 107 | |||
| 108 | - **`DomainThrottle`**: Only handles rate limiting. Tracks in-flight requests and request timestamps per domain. Uses round-robin internally to distribute load across URLs. | ||
| 109 | - **`sync_identifier`**: Owns its `tried_urls: HashSet<String>`. Passes this to the throttle when requesting a URL to try. | ||
| 110 | |||
| 111 | This separation enables: | ||
| 112 | - **Unit testing** of sync logic with a mock throttle | ||
| 113 | - **Simpler state management** - throttle doesn't need cleanup when identifiers complete | ||
| 114 | - **Clearer reasoning** about each component's responsibility | ||
| 115 | |||
| 102 | ### Flow Summary | 116 | ### Flow Summary |
| 103 | 117 | ||
| 104 | 1. **Event arrives** → added to state_events/pr_events + sync_queue with delay | 118 | 1. **Event arrives** → added to state_events/pr_events + sync_queue with delay |
| @@ -109,29 +123,21 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, | |||
| 109 | 2. **Main sync loop** (every 1s): | 123 | 2. **Main sync loop** (every 1s): |
| 110 | - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) | 124 | - Finds ALL ready identifiers (where `!in_progress && next_attempt <= now`) |
| 111 | - Spawns parallel tasks for each (marks `in_progress = true`) | 125 | - Spawns parallel tasks for each (marks `in_progress = true`) |
| 112 | - Each `sync_identifier()` task runs a while loop: | 126 | - Each `sync_identifier()` task: |
| 113 | - Recalculates URLs and OIDs fresh each iteration (catches new events/announcements) | 127 | - Creates fresh `tried_urls: HashSet<String>` |
| 114 | - For each domain, gets untried URLs (via `domain_throttle`) | 128 | - Loops calling `sync_identifier_step()` until complete |
| 115 | - Skips domains that are fully throttled | 129 | - Step returns a `SyncStepResult` (URL tried, throttled, exhausted, or complete) - clean testable interface |
| 116 | - Tries non-throttled domains first, then throttled if still need OIDs | 130 | - When task completes: apply backoff or remove from queue |
| 117 | - Respects per-domain concurrency (5 max) and rate limits (30/min) | ||
| 118 | - Processes satisfiable events | ||
| 119 | - Continues while: events remain AND OIDs missing AND untried URLs exist | ||
| 120 | - When task completes (regardless of outcome): | ||
| 121 | - If `next_attempt` is in the future (new event arrived): just clear `in_progress` | ||
| 122 | - Otherwise: increment `attempt_count`, set `next_attempt = now + backoff`, clear `in_progress` | ||
| 123 | - If no pending events remain: remove identifier from sync_queue | ||
| 124 | 131 | ||
| 125 | 3. **Domain throttle state management**: | 132 | 3. **Domain throttle**: |
| 126 | - Tracks `(in_progress, urls_tried)` per `(domain, identifier)` pair | 133 | - Pure rate limiting: tracks in_flight count and request timestamps per domain |
| 127 | - `in_progress` prevents parallel fetches to same domain for same identifier | 134 | - `get_next_url()` takes `available_urls` and `tried_urls`, returns next URL to try |
| 128 | - `urls_tried` tracks which URLs have been attempted | 135 | - Uses round-robin internally to distribute load |
| 129 | - When all URLs for a domain+identifier are tried, that entry is removed | 136 | - No per-identifier state needed |
| 130 | - Entry may be re-added on next sync attempt if new URLs available | ||
| 131 | 137 | ||
| 132 | ### New Data Structures | 138 | ## Data Structures |
| 133 | 139 | ||
| 134 | #### SyncQueueEntry | 140 | ### SyncQueueEntry |
| 135 | 141 | ||
| 136 | Tracks sync state for each identifier in the main sync queue: | 142 | Tracks sync state for each identifier in the main sync queue: |
| 137 | 143 | ||
| @@ -140,7 +146,6 @@ Tracks sync state for each identifier in the main sync queue: | |||
| 140 | #[derive(Debug, Clone)] | 146 | #[derive(Debug, Clone)] |
| 141 | pub struct SyncQueueEntry { | 147 | pub struct SyncQueueEntry { |
| 142 | /// Don't attempt sync before this time | 148 | /// Don't attempt sync before this time |
| 143 | /// Set for: initial delay (3min user / 500ms sync), backoff after attempts | ||
| 144 | pub next_attempt: Instant, | 149 | pub next_attempt: Instant, |
| 145 | 150 | ||
| 146 | /// Number of sync attempts (for backoff calculation) | 151 | /// Number of sync attempts (for backoff calculation) |
| @@ -148,12 +153,10 @@ pub struct SyncQueueEntry { | |||
| 148 | pub attempt_count: u32, | 153 | pub attempt_count: u32, |
| 149 | 154 | ||
| 150 | /// Whether a sync is currently in progress for this identifier | 155 | /// Whether a sync is currently in progress for this identifier |
| 151 | /// Prevents concurrent sync runs for the same identifier | ||
| 152 | pub in_progress: bool, | 156 | pub in_progress: bool, |
| 153 | } | 157 | } |
| 154 | 158 | ||
| 155 | impl SyncQueueEntry { | 159 | impl SyncQueueEntry { |
| 156 | /// Create new entry with specified delay | ||
| 157 | pub fn new(delay: Duration) -> Self { | 160 | pub fn new(delay: Duration) -> Self { |
| 158 | Self { | 161 | Self { |
| 159 | next_attempt: Instant::now() + delay, | 162 | next_attempt: Instant::now() + delay, |
| @@ -162,97 +165,78 @@ impl SyncQueueEntry { | |||
| 162 | } | 165 | } |
| 163 | } | 166 | } |
| 164 | 167 | ||
| 165 | /// Calculate backoff duration: 20s, 40s, 80s, 120s (max 2min) | 168 | /// Calculate backoff: 20s, 40s, 80s, 120s (capped at 2min) |
| 166 | pub fn backoff(&self) -> Duration { | 169 | pub fn backoff(&self) -> Duration { |
| 167 | let base = Duration::from_secs(20); | 170 | let base = Duration::from_secs(20); |
| 168 | let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3)); | 171 | let multiplier = 2u32.saturating_pow(self.attempt_count.saturating_sub(1).min(3)); |
| 169 | let backoff = base * multiplier; | 172 | (base * multiplier).min(Duration::from_secs(120)) |
| 170 | backoff.min(Duration::from_secs(120)) // Cap at 2 minutes | ||
| 171 | } | 173 | } |
| 172 | 174 | ||
| 173 | /// Check if this entry is ready to sync (not in progress, delay passed) | ||
| 174 | pub fn is_ready(&self) -> bool { | 175 | pub fn is_ready(&self) -> bool { |
| 175 | !self.in_progress && Instant::now() >= self.next_attempt | 176 | !self.in_progress && Instant::now() >= self.next_attempt |
| 176 | } | 177 | } |
| 177 | 178 | ||
| 178 | /// Called when new event arrives - resets attempt_count, may update next_attempt | 179 | /// Called when new event arrives - resets attempt_count |
| 179 | pub fn on_new_event(&mut self, delay: Duration) { | 180 | pub fn on_new_event(&mut self, delay: Duration) { |
| 180 | self.attempt_count = 0; | 181 | self.attempt_count = 0; |
| 181 | let new_attempt = Instant::now() + delay; | 182 | let new_attempt = Instant::now() + delay; |
| 182 | // Only bring forward if new time is sooner | ||
| 183 | if new_attempt < self.next_attempt { | 183 | if new_attempt < self.next_attempt { |
| 184 | self.next_attempt = new_attempt; | 184 | self.next_attempt = new_attempt; |
| 185 | } | 185 | } |
| 186 | } | 186 | } |
| 187 | 187 | ||
| 188 | /// Called when sync attempt completes | 188 | /// Called when sync attempt completes |
| 189 | /// If next_attempt is in the future (new event arrived during sync), just clear in_progress | ||
| 190 | /// Otherwise, apply backoff | ||
| 191 | pub fn on_sync_complete(&mut self) { | 189 | pub fn on_sync_complete(&mut self) { |
| 192 | self.in_progress = false; | 190 | self.in_progress = false; |
| 193 | let now = Instant::now(); | 191 | if self.next_attempt <= Instant::now() { |
| 194 | if self.next_attempt <= now { | ||
| 195 | // No new event arrived during sync - apply backoff | ||
| 196 | self.attempt_count += 1; | 192 | self.attempt_count += 1; |
| 197 | self.next_attempt = now + self.backoff(); | 193 | self.next_attempt = Instant::now() + self.backoff(); |
| 198 | } | 194 | } |
| 199 | // else: new event arrived during sync, next_attempt already set, don't apply backoff | ||
| 200 | } | 195 | } |
| 201 | } | 196 | } |
| 202 | ``` | 197 | ``` |
| 203 | 198 | ||
| 204 | #### DomainThrottle | 199 | ### DomainThrottle (Rate Limiting Only) |
| 205 | |||
| 206 | Tracks per-domain rate limiting and per-(domain, identifier) fetch state: | ||
| 207 | 200 | ||
| 208 | ```rust | 201 | ```rust |
| 209 | /// Tracks domain-level rate limiting and per-identifier fetch state | 202 | /// Domain-level rate limiting with round-robin URL selection. |
| 210 | /// | 203 | /// |
| 211 | /// Rate limits: 5 concurrent requests, 30 requests/minute per domain | 204 | /// This struct ONLY handles rate limiting. It does not track which URLs |
| 205 | /// have been tried - that's the caller's responsibility. | ||
| 212 | /// | 206 | /// |
| 213 | /// Per (domain, identifier): tracks which URLs have been tried and whether | 207 | /// Rate limits: 5 concurrent requests, 30 requests/minute per domain |
| 214 | /// a fetch is currently in progress. When all URLs are tried, the identifier | ||
| 215 | /// is removed from that domain's tracking. It may be re-added on the next | ||
| 216 | /// sync attempt if new URLs become available. | ||
| 217 | pub struct DomainThrottle { | 208 | pub struct DomainThrottle { |
| 218 | /// Per-domain, per-identifier state: (in_progress, urls_tried) | 209 | /// In-flight request count per domain |
| 219 | /// domain -> identifier -> (in_progress, urls_tried) | ||
| 220 | state: DashMap<String, DashMap<String, (bool, HashSet<String>)>>, | ||
| 221 | |||
| 222 | /// Count of currently in-flight requests per domain | ||
| 223 | in_flight: DashMap<String, u32>, | 210 | in_flight: DashMap<String, u32>, |
| 224 | 211 | ||
| 225 | /// Request timestamps per domain (sliding window for rate limiting) | 212 | /// Request timestamps per domain (sliding window) |
| 226 | request_times: DashMap<String, VecDeque<Instant>>, | 213 | request_times: DashMap<String, VecDeque<Instant>>, |
| 227 | 214 | ||
| 228 | /// Maximum concurrent requests per domain | 215 | /// Round-robin index per domain (for fair URL distribution) |
| 229 | max_concurrent: u32, | 216 | round_robin_index: DashMap<String, usize>, |
| 230 | 217 | ||
| 231 | /// Maximum requests per minute per domain | 218 | max_concurrent: u32, |
| 232 | max_per_minute: u32, | 219 | max_per_minute: u32, |
| 233 | } | 220 | } |
| 234 | 221 | ||
| 235 | impl DomainThrottle { | 222 | impl DomainThrottle { |
| 236 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { | 223 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self { |
| 237 | Self { | 224 | Self { |
| 238 | state: DashMap::new(), | ||
| 239 | in_flight: DashMap::new(), | 225 | in_flight: DashMap::new(), |
| 240 | request_times: DashMap::new(), | 226 | request_times: DashMap::new(), |
| 227 | round_robin_index: DashMap::new(), | ||
| 241 | max_concurrent, | 228 | max_concurrent, |
| 242 | max_per_minute, | 229 | max_per_minute, |
| 243 | } | 230 | } |
| 244 | } | 231 | } |
| 245 | 232 | ||
| 246 | /// Check if domain has capacity for another request | 233 | /// Check if domain has capacity for another request |
| 247 | /// (both concurrent limit and rate limit) | ||
| 248 | pub fn has_capacity(&self, domain: &str) -> bool { | 234 | pub fn has_capacity(&self, domain: &str) -> bool { |
| 249 | // Check concurrent limit | 235 | let in_flight = self.in_flight.get(domain).map_or(0, |v| *v); |
| 250 | let current_in_flight = self.in_flight.get(domain).map_or(0, |v| *v); | 236 | if in_flight >= self.max_concurrent { |
| 251 | if current_in_flight >= self.max_concurrent { | ||
| 252 | return false; | 237 | return false; |
| 253 | } | 238 | } |
| 254 | 239 | ||
| 255 | // Check rate limit (sliding window) | ||
| 256 | let now = Instant::now(); | 240 | let now = Instant::now(); |
| 257 | let window = Duration::from_secs(60); | 241 | let window = Duration::from_secs(60); |
| 258 | self.request_times | 242 | self.request_times |
| @@ -263,90 +247,56 @@ impl DomainThrottle { | |||
| 263 | }) | 247 | }) |
| 264 | } | 248 | } |
| 265 | 249 | ||
| 266 | /// Check if identifier is currently fetching from this domain | 250 | /// Get next URL to try from available URLs, excluding already-tried URLs. |
| 267 | pub fn is_in_progress(&self, domain: &str, identifier: &str) -> bool { | 251 | /// Uses round-robin to distribute load across URLs for a domain. |
| 268 | self.state | 252 | /// |
| 269 | .get(domain) | 253 | /// Returns None if: |
| 270 | .and_then(|domain_state| domain_state.get(identifier)) | 254 | /// - Domain is at capacity (rate limited) |
| 271 | .map_or(false, |entry| entry.0) | 255 | /// - All available URLs have been tried |
| 272 | } | 256 | pub fn get_next_url( |
| 273 | 257 | &self, | |
| 274 | /// Get untried URLs for an identifier from a specific domain | 258 | domain: &str, |
| 275 | /// Returns URLs from `available_urls` that haven't been tried yet | 259 | available_urls: &[String], |
| 276 | pub fn get_untried_urls( | 260 | tried_urls: &HashSet<String>, |
| 277 | &self, | 261 | ) -> Option<String> { |
| 278 | domain: &str, | ||
| 279 | identifier: &str, | ||
| 280 | available_urls: &[String] | ||
| 281 | ) -> Vec<String> { | ||
| 282 | let tried = self.state | ||
| 283 | .get(domain) | ||
| 284 | .and_then(|domain_state| domain_state.get(identifier)) | ||
| 285 | .map(|entry| entry.1.clone()) | ||
| 286 | .unwrap_or_default(); | ||
| 287 | |||
| 288 | available_urls | ||
| 289 | .iter() | ||
| 290 | .filter(|url| !tried.contains(*url)) | ||
| 291 | .cloned() | ||
| 292 | .collect() | ||
| 293 | } | ||
| 294 | |||
| 295 | /// Mark a URL as being fetched (in progress) | ||
| 296 | /// Returns false if domain has no capacity or identifier already in progress for this domain | ||
| 297 | pub fn start_fetch(&self, domain: &str, identifier: &str, url: &str) -> bool { | ||
| 298 | // Check capacity | ||
| 299 | if !self.has_capacity(domain) { | 262 | if !self.has_capacity(domain) { |
| 300 | return false; | 263 | return None; |
| 301 | } | 264 | } |
| 302 | 265 | ||
| 303 | // Check if already in progress for this domain+identifier | 266 | // Filter to untried URLs |
| 304 | if self.is_in_progress(domain, identifier) { | 267 | let untried: Vec<_> = available_urls |
| 305 | return false; | 268 | .iter() |
| 269 | .filter(|url| !tried_urls.contains(*url)) | ||
| 270 | .collect(); | ||
| 271 | |||
| 272 | if untried.is_empty() { | ||
| 273 | return None; | ||
| 306 | } | 274 | } |
| 307 | 275 | ||
| 308 | // Increment in-flight counter | 276 | // Round-robin selection |
| 309 | *self.in_flight.entry(domain.to_string()).or_insert(0) += 1; | 277 | let mut index = self.round_robin_index.entry(domain.to_string()).or_insert(0); |
| 278 | let selected_index = *index % untried.len(); | ||
| 279 | *index = (*index + 1) % untried.len(); | ||
| 310 | 280 | ||
| 311 | // Record request time | 281 | Some(untried[selected_index].clone()) |
| 282 | } | ||
| 283 | |||
| 284 | /// Record that a request is starting | ||
| 285 | pub fn start_request(&self, domain: &str) { | ||
| 286 | *self.in_flight.entry(domain.to_string()).or_insert(0) += 1; | ||
| 312 | self.request_times | 287 | self.request_times |
| 313 | .entry(domain.to_string()) | 288 | .entry(domain.to_string()) |
| 314 | .or_default() | 289 | .or_default() |
| 315 | .push_back(Instant::now()); | 290 | .push_back(Instant::now()); |
| 316 | |||
| 317 | // Mark in progress and add URL to tried set | ||
| 318 | self.state | ||
| 319 | .entry(domain.to_string()) | ||
| 320 | .or_default() | ||
| 321 | .entry(identifier.to_string()) | ||
| 322 | .and_modify(|(in_progress, tried)| { | ||
| 323 | *in_progress = true; | ||
| 324 | tried.insert(url.to_string()); | ||
| 325 | }) | ||
| 326 | .or_insert_with(|| { | ||
| 327 | let mut tried = HashSet::new(); | ||
| 328 | tried.insert(url.to_string()); | ||
| 329 | (true, tried) | ||
| 330 | }); | ||
| 331 | |||
| 332 | true | ||
| 333 | } | 291 | } |
| 334 | 292 | ||
| 335 | /// Mark a fetch as complete | 293 | /// Record that a request completed |
| 336 | pub fn complete_fetch(&self, domain: &str, identifier: &str) { | 294 | pub fn complete_request(&self, domain: &str) { |
| 337 | // Decrement in-flight counter | ||
| 338 | if let Some(mut count) = self.in_flight.get_mut(domain) { | 295 | if let Some(mut count) = self.in_flight.get_mut(domain) { |
| 339 | *count = count.saturating_sub(1); | 296 | *count = count.saturating_sub(1); |
| 340 | } | 297 | } |
| 341 | 298 | ||
| 342 | // Clear in_progress flag | 299 | // Clean old timestamps |
| 343 | if let Some(domain_state) = self.state.get(domain) { | ||
| 344 | if let Some(mut entry) = domain_state.get_mut(identifier) { | ||
| 345 | entry.0 = false; | ||
| 346 | } | ||
| 347 | } | ||
| 348 | |||
| 349 | // Clean old request times | ||
| 350 | let now = Instant::now(); | 300 | let now = Instant::now(); |
| 351 | let window = Duration::from_secs(60); | 301 | let window = Duration::from_secs(60); |
| 352 | if let Some(mut times) = self.request_times.get_mut(domain) { | 302 | if let Some(mut times) = self.request_times.get_mut(domain) { |
| @@ -356,135 +306,277 @@ impl DomainThrottle { | |||
| 356 | } | 306 | } |
| 357 | } | 307 | } |
| 358 | 308 | ||
| 359 | /// Check if all URLs have been tried for this domain+identifier | 309 | /// Get time until domain has capacity (for scheduling retries) |
| 360 | pub fn all_urls_tried( | 310 | pub fn time_until_capacity(&self, domain: &str) -> Option<Duration> { |
| 361 | &self, | 311 | // Check concurrent limit first |
| 362 | domain: &str, | 312 | let in_flight = self.in_flight.get(domain).map_or(0, |v| *v); |
| 363 | identifier: &str, | 313 | if in_flight >= self.max_concurrent { |
| 364 | available_urls: &[String] | 314 | // Can't predict when a request will complete |
| 365 | ) -> bool { | 315 | return Some(Duration::from_millis(100)); |
| 366 | self.get_untried_urls(domain, identifier, available_urls).is_empty() | ||
| 367 | } | ||
| 368 | |||
| 369 | /// Remove identifier from a domain's tracking (called when all URLs tried) | ||
| 370 | pub fn remove_identifier_from_domain(&self, domain: &str, identifier: &str) { | ||
| 371 | if let Some(domain_state) = self.state.get(domain) { | ||
| 372 | domain_state.remove(identifier); | ||
| 373 | } | 316 | } |
| 374 | } | 317 | |
| 375 | 318 | // Check rate limit | |
| 376 | /// Remove identifier from all domains (called when sync complete or events expired) | 319 | let now = Instant::now(); |
| 377 | pub fn remove_identifier(&self, identifier: &str) { | 320 | let window = Duration::from_secs(60); |
| 378 | for domain_entry in self.state.iter() { | 321 | if let Some(times) = self.request_times.get(domain) { |
| 379 | domain_entry.value().remove(identifier); | 322 | let recent_count = times.iter().filter(|t| now.duration_since(**t) < window).count(); |
| 323 | if recent_count >= self.max_per_minute as usize { | ||
| 324 | // Find oldest request in window, wait until it expires | ||
| 325 | if let Some(oldest) = times.front() { | ||
| 326 | let age = now.duration_since(*oldest); | ||
| 327 | if age < window { | ||
| 328 | return Some(window - age); | ||
| 329 | } | ||
| 330 | } | ||
| 331 | } | ||
| 380 | } | 332 | } |
| 381 | } | 333 | |
| 382 | 334 | None // Has capacity now | |
| 383 | /// Get all domains where this identifier has untried URLs | ||
| 384 | pub fn domains_with_untried_urls(&self, identifier: &str) -> Vec<String> { | ||
| 385 | self.state | ||
| 386 | .iter() | ||
| 387 | .filter(|entry| entry.value().contains_key(identifier)) | ||
| 388 | .map(|entry| entry.key().clone()) | ||
| 389 | .collect() | ||
| 390 | } | 335 | } |
| 391 | } | 336 | } |
| 392 | ``` | 337 | ``` |
| 393 | 338 | ||
| 394 | ### Modified Purgatory Structure | 339 | ### SyncContext Trait (For Testability) |
| 395 | |||
| 396 | ```rust | ||
| 397 | pub struct Purgatory { | ||
| 398 | /// State events (kind 30618) indexed by repository identifier | ||
| 399 | state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>, | ||
| 400 | 340 | ||
| 401 | /// PR events (kind 1617/1618) indexed by event ID | 341 | Abstract the external dependencies to enable unit testing: |
| 402 | pr_events: Arc<DashMap<String, PrPurgatoryEntry>>, | ||
| 403 | 342 | ||
| 404 | /// NEW: Sync queue - identifiers pending git data sync | 343 | ```rust |
| 405 | sync_queue: Arc<DashMap<String, SyncQueueEntry>>, | 344 | /// Abstraction over external dependencies for sync operations. |
| 345 | /// | ||
| 346 | /// This trait allows unit testing of sync logic by mocking: | ||
| 347 | /// - Repository data fetching | ||
| 348 | /// - OID existence checks | ||
| 349 | /// - Git fetch operations | ||
| 350 | /// - Event processing | ||
| 351 | #[async_trait] | ||
| 352 | pub trait SyncContext: Send + Sync { | ||
| 353 | /// Get repository data (announcements, clone URLs, etc.) | ||
| 354 | async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData>; | ||
| 406 | 355 | ||
| 407 | /// NEW: Domain-level throttling and per-identifier fetch state | 356 | /// Get all OIDs needed for purgatory events with this identifier |
| 408 | domain_throttle: Arc<DomainThrottle>, | 357 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>; |
| 409 | 358 | ||
| 410 | git_data_path: PathBuf, | 359 | /// Check if an OID exists locally |
| 360 | fn oid_exists(&self, repo_path: &Path, oid: &str) -> bool; | ||
| 361 | |||
| 362 | /// Fetch OIDs from a remote server | ||
| 363 | async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>; | ||
| 364 | |||
| 365 | /// Process events that can now be satisfied (save to DB, notify, remove from purgatory) | ||
| 366 | async fn process_satisfiable_events(&self, identifier: &str) -> Result<()>; | ||
| 367 | |||
| 368 | /// Check if there are still pending events for this identifier | ||
| 369 | fn has_pending_events(&self, identifier: &str) -> bool; | ||
| 370 | |||
| 371 | /// Find the best local repo to fetch into | ||
| 372 | fn find_target_repo(&self, db_repo_data: &RepositoryData) -> Option<PathBuf>; | ||
| 373 | |||
| 374 | /// Our domain (to exclude from clone URLs) | ||
| 375 | fn our_domain(&self) -> Option<&str>; | ||
| 411 | } | 376 | } |
| 412 | ``` | 377 | ``` |
| 413 | 378 | ||
| 414 | ### Key Methods | 379 | ## Core Sync Logic |
| 380 | |||
| 381 | ### The Sync Step Function | ||
| 415 | 382 | ||
| 416 | #### Adding Events to Purgatory | 383 | This is the key abstraction that enables clean testing: |
| 417 | 384 | ||
| 418 | ```rust | 385 | ```rust |
| 419 | impl Purgatory { | 386 | /// Result of a single sync step |
| 420 | /// Add a state event to purgatory (user-submitted, 3min delay) | 387 | #[derive(Debug, Clone, PartialEq)] |
| 421 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 388 | pub enum SyncStepResult { |
| 422 | // ... existing logic to add to state_events ... | 389 | /// Successfully tried a URL, may or may not have fetched OIDs |
| 423 | 390 | TriedUrl { url: String, oids_fetched: usize }, | |
| 424 | // Add to sync queue with 3 minute delay | 391 | |
| 425 | self.enqueue_sync(&identifier, Duration::from_secs(180)); | 392 | /// All available URLs have been tried, need to wait for throttle |
| 393 | AllUrlsThrottled { wait_duration: Duration }, | ||
| 394 | |||
| 395 | /// No more URLs to try (all exhausted) | ||
| 396 | NoMoreUrls, | ||
| 397 | |||
| 398 | /// All OIDs are now available, sync complete | ||
| 399 | Complete, | ||
| 400 | |||
| 401 | /// No pending events remain | ||
| 402 | NoPendingEvents, | ||
| 403 | } | ||
| 404 | |||
| 405 | /// Execute one step of the sync process. | ||
| 406 | /// | ||
| 407 | /// This function is pure logic - all I/O goes through the SyncContext trait. | ||
| 408 | /// This makes it trivially unit testable. | ||
| 409 | pub async fn sync_identifier_step<C: SyncContext>( | ||
| 410 | ctx: &C, | ||
| 411 | identifier: &str, | ||
| 412 | tried_urls: &HashSet<String>, | ||
| 413 | throttle: &DomainThrottle, | ||
| 414 | ) -> Result<SyncStepResult> { | ||
| 415 | // 1. Check if we still have pending events | ||
| 416 | if !ctx.has_pending_events(identifier) { | ||
| 417 | return Ok(SyncStepResult::NoPendingEvents); | ||
| 426 | } | 418 | } |
| 427 | 419 | ||
| 428 | /// Add a PR event to purgatory (user-submitted, 3min delay) | 420 | // 2. Collect needed OIDs (fresh each step - may have changed) |
| 429 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 421 | let needed_oids = ctx.collect_needed_oids(identifier); |
| 430 | // ... existing logic to add to pr_events ... | 422 | if needed_oids.is_empty() { |
| 431 | 423 | // No OIDs needed - try to process events | |
| 432 | // Extract identifier from event's `a` tag and enqueue sync | 424 | ctx.process_satisfiable_events(identifier).await?; |
| 433 | if let Some(identifier) = extract_identifier_from_pr(&event) { | 425 | return Ok(SyncStepResult::Complete); |
| 434 | self.enqueue_sync(&identifier, Duration::from_secs(180)); | ||
| 435 | } | ||
| 436 | } | 426 | } |
| 437 | 427 | ||
| 438 | /// Trigger immediate sync for an identifier (called from negentropy sync) | 428 | // 3. Get repository data (fresh each step - announcements may have arrived) |
| 439 | /// Still applies 500ms debounce for batching burst arrivals | 429 | let db_repo_data = ctx.fetch_repository_data(identifier).await?; |
| 440 | pub fn trigger_immediate_sync(&self, identifier: &str) { | 430 | |
| 441 | self.enqueue_sync(identifier, Duration::from_millis(500)); | 431 | // 4. Collect clone URLs, excluding our domain |
| 432 | let all_urls: Vec<String> = db_repo_data | ||
| 433 | .announcements | ||
| 434 | .iter() | ||
| 435 | .flat_map(|a| a.clone_urls.iter().cloned()) | ||
| 436 | .filter(|url| ctx.our_domain().map_or(true, |d| !url.contains(d))) | ||
| 437 | .collect::<HashSet<_>>() | ||
| 438 | .into_iter() | ||
| 439 | .collect(); | ||
| 440 | |||
| 441 | if all_urls.is_empty() { | ||
| 442 | return Ok(SyncStepResult::NoMoreUrls); | ||
| 442 | } | 443 | } |
| 443 | 444 | ||
| 444 | /// Internal: Add identifier to sync queue with specified delay | 445 | // 5. Group by domain and find an available URL |
| 445 | fn enqueue_sync(&self, identifier: &str, delay: Duration) { | 446 | let urls_by_domain: HashMap<String, Vec<String>> = all_urls |
| 446 | self.sync_queue | 447 | .iter() |
| 447 | .entry(identifier.to_string()) | 448 | .fold(HashMap::new(), |mut acc, url| { |
| 448 | .and_modify(|entry| { | 449 | acc.entry(extract_domain(url)).or_default().push(url.clone()); |
| 449 | // New event arrived - reset backoff, potentially update timing | 450 | acc |
| 450 | entry.reset(delay); | 451 | }); |
| 451 | }) | 452 | |
| 452 | .or_insert_with(|| SyncQueueEntry::new(delay)); | 453 | // 6. Try to get a URL from any domain that has capacity |
| 454 | let mut min_wait: Option<Duration> = None; | ||
| 455 | |||
| 456 | for (domain, domain_urls) in &urls_by_domain { | ||
| 457 | if let Some(url) = throttle.get_next_url(domain, domain_urls, tried_urls) { | ||
| 458 | // Found a URL to try! | ||
| 459 | let target_repo = match ctx.find_target_repo(&db_repo_data) { | ||
| 460 | Some(path) => path, | ||
| 461 | None => return Ok(SyncStepResult::NoMoreUrls), | ||
| 462 | }; | ||
| 463 | |||
| 464 | // Start the fetch | ||
| 465 | throttle.start_request(domain); | ||
| 466 | let oids_to_fetch: Vec<String> = needed_oids.iter().cloned().collect(); | ||
| 467 | let fetch_result = ctx.fetch_oids(&target_repo, &url, &oids_to_fetch).await; | ||
| 468 | throttle.complete_request(domain); | ||
| 469 | |||
| 470 | let oids_fetched = match fetch_result { | ||
| 471 | Ok(fetched) => fetched.len(), | ||
| 472 | Err(e) => { | ||
| 473 | tracing::debug!(url = %url, error = %e, "Fetch failed"); | ||
| 474 | 0 | ||
| 475 | } | ||
| 476 | }; | ||
| 477 | |||
| 478 | // Try to process any events that can now be satisfied | ||
| 479 | if oids_fetched > 0 { | ||
| 480 | let _ = ctx.process_satisfiable_events(identifier).await; | ||
| 481 | } | ||
| 482 | |||
| 483 | return Ok(SyncStepResult::TriedUrl { url, oids_fetched }); | ||
| 484 | } else { | ||
| 485 | // Domain throttled or all URLs tried | ||
| 486 | let untried_exist = domain_urls.iter().any(|u| !tried_urls.contains(u)); | ||
| 487 | if untried_exist { | ||
| 488 | // URLs exist but domain is throttled | ||
| 489 | if let Some(wait) = throttle.time_until_capacity(domain) { | ||
| 490 | min_wait = Some(min_wait.map_or(wait, |m| m.min(wait))); | ||
| 491 | } | ||
| 492 | } | ||
| 493 | } | ||
| 494 | } | ||
| 495 | |||
| 496 | // Check if all URLs have been tried | ||
| 497 | let all_tried = all_urls.iter().all(|url| tried_urls.contains(url)); | ||
| 498 | if all_tried { | ||
| 499 | return Ok(SyncStepResult::NoMoreUrls); | ||
| 453 | } | 500 | } |
| 501 | |||
| 502 | // Some URLs exist but all domains are throttled | ||
| 503 | Ok(SyncStepResult::AllUrlsThrottled { | ||
| 504 | wait_duration: min_wait.unwrap_or(Duration::from_millis(100)), | ||
| 505 | }) | ||
| 454 | } | 506 | } |
| 455 | ``` | 507 | ``` |
| 456 | 508 | ||
| 457 | #### Extracting Identifier from PR Events | 509 | ### The Sync Identifier Loop |
| 458 | 510 | ||
| 459 | ```rust | 511 | ```rust |
| 460 | /// Extract repository identifier from PR event's `a` tag | 512 | /// Sync git data for an identifier. |
| 461 | fn extract_identifier_from_pr(event: &Event) -> Option<String> { | 513 | /// |
| 462 | event.tags.iter().find_map(|tag| { | 514 | /// Returns true if sync completed successfully (no more pending events), |
| 463 | let tag_vec = tag.clone().to_vec(); | 515 | /// false if we exhausted all options but events remain. |
| 464 | if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { | 516 | pub async fn sync_identifier<C: SyncContext>( |
| 465 | // Format: 30617:<owner>:<identifier> | 517 | ctx: &C, |
| 466 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | 518 | identifier: &str, |
| 467 | if parts.len() >= 3 { | 519 | throttle: &DomainThrottle, |
| 468 | return Some(parts[2].to_string()); | 520 | ) -> bool { |
| 521 | let mut tried_urls: HashSet<String> = HashSet::new(); | ||
| 522 | |||
| 523 | loop { | ||
| 524 | match sync_identifier_step(ctx, identifier, &tried_urls, throttle).await { | ||
| 525 | Ok(SyncStepResult::TriedUrl { url, oids_fetched }) => { | ||
| 526 | tried_urls.insert(url.clone()); | ||
| 527 | tracing::debug!( | ||
| 528 | identifier = %identifier, | ||
| 529 | url = %url, | ||
| 530 | oids_fetched = oids_fetched, | ||
| 531 | "Tried URL" | ||
| 532 | ); | ||
| 533 | // Continue looping | ||
| 534 | } | ||
| 535 | |||
| 536 | Ok(SyncStepResult::AllUrlsThrottled { wait_duration }) => { | ||
| 537 | tracing::debug!( | ||
| 538 | identifier = %identifier, | ||
| 539 | wait_ms = wait_duration.as_millis(), | ||
| 540 | "All domains throttled, waiting" | ||
| 541 | ); | ||
| 542 | tokio::time::sleep(wait_duration).await; | ||
| 543 | // Continue looping | ||
| 544 | } | ||
| 545 | |||
| 546 | Ok(SyncStepResult::NoMoreUrls) => { | ||
| 547 | tracing::debug!(identifier = %identifier, "No more URLs to try"); | ||
| 548 | return false; // Events remain but no URLs left | ||
| 549 | } | ||
| 550 | |||
| 551 | Ok(SyncStepResult::Complete) => { | ||
| 552 | tracing::info!(identifier = %identifier, "Sync complete"); | ||
| 553 | return true; | ||
| 554 | } | ||
| 555 | |||
| 556 | Ok(SyncStepResult::NoPendingEvents) => { | ||
| 557 | tracing::debug!(identifier = %identifier, "No pending events"); | ||
| 558 | return true; | ||
| 559 | } | ||
| 560 | |||
| 561 | Err(e) => { | ||
| 562 | tracing::warn!(identifier = %identifier, error = %e, "Sync step error"); | ||
| 563 | return false; | ||
| 469 | } | 564 | } |
| 470 | } | 565 | } |
| 471 | None | 566 | } |
| 472 | }) | ||
| 473 | } | 567 | } |
| 474 | ``` | 568 | ``` |
| 475 | 569 | ||
| 476 | ### Sync Loop | 570 | ### The Main Sync Loop |
| 477 | |||
| 478 | A single background loop handles syncing. The `DomainThrottle` is just state tracking, not a separate processing queue. | ||
| 479 | 571 | ||
| 480 | ```rust | 572 | ```rust |
| 481 | impl Purgatory { | 573 | impl Purgatory { |
| 482 | /// Start the background sync loop | ||
| 483 | pub fn start_sync_loop( | 574 | pub fn start_sync_loop( |
| 484 | self: Arc<Self>, | 575 | self: Arc<Self>, |
| 485 | database: SharedDatabase, | 576 | database: SharedDatabase, |
| 486 | our_domain: Option<String>, | 577 | our_domain: Option<String>, |
| 487 | local_relay: Option<nostr_relay_builder::LocalRelay>, | 578 | local_relay: Option<nostr_relay_builder::LocalRelay>, |
| 579 | throttle: Arc<DomainThrottle>, | ||
| 488 | ) -> tokio::task::JoinHandle<()> { | 580 | ) -> tokio::task::JoinHandle<()> { |
| 489 | tokio::spawn(async move { | 581 | tokio::spawn(async move { |
| 490 | let mut interval = tokio::time::interval(Duration::from_secs(1)); | 582 | let mut interval = tokio::time::interval(Duration::from_secs(1)); |
| @@ -492,60 +584,56 @@ impl Purgatory { | |||
| 492 | loop { | 584 | loop { |
| 493 | interval.tick().await; | 585 | interval.tick().await; |
| 494 | 586 | ||
| 495 | // Find ALL ready identifiers | 587 | // Find all ready identifiers |
| 496 | let ready_identifiers: Vec<String> = self.sync_queue | 588 | let ready: Vec<String> = self.sync_queue |
| 497 | .iter() | 589 | .iter() |
| 498 | .filter(|entry| entry.value().is_ready()) | 590 | .filter(|e| e.value().is_ready()) |
| 499 | .map(|entry| entry.key().clone()) | 591 | .map(|e| e.key().clone()) |
| 500 | .collect(); | 592 | .collect(); |
| 501 | 593 | ||
| 502 | // Spawn sync tasks in parallel for each ready identifier | 594 | for identifier in ready { |
| 503 | for identifier in ready_identifiers { | 595 | // Check if events still exist |
| 504 | // Check if there are still events in purgatory for this identifier | ||
| 505 | if !self.has_pending_events(&identifier) { | 596 | if !self.has_pending_events(&identifier) { |
| 506 | self.sync_queue.remove(&identifier); | 597 | self.sync_queue.remove(&identifier); |
| 507 | self.domain_throttle.remove_identifier(&identifier); | ||
| 508 | continue; | 598 | continue; |
| 509 | } | 599 | } |
| 510 | 600 | ||
| 511 | // Mark as in progress (prevents re-spawning on next tick) | 601 | // Mark in progress |
| 512 | if let Some(mut entry) = self.sync_queue.get_mut(&identifier) { | 602 | if let Some(mut entry) = self.sync_queue.get_mut(&identifier) { |
| 513 | if entry.in_progress { | 603 | if entry.in_progress { |
| 514 | continue; // Already running | 604 | continue; |
| 515 | } | 605 | } |
| 516 | entry.in_progress = true; | 606 | entry.in_progress = true; |
| 607 | } else { | ||
| 608 | continue; | ||
| 517 | } | 609 | } |
| 518 | 610 | ||
| 519 | // Spawn task for this identifier | 611 | // Spawn sync task |
| 520 | let purgatory = self.clone(); | 612 | let purgatory = self.clone(); |
| 521 | let db = database.clone(); | 613 | let db = database.clone(); |
| 522 | let domain = our_domain.clone(); | 614 | let domain = our_domain.clone(); |
| 523 | let relay = local_relay.clone(); | 615 | let relay = local_relay.clone(); |
| 616 | let throttle = throttle.clone(); | ||
| 524 | let id = identifier.clone(); | 617 | let id = identifier.clone(); |
| 525 | 618 | ||
| 526 | tokio::spawn(async move { | 619 | tokio::spawn(async move { |
| 527 | purgatory.sync_identifier( | 620 | // Create the real SyncContext implementation |
| 528 | &id, | 621 | let ctx = RealSyncContext::new( |
| 529 | &db, | 622 | purgatory.clone(), |
| 530 | domain.as_deref(), | 623 | db, |
| 531 | relay.as_ref(), | 624 | domain, |
| 532 | ).await; | 625 | relay, |
| 626 | ); | ||
| 533 | 627 | ||
| 534 | // Check if events remain | 628 | let complete = sync_identifier(&ctx, &id, &throttle).await; |
| 535 | if !purgatory.has_pending_events(&id) { | 629 | |
| 630 | if complete || !purgatory.has_pending_events(&id) { | ||
| 536 | purgatory.sync_queue.remove(&id); | 631 | purgatory.sync_queue.remove(&id); |
| 537 | purgatory.domain_throttle.remove_identifier(&id); | 632 | tracing::info!(identifier = %id, "Removed from sync queue"); |
| 538 | tracing::info!(identifier = %id, "Sync complete, removed from queues"); | ||
| 539 | } else { | 633 | } else { |
| 540 | // Apply backoff (or not, if new event arrived during sync) | 634 | // Apply backoff |
| 541 | if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { | 635 | if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { |
| 542 | entry.on_sync_complete(); | 636 | entry.on_sync_complete(); |
| 543 | tracing::debug!( | ||
| 544 | identifier = %id, | ||
| 545 | attempt = entry.attempt_count, | ||
| 546 | next_attempt_secs = entry.next_attempt.duration_since(Instant::now()).as_secs(), | ||
| 547 | "Sync attempt complete, scheduled next attempt" | ||
| 548 | ); | ||
| 549 | } | 637 | } |
| 550 | } | 638 | } |
| 551 | }); | 639 | }); |
| @@ -553,746 +641,239 @@ impl Purgatory { | |||
| 553 | } | 641 | } |
| 554 | }) | 642 | }) |
| 555 | } | 643 | } |
| 556 | |||
| 557 | /// Check if there are pending events for an identifier | ||
| 558 | fn has_pending_events(&self, identifier: &str) -> bool { | ||
| 559 | // Check state events | ||
| 560 | if self.state_events.get(identifier).map_or(false, |v| !v.is_empty()) { | ||
| 561 | return true; | ||
| 562 | } | ||
| 563 | |||
| 564 | // Check PR events (need to scan for matching identifier) | ||
| 565 | for entry in self.pr_events.iter() { | ||
| 566 | if let Some(ref event) = entry.value().event { | ||
| 567 | if extract_identifier_from_pr(event).as_deref() == Some(identifier) { | ||
| 568 | return true; | ||
| 569 | } | ||
| 570 | } | ||
| 571 | } | ||
| 572 | |||
| 573 | false | ||
| 574 | } | ||
| 575 | } | 644 | } |
| 576 | ``` | 645 | ``` |
| 577 | 646 | ||
| 578 | ### Core Sync Logic | 647 | ## Testing Strategy |
| 648 | |||
| 649 | ### Unit Tests for Sync Logic | ||
| 650 | |||
| 651 | The `SyncContext` trait enables pure unit tests without any I/O: | ||
| 579 | 652 | ||
| 580 | ```rust | 653 | ```rust |
| 581 | impl Purgatory { | 654 | #[cfg(test)] |
| 582 | /// Sync git data for all purgatory events with this identifier | 655 | mod tests { |
| 583 | /// | 656 | use super::*; |
| 584 | /// Uses a while loop that: | 657 | |
| 585 | /// 1. Recalculates URLs and OIDs fresh each iteration (catches new events/announcements) | 658 | /// Mock context for testing sync logic |
| 586 | /// 2. Gets untried URLs per domain from DomainThrottle | 659 | struct MockSyncContext { |
| 587 | /// 3. Skips domains that are fully throttled, tries available ones | 660 | pending_events: RefCell<bool>, |
| 588 | /// 4. Continues while: events remain AND OIDs missing AND untried URLs exist | 661 | needed_oids: RefCell<HashSet<String>>, |
| 589 | async fn sync_identifier( | 662 | available_urls: Vec<String>, |
| 590 | &self, | 663 | fetch_results: RefCell<HashMap<String, Vec<String>>>, |
| 591 | identifier: &str, | 664 | processed_count: RefCell<usize>, |
| 592 | database: &SharedDatabase, | ||
| 593 | our_domain: Option<&str>, | ||
| 594 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 595 | ) { | ||
| 596 | loop { | ||
| 597 | // 1. Check if any events remain (may have expired or been processed) | ||
| 598 | if !self.has_pending_events(identifier) { | ||
| 599 | return; | ||
| 600 | } | ||
| 601 | |||
| 602 | // 2. Collect all OIDs needed (fresh calculation each iteration) | ||
| 603 | let needed_oids = self.collect_needed_oids(identifier); | ||
| 604 | |||
| 605 | if needed_oids.is_empty() { | ||
| 606 | // No OIDs needed - try to process events and exit | ||
| 607 | self.try_process_events(identifier, database, our_domain, local_relay).await; | ||
| 608 | return; | ||
| 609 | } | ||
| 610 | |||
| 611 | // 3. Get repository data and clone URLs (fresh calculation each iteration) | ||
| 612 | let db_repo_data = match fetch_repository_data(database, identifier).await { | ||
| 613 | Ok(data) => data, | ||
| 614 | Err(e) => { | ||
| 615 | tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data"); | ||
| 616 | return; | ||
| 617 | } | ||
| 618 | }; | ||
| 619 | |||
| 620 | // 4. Collect clone URLs, excluding our domain | ||
| 621 | let all_clone_urls: Vec<String> = db_repo_data | ||
| 622 | .announcements | ||
| 623 | .iter() | ||
| 624 | .flat_map(|a| a.clone_urls.iter().cloned()) | ||
| 625 | .filter(|url| our_domain.map_or(true, |d| !url.contains(d))) | ||
| 626 | .collect::<HashSet<_>>() | ||
| 627 | .into_iter() | ||
| 628 | .collect(); | ||
| 629 | |||
| 630 | if all_clone_urls.is_empty() { | ||
| 631 | tracing::debug!(identifier = %identifier, "No external clone URLs available"); | ||
| 632 | return; | ||
| 633 | } | ||
| 634 | |||
| 635 | // 5. Group URLs by domain | ||
| 636 | let urls_by_domain: HashMap<String, Vec<String>> = all_clone_urls | ||
| 637 | .iter() | ||
| 638 | .fold(HashMap::new(), |mut acc, url| { | ||
| 639 | let domain = extract_domain(url); | ||
| 640 | acc.entry(domain).or_default().push(url.clone()); | ||
| 641 | acc | ||
| 642 | }); | ||
| 643 | |||
| 644 | // 6. Find best local repo to fetch into | ||
| 645 | let target_repo = match self.find_target_repo(&db_repo_data, identifier) { | ||
| 646 | Some(path) => path, | ||
| 647 | None => { | ||
| 648 | tracing::debug!(identifier = %identifier, "No local repository found"); | ||
| 649 | return; | ||
| 650 | } | ||
| 651 | }; | ||
| 652 | |||
| 653 | // 7. Partition domains: available (have capacity) vs throttled | ||
| 654 | let (available_domains, throttled_domains): (Vec<_>, Vec<_>) = urls_by_domain | ||
| 655 | .keys() | ||
| 656 | .cloned() | ||
| 657 | .partition(|domain| self.domain_throttle.has_capacity(domain)); | ||
| 658 | |||
| 659 | let mut remaining_oids: HashSet<String> = needed_oids.clone(); | ||
| 660 | let mut any_fetch_started = false; | ||
| 661 | |||
| 662 | // 8. Try available domains first | ||
| 663 | for domain in &available_domains { | ||
| 664 | if remaining_oids.is_empty() { | ||
| 665 | break; | ||
| 666 | } | ||
| 667 | |||
| 668 | let domain_urls = urls_by_domain.get(domain).unwrap(); | ||
| 669 | let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls); | ||
| 670 | |||
| 671 | for url in untried_urls { | ||
| 672 | if remaining_oids.is_empty() { | ||
| 673 | break; | ||
| 674 | } | ||
| 675 | |||
| 676 | // Skip if already in progress for this domain+identifier | ||
| 677 | if self.domain_throttle.is_in_progress(domain, identifier) { | ||
| 678 | break; // Wait for current fetch to complete | ||
| 679 | } | ||
| 680 | |||
| 681 | // Try to start fetch (checks capacity again, marks in_progress) | ||
| 682 | if !self.domain_throttle.start_fetch(domain, identifier, &url) { | ||
| 683 | break; // Domain at capacity | ||
| 684 | } | ||
| 685 | |||
| 686 | any_fetch_started = true; | ||
| 687 | |||
| 688 | // Fetch OIDs | ||
| 689 | let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect(); | ||
| 690 | let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await; | ||
| 691 | |||
| 692 | // Mark fetch complete | ||
| 693 | self.domain_throttle.complete_fetch(domain, identifier); | ||
| 694 | |||
| 695 | match fetch_result { | ||
| 696 | Ok(fetched_oids) => { | ||
| 697 | if !fetched_oids.is_empty() { | ||
| 698 | let fetched_count = fetched_oids.len(); | ||
| 699 | for oid in fetched_oids { | ||
| 700 | remaining_oids.remove(&oid); | ||
| 701 | } | ||
| 702 | tracing::info!( | ||
| 703 | identifier = %identifier, | ||
| 704 | url = %url, | ||
| 705 | fetched = fetched_count, | ||
| 706 | remaining = remaining_oids.len(), | ||
| 707 | "Fetched OIDs from server" | ||
| 708 | ); | ||
| 709 | } | ||
| 710 | } | ||
| 711 | Err(e) => { | ||
| 712 | tracing::debug!( | ||
| 713 | identifier = %identifier, | ||
| 714 | url = %url, | ||
| 715 | error = %e, | ||
| 716 | "Failed to fetch from server" | ||
| 717 | ); | ||
| 718 | } | ||
| 719 | } | ||
| 720 | } | ||
| 721 | |||
| 722 | // Clean up if all URLs tried for this domain | ||
| 723 | if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) { | ||
| 724 | self.domain_throttle.remove_identifier_from_domain(domain, identifier); | ||
| 725 | } | ||
| 726 | } | ||
| 727 | |||
| 728 | // 9. If still need OIDs, try throttled domains (they might have capacity now) | ||
| 729 | if !remaining_oids.is_empty() { | ||
| 730 | for domain in &throttled_domains { | ||
| 731 | if remaining_oids.is_empty() { | ||
| 732 | break; | ||
| 733 | } | ||
| 734 | |||
| 735 | // Re-check capacity (might have freed up) | ||
| 736 | if !self.domain_throttle.has_capacity(domain) { | ||
| 737 | continue; | ||
| 738 | } | ||
| 739 | |||
| 740 | let domain_urls = urls_by_domain.get(domain).unwrap(); | ||
| 741 | let untried_urls = self.domain_throttle.get_untried_urls(domain, identifier, domain_urls); | ||
| 742 | |||
| 743 | for url in untried_urls { | ||
| 744 | if remaining_oids.is_empty() { | ||
| 745 | break; | ||
| 746 | } | ||
| 747 | |||
| 748 | if self.domain_throttle.is_in_progress(domain, identifier) { | ||
| 749 | break; | ||
| 750 | } | ||
| 751 | |||
| 752 | if !self.domain_throttle.start_fetch(domain, identifier, &url) { | ||
| 753 | break; | ||
| 754 | } | ||
| 755 | |||
| 756 | any_fetch_started = true; | ||
| 757 | |||
| 758 | let oids_to_fetch: Vec<String> = remaining_oids.iter().cloned().collect(); | ||
| 759 | let fetch_result = fetch_oids_from_server(&target_repo, &url, &oids_to_fetch).await; | ||
| 760 | |||
| 761 | self.domain_throttle.complete_fetch(domain, identifier); | ||
| 762 | |||
| 763 | if let Ok(fetched_oids) = fetch_result { | ||
| 764 | for oid in fetched_oids { | ||
| 765 | remaining_oids.remove(&oid); | ||
| 766 | } | ||
| 767 | } | ||
| 768 | } | ||
| 769 | |||
| 770 | if self.domain_throttle.all_urls_tried(domain, identifier, domain_urls) { | ||
| 771 | self.domain_throttle.remove_identifier_from_domain(domain, identifier); | ||
| 772 | } | ||
| 773 | } | ||
| 774 | } | ||
| 775 | |||
| 776 | // 10. Try to process events that can now be satisfied | ||
| 777 | self.try_process_events(identifier, database, our_domain, local_relay).await; | ||
| 778 | |||
| 779 | // 11. Decide whether to continue looping | ||
| 780 | let still_have_events = self.has_pending_events(identifier); | ||
| 781 | if !still_have_events { | ||
| 782 | return; | ||
| 783 | } | ||
| 784 | |||
| 785 | let still_need_oids = !self.collect_needed_oids(identifier).is_empty(); | ||
| 786 | if !still_need_oids { | ||
| 787 | // Events remain but no OIDs needed - loop to try processing again | ||
| 788 | continue; | ||
| 789 | } | ||
| 790 | |||
| 791 | // Check if there are any untried URLs left across all domains | ||
| 792 | let have_untried_urls = urls_by_domain.iter().any(|(domain, urls)| { | ||
| 793 | !self.domain_throttle.get_untried_urls(domain, identifier, urls).is_empty() | ||
| 794 | }); | ||
| 795 | |||
| 796 | if !have_untried_urls { | ||
| 797 | // No more URLs to try - exit and let backoff handle retry | ||
| 798 | return; | ||
| 799 | } | ||
| 800 | |||
| 801 | // If no fetch was started this iteration (all throttled), yield briefly | ||
| 802 | if !any_fetch_started { | ||
| 803 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 804 | } | ||
| 805 | } | ||
| 806 | } | 665 | } |
| 807 | 666 | ||
| 808 | /// Collect all OIDs needed for purgatory events with this identifier | 667 | #[async_trait] |
| 809 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { | 668 | impl SyncContext for MockSyncContext { |
| 810 | let mut oids = HashSet::new(); | 669 | async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> { |
| 811 | 670 | // Return mock data with our available_urls | |
| 812 | // Collect from state events | 671 | Ok(RepositoryData { |
| 813 | if let Some(entries) = self.state_events.get(identifier) { | 672 | announcements: vec![MockAnnouncement { |
| 814 | for entry in entries.iter() { | 673 | clone_urls: self.available_urls.clone(), |
| 815 | if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { | 674 | ..Default::default() |
| 816 | for branch in &state.branches { | 675 | }], |
| 817 | if !branch.commit.starts_with("ref: ") { | 676 | ..Default::default() |
| 818 | oids.insert(branch.commit.clone()); | 677 | }) |
| 819 | } | ||
| 820 | } | ||
| 821 | for tag in &state.tags { | ||
| 822 | if !tag.commit.starts_with("ref: ") { | ||
| 823 | oids.insert(tag.commit.clone()); | ||
| 824 | } | ||
| 825 | } | ||
| 826 | } | ||
| 827 | } | ||
| 828 | } | 678 | } |
| 829 | 679 | ||
| 830 | // Collect from PR events | 680 | fn collect_needed_oids(&self, _id: &str) -> HashSet<String> { |
| 831 | for entry in self.pr_events.iter() { | 681 | self.needed_oids.borrow().clone() |
| 832 | if let Some(ref event) = entry.value().event { | ||
| 833 | if extract_identifier_from_pr(event).as_deref() == Some(identifier) { | ||
| 834 | if let Some(commit) = extract_commit_from_pr(event) { | ||
| 835 | oids.insert(commit); | ||
| 836 | } | ||
| 837 | } | ||
| 838 | } | ||
| 839 | } | 682 | } |
| 840 | 683 | ||
| 841 | oids | 684 | fn oid_exists(&self, _path: &Path, oid: &str) -> bool { |
| 842 | } | 685 | !self.needed_oids.borrow().contains(oid) |
| 843 | |||
| 844 | /// Try to process events that can now be satisfied | ||
| 845 | async fn try_process_events( | ||
| 846 | &self, | ||
| 847 | identifier: &str, | ||
| 848 | database: &SharedDatabase, | ||
| 849 | our_domain: Option<&str>, | ||
| 850 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 851 | ) { | ||
| 852 | if !self.has_pending_events(identifier) { | ||
| 853 | return; | ||
| 854 | } | 686 | } |
| 855 | 687 | ||
| 856 | let db_repo_data = match fetch_repository_data(database, identifier).await { | 688 | async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> { |
| 857 | Ok(data) => data, | 689 | // Return pre-configured fetch result for this URL |
| 858 | Err(e) => { | 690 | Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) |
| 859 | tracing::warn!(identifier = %identifier, error = %e, "Failed to fetch repo data for processing"); | ||
| 860 | return; | ||
| 861 | } | ||
| 862 | }; | ||
| 863 | |||
| 864 | // Process state events (oldest first) | ||
| 865 | if let Some(mut entries) = self.state_events.get_mut(identifier) { | ||
| 866 | entries.sort_by_key(|e| e.event.created_at); | ||
| 867 | |||
| 868 | let mut to_remove = Vec::new(); | ||
| 869 | |||
| 870 | for entry in entries.iter() { | ||
| 871 | if let Ok(state) = RepositoryState::from_event(entry.event.clone()) { | ||
| 872 | if self.can_satisfy_state(&state, &db_repo_data) { | ||
| 873 | match self.process_state_event(&state, &db_repo_data, database, local_relay).await { | ||
| 874 | Ok(()) => { | ||
| 875 | to_remove.push(entry.event.id); | ||
| 876 | } | ||
| 877 | Err(e) => { | ||
| 878 | tracing::warn!( | ||
| 879 | event_id = %entry.event.id, | ||
| 880 | error = %e, | ||
| 881 | "Failed to process state event" | ||
| 882 | ); | ||
| 883 | } | ||
| 884 | } | ||
| 885 | } | ||
| 886 | } | ||
| 887 | } | ||
| 888 | |||
| 889 | entries.retain(|e| !to_remove.contains(&e.event.id)); | ||
| 890 | } | 691 | } |
| 891 | 692 | ||
| 892 | // Process PR events (oldest first) | 693 | async fn process_satisfiable_events(&self, _id: &str) -> Result<()> { |
| 893 | let mut pr_to_remove = Vec::new(); | 694 | *self.processed_count.borrow_mut() += 1; |
| 894 | let mut pr_entries: Vec<_> = self.pr_events | 695 | Ok(()) |
| 895 | .iter() | 696 | } |
| 896 | .filter_map(|entry| { | ||
| 897 | entry.value().event.as_ref().and_then(|event| { | ||
| 898 | if extract_identifier_from_pr(event).as_deref() == Some(identifier) { | ||
| 899 | Some((entry.key().clone(), event.clone(), entry.value().commit.clone())) | ||
| 900 | } else { | ||
| 901 | None | ||
| 902 | } | ||
| 903 | }) | ||
| 904 | }) | ||
| 905 | .collect(); | ||
| 906 | 697 | ||
| 907 | pr_entries.sort_by_key(|(_, event, _)| event.created_at); | 698 | fn has_pending_events(&self, _id: &str) -> bool { |
| 699 | *self.pending_events.borrow() | ||
| 700 | } | ||
| 908 | 701 | ||
| 909 | for (event_id, event, commit) in pr_entries { | 702 | fn find_target_repo(&self, _data: &RepositoryData) -> Option<PathBuf> { |
| 910 | if self.can_satisfy_pr(&commit, &db_repo_data) { | 703 | Some(PathBuf::from("/tmp/test-repo")) |
| 911 | match self.process_pr_event(&event, &commit, &db_repo_data, database, local_relay).await { | ||
| 912 | Ok(()) => { | ||
| 913 | pr_to_remove.push(event_id); | ||
| 914 | } | ||
| 915 | Err(e) => { | ||
| 916 | tracing::warn!( | ||
| 917 | event_id = %event.id, | ||
| 918 | error = %e, | ||
| 919 | "Failed to process PR event" | ||
| 920 | ); | ||
| 921 | } | ||
| 922 | } | ||
| 923 | } | ||
| 924 | } | 704 | } |
| 925 | 705 | ||
| 926 | for event_id in pr_to_remove { | 706 | fn our_domain(&self) -> Option<&str> { |
| 927 | self.pr_events.remove(&event_id); | 707 | None |
| 928 | } | 708 | } |
| 929 | } | 709 | } |
| 930 | 710 | ||
| 931 | /// Check if a state event can be satisfied (all OIDs available locally) | 711 | #[tokio::test] |
| 932 | fn can_satisfy_state(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> bool { | 712 | async fn test_sync_step_no_pending_events() { |
| 933 | for announcement in &db_repo_data.announcements { | 713 | let ctx = MockSyncContext { |
| 934 | let repo_path = self.git_data_path.join(announcement.repo_path()); | 714 | pending_events: RefCell::new(false), |
| 935 | if !repo_path.exists() { | 715 | ..Default::default() |
| 936 | continue; | 716 | }; |
| 937 | } | 717 | let throttle = DomainThrottle::new(5, 30); |
| 938 | 718 | let tried = HashSet::new(); | |
| 939 | let all_present = state.branches.iter().all(|b| { | 719 | |
| 940 | b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit) | 720 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); |
| 941 | }) && state.tags.iter().all(|t| { | 721 | assert_eq!(result, SyncStepResult::NoPendingEvents); |
| 942 | oid_exists(&repo_path, &t.commit) | ||
| 943 | }); | ||
| 944 | |||
| 945 | if all_present { | ||
| 946 | return true; | ||
| 947 | } | ||
| 948 | } | ||
| 949 | false | ||
| 950 | } | 722 | } |
| 951 | 723 | ||
| 952 | /// Check if a PR event can be satisfied (commit available locally) | 724 | #[tokio::test] |
| 953 | fn can_satisfy_pr(&self, commit: &str, db_repo_data: &RepositoryData) -> bool { | 725 | async fn test_sync_step_no_oids_needed() { |
| 954 | for announcement in &db_repo_data.announcements { | 726 | let ctx = MockSyncContext { |
| 955 | let repo_path = self.git_data_path.join(announcement.repo_path()); | 727 | pending_events: RefCell::new(true), |
| 956 | if repo_path.exists() && oid_exists(&repo_path, commit) { | 728 | needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed |
| 957 | return true; | 729 | ..Default::default() |
| 958 | } | 730 | }; |
| 959 | } | 731 | let throttle = DomainThrottle::new(5, 30); |
| 960 | false | 732 | let tried = HashSet::new(); |
| 961 | } | 733 | |
| 962 | } | 734 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); |
| 963 | 735 | assert_eq!(result, SyncStepResult::Complete); | |
| 964 | /// Extract commit hash from PR event's `c` tag | 736 | assert_eq!(*ctx.processed_count.borrow(), 1); |
| 965 | fn extract_commit_from_pr(event: &Event) -> Option<String> { | ||
| 966 | event.tags.iter().find_map(|tag| { | ||
| 967 | let tag_vec = tag.clone().to_vec(); | ||
| 968 | if tag_vec.len() >= 2 && tag_vec[0] == "c" { | ||
| 969 | Some(tag_vec[1].clone()) | ||
| 970 | } else { | ||
| 971 | None | ||
| 972 | } | ||
| 973 | }) | ||
| 974 | } | ||
| 975 | ``` | ||
| 976 | |||
| 977 | ### Fetching OIDs | ||
| 978 | |||
| 979 | ```rust | ||
| 980 | /// Fetch specific OIDs from a remote git server | ||
| 981 | /// | ||
| 982 | /// Returns the list of OIDs that were successfully fetched (now exist locally). | ||
| 983 | /// Git fetch may partially succeed, so we check which OIDs are available after. | ||
| 984 | async fn fetch_oids_from_server( | ||
| 985 | repo_path: &Path, | ||
| 986 | server_url: &str, | ||
| 987 | oids: &[String], | ||
| 988 | ) -> Result<Vec<String>> { | ||
| 989 | if oids.is_empty() { | ||
| 990 | return Ok(Vec::new()); | ||
| 991 | } | 737 | } |
| 992 | 738 | ||
| 993 | let repo_path = repo_path.to_path_buf(); | 739 | #[tokio::test] |
| 994 | let server_url = server_url.to_string(); | 740 | async fn test_sync_step_tries_url() { |
| 995 | let oids = oids.to_vec(); | 741 | let mut needed = HashSet::new(); |
| 996 | 742 | needed.insert("abc123".to_string()); | |
| 997 | tokio::task::spawn_blocking(move || { | ||
| 998 | // Build git fetch command with all OIDs | ||
| 999 | let mut args = vec!["fetch", "--depth=1", &server_url]; | ||
| 1000 | args.extend(oids.iter().map(|s| s.as_str())); | ||
| 1001 | 743 | ||
| 1002 | tracing::debug!( | 744 | let mut fetch_results = HashMap::new(); |
| 1003 | oids_count = oids.len(), | 745 | fetch_results.insert( |
| 1004 | server = %server_url, | 746 | "https://example.com/repo.git".to_string(), |
| 1005 | "Fetching OIDs" | 747 | vec!["abc123".to_string()], |
| 1006 | ); | 748 | ); |
| 1007 | 749 | ||
| 1008 | let output = Command::new("git") | 750 | let ctx = MockSyncContext { |
| 1009 | .args(&args) | 751 | pending_events: RefCell::new(true), |
| 1010 | .current_dir(&repo_path) | 752 | needed_oids: RefCell::new(needed), |
| 1011 | .output(); | 753 | available_urls: vec!["https://example.com/repo.git".to_string()], |
| 754 | fetch_results: RefCell::new(fetch_results), | ||
| 755 | processed_count: RefCell::new(0), | ||
| 756 | }; | ||
| 757 | let throttle = DomainThrottle::new(5, 30); | ||
| 758 | let tried = HashSet::new(); | ||
| 1012 | 759 | ||
| 1013 | match output { | 760 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); |
| 1014 | Ok(result) => { | ||
| 1015 | // Check which OIDs we now have (regardless of command success) | ||
| 1016 | // git fetch may partially succeed | ||
| 1017 | let fetched: Vec<String> = oids | ||
| 1018 | .iter() | ||
| 1019 | .filter(|oid| oid_exists(&repo_path, oid)) | ||
| 1020 | .cloned() | ||
| 1021 | .collect(); | ||
| 1022 | |||
| 1023 | if !result.status.success() { | ||
| 1024 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 1025 | tracing::debug!( | ||
| 1026 | server = %server_url, | ||
| 1027 | stderr = %stderr, | ||
| 1028 | fetched_count = fetched.len(), | ||
| 1029 | "git fetch returned non-zero but may have fetched some OIDs" | ||
| 1030 | ); | ||
| 1031 | } | ||
| 1032 | |||
| 1033 | Ok(fetched) | ||
| 1034 | } | ||
| 1035 | Err(e) => { | ||
| 1036 | bail!("git fetch command error: {}", e) | ||
| 1037 | } | ||
| 1038 | } | ||
| 1039 | }) | ||
| 1040 | .await? | ||
| 1041 | } | ||
| 1042 | |||
| 1043 | /// Extract domain from a URL for throttling | ||
| 1044 | fn extract_domain(url: &str) -> String { | ||
| 1045 | url::Url::parse(url) | ||
| 1046 | .ok() | ||
| 1047 | .and_then(|u| u.host_str().map(|s| s.to_string())) | ||
| 1048 | .unwrap_or_else(|| url.to_string()) | ||
| 1049 | } | ||
| 1050 | ``` | ||
| 1051 | |||
| 1052 | ### Helper Methods | ||
| 1053 | |||
| 1054 | ```rust | ||
| 1055 | impl Purgatory { | ||
| 1056 | /// Find the best local repository to fetch OIDs into | ||
| 1057 | /// | ||
| 1058 | /// Prefers the repo with the most recent commit on its default branch, | ||
| 1059 | /// as it's most likely to have related git history. | ||
| 1060 | fn find_target_repo(&self, db_repo_data: &RepositoryData, identifier: &str) -> Option<PathBuf> { | ||
| 1061 | let mut best: Option<(Timestamp, PathBuf)> = None; | ||
| 1062 | 761 | ||
| 1063 | for announcement in &db_repo_data.announcements { | 762 | match result { |
| 1064 | let repo_path = self.git_data_path.join(announcement.repo_path()); | 763 | SyncStepResult::TriedUrl { url, oids_fetched } => { |
| 1065 | if !repo_path.exists() { | 764 | assert_eq!(url, "https://example.com/repo.git"); |
| 1066 | continue; | 765 | assert_eq!(oids_fetched, 1); |
| 1067 | } | ||
| 1068 | |||
| 1069 | let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path) | ||
| 1070 | .unwrap_or(Timestamp::zero()); | ||
| 1071 | |||
| 1072 | if best.as_ref().map_or(true, |(d, _)| commit_date > *d) { | ||
| 1073 | best = Some((commit_date, repo_path)); | ||
| 1074 | } | 766 | } |
| 767 | _ => panic!("Expected TriedUrl, got {:?}", result), | ||
| 1075 | } | 768 | } |
| 1076 | |||
| 1077 | best.map(|(_, path)| path) | ||
| 1078 | } | 769 | } |
| 1079 | 770 | ||
| 1080 | /// Process a state event that can now be satisfied | 771 | #[tokio::test] |
| 1081 | /// | 772 | async fn test_sync_step_all_urls_tried() { |
| 1082 | /// Syncs OIDs to all owner repos, aligns refs, saves to DB, notifies subscribers. | 773 | let mut needed = HashSet::new(); |
| 1083 | async fn process_state_event( | 774 | needed.insert("abc123".to_string()); |
| 1084 | &self, | 775 | |
| 1085 | state: &RepositoryState, | 776 | let ctx = MockSyncContext { |
| 1086 | db_repo_data: &RepositoryData, | 777 | pending_events: RefCell::new(true), |
| 1087 | database: &SharedDatabase, | 778 | needed_oids: RefCell::new(needed), |
| 1088 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | 779 | available_urls: vec!["https://example.com/repo.git".to_string()], |
| 1089 | ) -> Result<()> { | 780 | fetch_results: RefCell::new(HashMap::new()), |
| 1090 | // Find source repo (one that has all OIDs) | 781 | processed_count: RefCell::new(0), |
| 1091 | let source_repo = self.find_repo_with_all_oids(state, db_repo_data) | 782 | }; |
| 1092 | .ok_or_else(|| anyhow::anyhow!("No repo has all required OIDs"))?; | 783 | let throttle = DomainThrottle::new(5, 30); |
| 1093 | 784 | ||
| 1094 | // Sync to other owner repos and align refs | 785 | // Mark the only URL as tried |
| 1095 | let sync_result = sync_to_owner_repos(&source_repo, state, db_repo_data, &self.git_data_path); | 786 | let mut tried = HashSet::new(); |
| 787 | tried.insert("https://example.com/repo.git".to_string()); | ||
| 1096 | 788 | ||
| 1097 | tracing::info!( | 789 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); |
| 1098 | identifier = %state.identifier, | 790 | assert_eq!(result, SyncStepResult::NoMoreUrls); |
| 1099 | event_id = %state.event.id, | 791 | } |
| 1100 | repos_synced = sync_result.repos_synced, | 792 | |
| 1101 | "Synced state from purgatory" | 793 | #[tokio::test] |
| 1102 | ); | 794 | async fn test_sync_step_domain_throttled() { |
| 795 | let mut needed = HashSet::new(); | ||
| 796 | needed.insert("abc123".to_string()); | ||
| 797 | |||
| 798 | let ctx = MockSyncContext { | ||
| 799 | pending_events: RefCell::new(true), | ||
| 800 | needed_oids: RefCell::new(needed), | ||
| 801 | available_urls: vec!["https://example.com/repo.git".to_string()], | ||
| 802 | fetch_results: RefCell::new(HashMap::new()), | ||
| 803 | processed_count: RefCell::new(0), | ||
| 804 | }; | ||
| 1103 | 805 | ||
| 1104 | // Save to database | 806 | // Create throttle with 0 concurrent limit |
| 1105 | database.save_event(&state.event).await?; | 807 | let throttle = DomainThrottle::new(0, 30); |
| 808 | let tried = HashSet::new(); | ||
| 1106 | 809 | ||
| 1107 | // Notify subscribers | 810 | let result = sync_identifier_step(&ctx, "test", &tried, &throttle).await.unwrap(); |
| 1108 | if let Some(relay) = local_relay { | ||
| 1109 | relay.notify_event(state.event.clone()); | ||
| 1110 | } | ||
| 1111 | 811 | ||
| 1112 | Ok(()) | 812 | match result { |
| 813 | SyncStepResult::AllUrlsThrottled { .. } => {} | ||
| 814 | _ => panic!("Expected AllUrlsThrottled, got {:?}", result), | ||
| 815 | } | ||
| 1113 | } | 816 | } |
| 1114 | 817 | ||
| 1115 | /// Process a PR event that can now be satisfied | 818 | #[tokio::test] |
| 1116 | async fn process_pr_event( | 819 | async fn test_full_sync_loop() { |
| 1117 | &self, | 820 | let mut needed = HashSet::new(); |
| 1118 | event: &Event, | 821 | needed.insert("abc123".to_string()); |
| 1119 | commit: &str, | 822 | needed.insert("def456".to_string()); |
| 1120 | db_repo_data: &RepositoryData, | 823 | |
| 1121 | database: &SharedDatabase, | 824 | let mut fetch_results = HashMap::new(); |
| 1122 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | 825 | // First URL returns one OID |
| 1123 | ) -> Result<()> { | 826 | fetch_results.insert( |
| 1124 | // Save to database | 827 | "https://server1.com/repo.git".to_string(), |
| 1125 | database.save_event(event).await?; | 828 | vec!["abc123".to_string()], |
| 829 | ); | ||
| 830 | // Second URL returns the other | ||
| 831 | fetch_results.insert( | ||
| 832 | "https://server2.com/repo.git".to_string(), | ||
| 833 | vec!["def456".to_string()], | ||
| 834 | ); | ||
| 1126 | 835 | ||
| 1127 | // Notify subscribers | 836 | let ctx = MockSyncContext { |
| 1128 | if let Some(relay) = local_relay { | 837 | pending_events: RefCell::new(true), |
| 1129 | relay.notify_event(event.clone()); | 838 | needed_oids: RefCell::new(needed.clone()), |
| 1130 | } | 839 | available_urls: vec![ |
| 840 | "https://server1.com/repo.git".to_string(), | ||
| 841 | "https://server2.com/repo.git".to_string(), | ||
| 842 | ], | ||
| 843 | fetch_results: RefCell::new(fetch_results), | ||
| 844 | processed_count: RefCell::new(0), | ||
| 845 | }; | ||
| 1131 | 846 | ||
| 1132 | tracing::info!( | 847 | // Simulate OIDs being removed as they're fetched |
| 1133 | event_id = %event.id, | 848 | // (In real code, collect_needed_oids would return fewer OIDs) |
| 1134 | commit = %commit, | ||
| 1135 | "Processed PR event from purgatory" | ||
| 1136 | ); | ||
| 1137 | 849 | ||
| 1138 | Ok(()) | 850 | let throttle = DomainThrottle::new(5, 30); |
| 1139 | } | 851 | let complete = sync_identifier(&ctx, "test", &throttle).await; |
| 1140 | 852 | ||
| 1141 | /// Find a repository that has all OIDs required by a state event | 853 | // Should have tried both URLs |
| 1142 | fn find_repo_with_all_oids(&self, state: &RepositoryState, db_repo_data: &RepositoryData) -> Option<PathBuf> { | 854 | assert!(*ctx.processed_count.borrow() >= 1); |
| 1143 | for announcement in &db_repo_data.announcements { | ||
| 1144 | let repo_path = self.git_data_path.join(announcement.repo_path()); | ||
| 1145 | if !repo_path.exists() { | ||
| 1146 | continue; | ||
| 1147 | } | ||
| 1148 | |||
| 1149 | let all_present = state.branches.iter().all(|b| { | ||
| 1150 | b.commit.starts_with("ref: ") || oid_exists(&repo_path, &b.commit) | ||
| 1151 | }) && state.tags.iter().all(|t| { | ||
| 1152 | oid_exists(&repo_path, &t.commit) | ||
| 1153 | }); | ||
| 1154 | |||
| 1155 | if all_present { | ||
| 1156 | return Some(repo_path); | ||
| 1157 | } | ||
| 1158 | } | ||
| 1159 | None | ||
| 1160 | } | 855 | } |
| 1161 | } | 856 | } |
| 1162 | ``` | 857 | ``` |
| 1163 | ``` | ||
| 1164 | |||
| 1165 | ## Integration Points | ||
| 1166 | |||
| 1167 | ### 1. Negentropy Sync (src/purgatory/mod.rs:105-107) | ||
| 1168 | |||
| 1169 | When events are added via negentropy sync, call `trigger_immediate_sync`: | ||
| 1170 | |||
| 1171 | ```rust | ||
| 1172 | // In negentropy sync handler | ||
| 1173 | purgatory.add_state(event, identifier.clone(), author); | ||
| 1174 | purgatory.trigger_immediate_sync(&identifier); // NEW: triggers 500ms debounced sync | ||
| 1175 | ``` | ||
| 1176 | |||
| 1177 | ### 2. Relay Startup | ||
| 1178 | |||
| 1179 | Start the sync loop when the relay starts: | ||
| 1180 | |||
| 1181 | ```rust | ||
| 1182 | // In main.rs or server setup | ||
| 1183 | let domain_throttle = Arc::new(DomainThrottle::new(5, 30)); // 5 concurrent, 30/min | ||
| 1184 | let purgatory = Arc::new(Purgatory::new(git_data_path, domain_throttle)); | ||
| 1185 | |||
| 1186 | let sync_handle = purgatory.clone().start_sync_loop( | ||
| 1187 | database.clone(), | ||
| 1188 | Some(domain.clone()), | ||
| 1189 | Some(local_relay.clone()), | ||
| 1190 | ); | ||
| 1191 | ``` | ||
| 1192 | |||
| 1193 | ### 3. Shutdown | ||
| 1194 | |||
| 1195 | The sync loop will naturally stop when the purgatory is dropped. No special shutdown handling needed since all state is in-memory. | ||
| 1196 | |||
| 1197 | ## Testing Strategy | ||
| 1198 | |||
| 1199 | ### Unit Tests | ||
| 1200 | |||
| 1201 | 1. **SyncQueueEntry** | ||
| 1202 | - Verify backoff calculation: 20s → 40s → 80s → 120s → 120s | ||
| 1203 | - Verify `on_new_event()` resets attempt_count and updates next_attempt if sooner | ||
| 1204 | - Verify `on_sync_complete()` applies backoff only if next_attempt is in the past | ||
| 1205 | - Verify `is_ready()` respects both `next_attempt` and `in_progress` | ||
| 1206 | |||
| 1207 | 2. **DomainThrottle** | ||
| 1208 | - Verify concurrent limit: 6th request to same domain blocked | ||
| 1209 | - Verify rate limit: 31st request in a minute blocked | ||
| 1210 | - Verify `has_capacity()` checks both limits | ||
| 1211 | - Verify `get_untried_urls()` returns only URLs not in urls_tried | ||
| 1212 | - Verify `start_fetch()` fails if already in_progress for domain+identifier | ||
| 1213 | - Verify `start_fetch()` adds URL to urls_tried | ||
| 1214 | - Verify `complete_fetch()` decrements in_flight and clears in_progress | ||
| 1215 | - Verify `all_urls_tried()` correctly identifies when done | ||
| 1216 | - Verify `remove_identifier_from_domain()` cleans up state | ||
| 1217 | - Verify `remove_identifier()` removes from all domains | ||
| 1218 | |||
| 1219 | 3. **OID collection** | ||
| 1220 | - Verify OIDs extracted from state events correctly | ||
| 1221 | - Verify OIDs extracted from PR events correctly | ||
| 1222 | - Verify deduplication works across state and PR events | ||
| 1223 | |||
| 1224 | 4. **Identifier extraction** | ||
| 1225 | - Verify `extract_identifier_from_pr()` handles various `a` tag formats | ||
| 1226 | - Verify `extract_commit_from_pr()` extracts `c` tag correctly | ||
| 1227 | 858 | ||
| 1228 | ### Integration Tests | 859 | ### Integration Tests |
| 1229 | 860 | ||
| 1230 | 1. **Sync against own implementation** | 861 | 1. **Sync against own implementation**: Two ngit-grasp instances syncing |
| 1231 | - Start two ngit-grasp instances | 862 | 2. **Burst handling**: 10 events in 100ms, verify debounce |
| 1232 | - Push to one, verify other can sync via purgatory | 863 | 3. **Backoff behavior**: Unreachable URLs, verify timing |
| 1233 | - Verify partial OID availability handled correctly (some OIDs fetched, others missing) | 864 | 4. **Rate limiting**: Verify 30 req/min and 5 concurrent limits |
| 1234 | 865 | 5. **Parallel identifiers**: 5 identifiers sync in parallel | |
| 1235 | 2. **Burst handling** | ||
| 1236 | - Submit 10 events for same identifier within 100ms | ||
| 1237 | - Verify debounce: sync doesn't start until 500ms after last event | ||
| 1238 | - Verify only one sync operation runs (not 10) | ||
| 1239 | |||
| 1240 | 3. **Backoff behavior** | ||
| 1241 | - Configure with unreachable clone URLs | ||
| 1242 | - Verify backoff timing: 20s, 40s, 80s, 120s, then stays at 120s | ||
| 1243 | - Verify new event arriving resets attempt_count to 0 | ||
| 1244 | - Verify new event during sync prevents backoff (next_attempt already in future) | ||
| 1245 | |||
| 1246 | 4. **Rate limiting** | ||
| 1247 | - Configure with single domain having multiple URLs | ||
| 1248 | - Trigger many sync operations | ||
| 1249 | - Verify only 30 requests made in first minute | ||
| 1250 | - Verify only 5 concurrent requests per domain | ||
| 1251 | |||
| 1252 | 5. **Concurrent limit per domain+identifier** | ||
| 1253 | - Start fetch for domain+identifier | ||
| 1254 | - Verify second fetch attempt for same domain+identifier blocked | ||
| 1255 | - Verify fetch for different identifier on same domain allowed (up to 5) | ||
| 1256 | |||
| 1257 | 6. **Parallel identifier processing** | ||
| 1258 | - Add events for 5 different identifiers | ||
| 1259 | - Verify all 5 sync tasks start in parallel (not serial) | ||
| 1260 | - Verify `in_progress` flag prevents duplicate tasks for same identifier | ||
| 1261 | |||
| 1262 | 7. **Dynamic URL/OID recalculation** | ||
| 1263 | - Start sync for identifier | ||
| 1264 | - While sync is running, add new announcement with additional clone URL | ||
| 1265 | - Verify sync picks up new URL in next while loop iteration | ||
| 1266 | - Similarly for new events adding new OIDs | ||
| 1267 | |||
| 1268 | 8. **urls_tried cleanup** | ||
| 1269 | - Sync identifier, exhaust all URLs for a domain | ||
| 1270 | - Verify identifier removed from that domain's state | ||
| 1271 | - Add new announcement with new URL for same domain | ||
| 1272 | - Verify new URL is tried on next sync attempt | ||
| 1273 | |||
| 1274 | 9. **Mixed state and PR events** | ||
| 1275 | - Add state event and PR event for same identifier | ||
| 1276 | - Verify both OID sets collected | ||
| 1277 | - Verify both events processed when OIDs arrive | ||
| 1278 | |||
| 1279 | 10. **Available domains first** | ||
| 1280 | - Have 10 URLs: 8 from available domains, 2 from throttled domain | ||
| 1281 | - Verify available domains tried first | ||
| 1282 | - Verify throttled domain only contacted if available didn't satisfy all OIDs | ||
| 1283 | 866 | ||
| 1284 | ## Migration Path | 867 | ## Migration Path |
| 1285 | 868 | ||
| 1286 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle) | 869 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, DomainThrottle, SyncContext trait) |
| 1287 | 2. **Phase 2**: Implement sync loop alongside existing `start_state_sync` | 870 | 2. **Phase 2**: Implement `sync_identifier_step` with unit tests |
| 1288 | 3. **Phase 3**: Migrate state events to use new sync loop | 871 | 3. **Phase 3**: Implement main sync loop alongside existing `start_state_sync` |
| 1289 | 4. **Phase 4**: Add PR event syncing | 872 | 4. **Phase 4**: Add PR event syncing |
| 1290 | 5. **Phase 5**: Remove old `start_state_sync` code | 873 | 5. **Phase 5**: Remove old `start_state_sync` code |
| 1291 | 874 | ||
| 1292 | ## Configuration | 875 | ## Configuration |
| 1293 | 876 | ||
| 1294 | New configuration options: | ||
| 1295 | |||
| 1296 | | Option | CLI Flag | Environment Variable | Default | | 877 | | Option | CLI Flag | Environment Variable | Default | |
| 1297 | |--------|----------|---------------------|---------| | 878 | |--------|----------|---------------------|---------| |
| 1298 | | Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` | | 879 | | Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` | |
| @@ -1305,24 +886,14 @@ New configuration options: | |||
| 1305 | 886 | ||
| 1306 | ### Metrics | 887 | ### Metrics |
| 1307 | 888 | ||
| 1308 | - `purgatory_sync_queue_size` - Number of identifiers pending sync | 889 | - `purgatory_sync_queue_size` - Identifiers pending sync |
| 1309 | - `purgatory_sync_attempts_total` - Counter of sync attempts per identifier | 890 | - `purgatory_sync_attempts_total` - Sync attempts per identifier |
| 1310 | - `purgatory_sync_oids_fetched_total` - Counter of OIDs successfully fetched | 891 | - `purgatory_sync_oids_fetched_total` - OIDs successfully fetched |
| 1311 | - `purgatory_domain_in_flight` - Gauge of in-flight requests per domain | 892 | - `purgatory_domain_in_flight` - In-flight requests per domain |
| 1312 | - `purgatory_domain_requests_total` - Counter of requests per domain | 893 | - `purgatory_domain_requests_total` - Total requests per domain |
| 1313 | - `purgatory_sync_backoff_seconds` - Histogram of backoff durations applied | ||
| 1314 | 894 | ||
| 1315 | ### Logging | 895 | ### Logging |
| 1316 | 896 | ||
| 1317 | Key log points: | ||
| 1318 | - `INFO`: Successful sync completion, OIDs fetched | 897 | - `INFO`: Successful sync completion, OIDs fetched |
| 1319 | - `DEBUG`: Domain capacity checks, backoff applied, urls_tried state | 898 | - `DEBUG`: URL attempts, throttle decisions, backoff applied |
| 1320 | - `WARN`: Fetch failures, processing errors | 899 | - `WARN`: Fetch failures, processing errors |
| 1321 | |||
| 1322 | ## Open Questions | ||
| 1323 | |||
| 1324 | 1. **PR placeholder handling**: Current code has `add_pr_placeholder()` for git-data-first scenario. How should this interact with the new sync system? (Probably: placeholders don't need syncing since git data already exists) | ||
| 1325 | |||
| 1326 | 2. **Memory bounds**: Should we limit sync queue size? What happens if thousands of identifiers are pending? | ||
| 1327 | |||
| 1328 | 3. **Persistence**: Currently all purgatory state is in-memory. Should sync queue state survive restarts? (Probably no - events will be re-synced via negentropy on restart) | ||
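
The per-identifier backoff schedule this document describes (20s → 40s → 80s → 120s, then held at 120s) can be sketched as a small pure function. This is a minimal illustration, not the code from the commit: the name `backoff_delay` and the two constants are hypothetical, and it assumes the delay doubles from a 20-second base on each failed attempt and is capped at two minutes.

```rust
use std::time::Duration;

// Hypothetical constants mirroring the schedule in the design doc:
// start at 20 seconds, never wait longer than 2 minutes.
const BASE_BACKOFF_SECS: u64 = 20;
const MAX_BACKOFF_SECS: u64 = 120;

/// Returns the delay before the next sync attempt for an identifier.
///
/// `attempt_count` is the number of attempts already made since the
/// counter was last reset (assumed to start at 0).
fn backoff_delay(attempt_count: u32) -> Duration {
    // 2^attempt_count; falls back to u64::MAX when the shift is too large.
    let factor = 1u64.checked_shl(attempt_count).unwrap_or(u64::MAX);
    // Saturating multiply avoids overflow before the cap is applied.
    let secs = BASE_BACKOFF_SECS
        .saturating_mul(factor)
        .min(MAX_BACKOFF_SECS);
    Duration::from_secs(secs)
}
```

Keeping the calculation free of clocks and shared state means the schedule can be checked with plain equality assertions, without sleeping or mocking time. Resetting `attempt_count` to 0 when a new event arrives, as the testing strategy describes, would bring the delay back to 20 seconds.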