upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
Diffstat (limited to 'docs')
-rw-r--r--docs/explanation/purgatory-sync-redesign.md742
1 files changed, 415 insertions, 327 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
index 8eaed6c..54e279a 100644
--- a/docs/explanation/purgatory-sync-redesign.md
+++ b/docs/explanation/purgatory-sync-redesign.md
@@ -1331,430 +1331,518 @@ impl Purgatory {
1331 1331
1332## Testing Strategy 1332## Testing Strategy
1333 1333
1334The testing strategy follows a clear separation: **unit tests** verify the core sync mechanics (retry, backoff, throttling, round-robin) using mocks, while **integration tests** verify end-to-end behavior with real relay instances. 1334Tests are created **only** as part of each implementation phase. See [Implementation Phases](#implementation-phases) for the complete test plan.
1335 1335
1336### Design Principles 1336### Design Principles
1337 1337
13381. **Unit tests for mechanics**: Test retry logic, backoff timing, throttle behavior, and queue management in isolation 13381. **Tests accompany code**: Each phase specifies exactly which tests to create
13392. **Integration tests for outcomes**: Verify that events sync correctly, not the internal mechanics 13392. **Unit tests for mechanics**: Test backoff, throttle, retry logic in isolation using mocks
13403. **Avoid testing implementation details**: Don't test every code path; test observable behaviors 13403. **Integration tests for outcomes**: Verify events sync correctly end-to-end
13414. **Focus on edge cases that matter**: Partial OID availability, server failures, concurrent events 13414. **No speculative tests**: Don't create tests for code that doesn't exist yet
1342 1342
1343### Unit Tests (src/purgatory/sync.rs) 1343### MockSyncContext
1344 1344
1345Unit tests use `MockSyncContext` to test sync logic without I/O. The mock is simple: 1345Phases 4-6 use `MockSyncContext` to test sync logic without I/O:
1346 1346
1347```rust 1347```rust
1348/// Mock context for testing sync logic 1348/// Mock context for testing sync logic
1349struct MockSyncContext { 1349#[cfg(test)]
1350 /// URLs available for fetching, grouped by domain 1350pub struct MockSyncContext {
1351 urls_by_domain: HashMap<String, Vec<String>>, 1351 /// Repository data to return
1352 /// Which OIDs each URL can provide 1352 repo_data: RepositoryData,
1353 url_provides_oids: HashMap<String, HashSet<String>>,
1354 /// OIDs still needed (decremented when "fetched") 1353 /// OIDs still needed (decremented when "fetched")
1355 needed_oids: RefCell<HashSet<String>>, 1354 needed_oids: RefCell<HashSet<String>>,
1355 /// Which OIDs each URL can provide
1356 url_provides_oids: HashMap<String, HashSet<String>>,
1356 /// Track fetch attempts for assertions 1357 /// Track fetch attempts for assertions
1357 fetch_log: RefCell<Vec<(String, Vec<String>)>>, // (url, oids_requested) 1358 fetch_log: RefCell<Vec<String>>,
1359 /// Whether there are pending events
1360 has_pending: RefCell<bool>,
1361}
1362
1363impl MockSyncContext {
1364 pub fn new() -> Self;
1365 pub fn with_urls(self, urls: &[&str]) -> Self;
1366 pub fn with_needed_oids(self, oids: &[&str]) -> Self;
1367 pub fn url_provides(self, url: &str, oids: &[&str]) -> Self;
1358} 1368}
1359``` 1369```
1360 1370
1361#### Test Module 1: Backoff Behavior 1371### Test Locations
1372
1373| Test Type | Location | Created In |
1374|-----------|----------|------------|
1375| SyncQueueEntry | `src/purgatory/sync/queue.rs` | Phase 1 |
1376| DomainThrottle | `src/purgatory/sync/throttle.rs` | Phase 2 |
1377| ThrottleManager | `src/purgatory/sync/throttle.rs` | Phase 3 |
1378| Core sync functions | `src/purgatory/sync/functions.rs` | Phase 5-6 |
1379| Queue integration | `src/purgatory/mod.rs` | Phase 7 |
1380| Integration tests | `tests/purgatory_sync.rs` | Phase 10 |
1381
1382## Implementation Phases
1383
1384Each phase has clear deliverables, unit tests, and success criteria. Unit tests are created **only** for the code built in that phase.
1385
1386---
1387
1388### Phase 1: SyncQueueEntry with Backoff
1389
1390**Goal**: Implement the sync queue entry struct with backoff calculation.
1391
1392**Files**:
1393- `src/purgatory/sync/queue.rs` (new)
1362 1394
1363Tests the `SyncQueueEntry` backoff calculation and state transitions: 1395**Deliverables**:
1396```rust
1397pub struct SyncQueueEntry {
1398 pub next_attempt: Instant,
1399 pub attempt_count: u32,
1400 pub in_progress: bool,
1401}
1364 1402
1403impl SyncQueueEntry {
1404 pub fn new(delay: Duration) -> Self;
1405 pub fn backoff(&self) -> Duration;
1406 pub fn is_ready(&self) -> bool;
1407 pub fn on_new_event(&mut self, delay: Duration);
1408 pub fn on_sync_complete(&mut self);
1409}
1410```
1411
1412**Unit Tests** (2 tests):
1365```rust 1413```rust
1366mod backoff_tests { 1414#[cfg(test)]
1415mod tests {
1367 #[test] 1416 #[test]
1368 fn backoff_doubles_up_to_cap() { 1417 fn backoff_doubles_up_to_cap() {
1369 // Verify: 20s → 40s → 80s → 120s (capped) 1418 // 20s → 40s → 80s → 120s → 120s (capped)
1370 let mut entry = SyncQueueEntry::new(Duration::ZERO);
1371
1372 entry.on_sync_complete();
1373 assert_eq!(entry.backoff(), Duration::from_secs(20));
1374
1375 entry.on_sync_complete();
1376 assert_eq!(entry.backoff(), Duration::from_secs(40));
1377
1378 entry.on_sync_complete();
1379 assert_eq!(entry.backoff(), Duration::from_secs(80));
1380
1381 entry.on_sync_complete();
1382 assert_eq!(entry.backoff(), Duration::from_secs(120));
1383
1384 // Stays capped
1385 entry.on_sync_complete();
1386 assert_eq!(entry.backoff(), Duration::from_secs(120));
1387 } 1419 }
1388 1420
1389 #[test] 1421 #[test]
1390 fn new_event_resets_backoff() { 1422 fn new_event_resets_attempt_count() {
1391 // New event for same identifier resets attempt_count 1423 // on_new_event() resets attempt_count to 0
1392 let mut entry = SyncQueueEntry::new(Duration::ZERO);
1393 entry.on_sync_complete();
1394 entry.on_sync_complete();
1395 assert!(entry.attempt_count > 0);
1396
1397 entry.on_new_event(Duration::from_millis(500));
1398 assert_eq!(entry.attempt_count, 0);
1399 } 1424 }
1400} 1425}
1401``` 1426```
1402 1427
1403#### Test Module 2: Throttle & Rate Limiting 1428**Success Criteria**:
1429- [ ] `SyncQueueEntry::new()` creates entry with given delay
1430- [ ] `backoff()` returns 20s, 40s, 80s, 120s, 120s for attempts 1-5
1431- [ ] `on_new_event()` resets `attempt_count` to 0
1432- [ ] `on_sync_complete()` increments `attempt_count` and updates `next_attempt`
1433- [ ] Both unit tests pass
1434
1435---
1436
1437### Phase 2: DomainThrottle with Rate Limiting and Round-Robin
1438
1439**Goal**: Implement per-domain throttling with concurrent/rate limits and fair queue processing.
1440
1441**Files**:
1442- `src/purgatory/sync/throttle.rs` (new)
1443
1444**Deliverables**:
1445```rust
1446pub struct DomainThrottle {
1447 domain: String,
1448 in_flight: u32,
1449 request_times: VecDeque<Instant>,
1450 queue: IndexMap<String, IdentifierQueueState>,
1451 round_robin_index: usize,
1452 max_concurrent: u32,
1453 max_per_minute: u32,
1454}
1404 1455
1405Tests `DomainThrottle` capacity checks and rate limiting: 1456impl DomainThrottle {
1457 pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self;
1458 pub fn has_capacity(&self) -> bool;
1459 pub fn start_request(&mut self);
1460 pub fn complete_request(&mut self);
1461 pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>);
1462 pub fn next_ready_identifier(&mut self) -> Option<String>;
1463 pub fn mark_identifier_not_in_progress(&mut self, identifier: &str);
1464 pub fn remove_identifier(&mut self, identifier: &str);
1465}
1466```
1406 1467
1468**Unit Tests** (4 tests):
1407```rust 1469```rust
1408mod throttle_tests { 1470#[cfg(test)]
1471mod tests {
1409 #[test] 1472 #[test]
1410 fn concurrent_limit_enforced() { 1473 fn concurrent_limit_blocks_when_saturated() {
1411 let mut throttle = DomainThrottle::new("example.com".into(), 2, 100); 1474 // has_capacity() returns false when in_flight >= max_concurrent
1412
1413 assert!(throttle.has_capacity());
1414 throttle.start_request();
1415 assert!(throttle.has_capacity());
1416 throttle.start_request();
1417 assert!(!throttle.has_capacity()); // At limit
1418
1419 throttle.complete_request();
1420 assert!(throttle.has_capacity()); // Capacity freed
1421 } 1475 }
1422 1476
1423 #[test] 1477 #[test]
1424 fn rate_limit_enforced() { 1478 fn rate_limit_blocks_when_window_full() {
1425 let mut throttle = DomainThrottle::new("example.com".into(), 100, 2); 1479 // has_capacity() returns false when requests in last 60s >= max_per_minute
1426 1480 // Use deterministic time (pass Instant or mock clock)
1427 // Make 2 requests (at rate limit)
1428 throttle.start_request();
1429 throttle.complete_request();
1430 throttle.start_request();
1431 throttle.complete_request();
1432
1433 assert!(!throttle.has_capacity()); // Rate limited
1434
1435 // After 60s window passes, capacity returns
1436 // (In real tests, use tokio::time::pause/advance)
1437 } 1481 }
1438 1482
1439 #[test] 1483 #[test]
1440 fn round_robin_fairness() { 1484 fn round_robin_processes_identifiers_fairly() {
1441 let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); 1485 // Enqueue A, B, C → next_ready returns A, B, C, A, B, C...
1442
1443 // Enqueue 3 identifiers
1444 throttle.enqueue_identifier("A".into(), HashSet::new());
1445 throttle.enqueue_identifier("B".into(), HashSet::new());
1446 throttle.enqueue_identifier("C".into(), HashSet::new());
1447
1448 // Process in round-robin order
1449 let mut order = vec![];
1450 for _ in 0..6 {
1451 if let Some(id) = throttle.next_ready_identifier() {
1452 order.push(id.clone());
1453 throttle.mark_identifier_not_in_progress(&id);
1454 }
1455 }
1456
1457 assert_eq!(order, vec!["A", "B", "C", "A", "B", "C"]);
1458 } 1486 }
1459 1487
1460 #[test] 1488 #[test]
1461 fn skips_in_progress_identifiers() { 1489 fn skips_in_progress_identifiers() {
1462 let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); 1490 // next_ready skips identifiers where in_progress=true
1463 1491 }
1464 throttle.enqueue_identifier("A".into(), HashSet::new()); 1492}
1465 throttle.enqueue_identifier("B".into(), HashSet::new()); 1493```
1466 1494
1467 // Get A (now in_progress) 1495**Success Criteria**:
1468 assert_eq!(throttle.next_ready_identifier(), Some("A".into())); 1496- [ ] Concurrent limit enforced (blocks at max_concurrent)
1469 1497- [ ] Rate limit enforced (blocks at max_per_minute within 60s window)
1470 // Next skips A, returns B 1498- [ ] Round-robin ordering maintained across calls
1471 assert_eq!(throttle.next_ready_identifier(), Some("B".into())); 1499- [ ] In-progress identifiers skipped
1472 1500- [ ] All 4 unit tests pass
1473 // Both in progress 1501
1474 assert_eq!(throttle.next_ready_identifier(), None); 1502---
1503
1504### Phase 3: ThrottleManager
1505
1506**Goal**: Implement the manager that owns all domain throttles and provides the sync interface.
1507
1508**Files**:
1509- `src/purgatory/sync/throttle.rs` (extend)
1510
1511**Deliverables**:
1512```rust
1513pub struct ThrottleManager {
1514 throttles: DashMap<String, DomainThrottle>,
1515 max_concurrent_per_domain: u32,
1516 max_per_minute_per_domain: u32,
1517}
1518
1519impl ThrottleManager {
1520 pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self;
1521 pub fn is_throttled(&self, domain: &str) -> bool;
1522 pub fn start_request(&self, domain: &str);
1523 pub fn complete_request(&self, domain: &str);
1524 pub fn enqueue_identifier(&self, domain: &str, identifier: String, tried_urls: HashSet<String>);
1525}
1526```
1527
1528**Unit Tests** (1 test):
1529```rust
1530#[cfg(test)]
1531mod tests {
1532 #[test]
1533 fn is_throttled_reflects_domain_capacity() {
1534 // is_throttled returns true when domain has no capacity
1475 } 1535 }
1476} 1536}
1477``` 1537```
1478 1538
1479#### Test Module 3: Retry & URL Selection 1539**Success Criteria**:
1540- [ ] `is_throttled()` correctly reflects domain capacity
1541- [ ] `start_request()`/`complete_request()` delegate to correct domain
1542- [ ] `enqueue_identifier()` creates domain throttle if needed
1543- [ ] Unit test passes
1544
1545---
1546
1547### Phase 4: SyncContext Trait and MockSyncContext
1548
1549**Goal**: Define the abstraction for sync operations and create the test mock.
1550
1551**Files**:
1552- `src/purgatory/sync/context.rs` (new)
1553
1554**Deliverables**:
1555```rust
1556#[async_trait]
1557pub trait SyncContext: Send + Sync {
1558 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData>;
1559 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>;
1560 async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>;
1561 async fn process_satisfiable_events(&self, identifier: &str) -> Result<ProcessResult>;
1562 fn has_pending_events(&self, identifier: &str) -> bool;
1563 fn find_target_repo(&self, data: &RepositoryData) -> Option<PathBuf>;
1564 fn our_domain(&self) -> Option<&str>;
1565}
1566
1567// Test support
1568#[cfg(test)]
1569pub struct MockSyncContext { ... }
1570```
1571
1572**Unit Tests** (0 tests):
1573- This phase creates infrastructure only; tests come in Phase 5
1574
1575**Success Criteria**:
1576- [ ] `SyncContext` trait compiles with all required methods
1577- [ ] `MockSyncContext` implements `SyncContext`
1578- [ ] Mock supports builder pattern for test setup
1579
1580---
1581
1582### Phase 5: Core Sync Functions
1480 1583
1481Tests `sync_identifier_next_url` behavior: 1584**Goal**: Implement `sync_identifier_next_url` and `sync_identifier_from_url`.
1482 1585
1586**Files**:
1587- `src/purgatory/sync/functions.rs` (new)
1588
1589**Deliverables**:
1590```rust
1591pub async fn sync_identifier_next_url<C: SyncContext>(
1592 ctx: &C,
1593 identifier: &str,
1594 domain: Option<&str>,
1595 tried_urls: &HashSet<String>,
1596 throttle_manager: &ThrottleManager,
1597) -> Option<String>;
1598
1599pub async fn sync_identifier_from_url<C: SyncContext>(
1600 ctx: &C,
1601 identifier: &str,
1602 url: &str,
1603 throttle_manager: &Arc<ThrottleManager>,
1604) -> usize;
1605```
1606
1607**Unit Tests** (3 tests):
1483```rust 1608```rust
1484mod retry_tests { 1609#[cfg(test)]
1610mod tests {
1485 #[tokio::test] 1611 #[tokio::test]
1486 async fn tries_urls_until_success() { 1612 async fn next_url_skips_throttled_domains() {
1487 let ctx = MockSyncContext::new() 1613 // When domain is throttled, next_url returns URL from different domain
1488 .with_urls(&["https://fail1.com/r.git", "https://fail2.com/r.git", "https://ok.com/r.git"])
1489 .with_needed_oids(&["abc123"])
1490 .url_provides("https://ok.com/r.git", &["abc123"]);
1491
1492 let throttle = Arc::new(ThrottleManager::new(5, 30));
1493 sync_identifier(&ctx, "test-repo", &throttle).await;
1494
1495 // Should have tried all 3 URLs
1496 assert_eq!(ctx.fetch_log.borrow().len(), 3);
1497 // OID should now be satisfied
1498 assert!(ctx.needed_oids.borrow().is_empty());
1499 } 1614 }
1500 1615
1501 #[tokio::test] 1616 #[tokio::test]
1502 async fn skips_throttled_domains() { 1617 async fn next_url_skips_tried_urls() {
1503 let ctx = MockSyncContext::new() 1618 // URLs in tried_urls set are not returned
1504 .with_urls(&["https://throttled.com/r.git", "https://ok.com/r.git"])
1505 .with_needed_oids(&["abc123"])
1506 .url_provides("https://ok.com/r.git", &["abc123"]);
1507
1508 let throttle = Arc::new(ThrottleManager::new(5, 30));
1509
1510 // Saturate throttled.com
1511 for _ in 0..5 {
1512 throttle.start_request("throttled.com");
1513 }
1514
1515 sync_identifier(&ctx, "test-repo", &throttle).await;
1516
1517 // Should only try ok.com (throttled.com skipped)
1518 let urls_tried: Vec<_> = ctx.fetch_log.borrow().iter().map(|(u, _)| u.clone()).collect();
1519 assert!(!urls_tried.contains(&"https://throttled.com/r.git".to_string()));
1520 assert!(urls_tried.contains(&"https://ok.com/r.git".to_string()));
1521 } 1619 }
1522 1620
1523 #[tokio::test] 1621 #[tokio::test]
1524 async fn enqueues_with_throttled_domains() { 1622 async fn from_url_fetches_and_processes_on_success() {
1525 let ctx = MockSyncContext::new() 1623 // Successful fetch triggers process_satisfiable_events
1526 .with_urls(&["https://throttled.com/r.git"])
1527 .with_needed_oids(&["abc123"]);
1528
1529 let throttle = Arc::new(ThrottleManager::new(1, 30));
1530 throttle.start_request("throttled.com"); // Saturate
1531
1532 sync_identifier(&ctx, "test-repo", &throttle).await;
1533
1534 // Should be enqueued with throttled.com for later
1535 assert!(throttle.has_queued_identifier("throttled.com", "test-repo"));
1536 } 1624 }
1537} 1625}
1538``` 1626```
1539 1627
1540#### Test Module 4: Partial OID Fetching 1628**Success Criteria**:
1629- [ ] `sync_identifier_next_url` returns non-throttled, untried URL
1630- [ ] `sync_identifier_next_url` returns `None` when all URLs tried or throttled
1631- [ ] `sync_identifier_from_url` calls `fetch_oids` and `process_satisfiable_events`
1632- [ ] All 3 unit tests pass
1541 1633
1542Tests behavior when servers have different subsets of OIDs: 1634---
1543 1635
1636### Phase 6: sync_identifier Orchestration
1637
1638**Goal**: Implement the main sync loop for a single identifier.
1639
1640**Files**:
1641- `src/purgatory/sync/functions.rs` (extend)
1642
1643**Deliverables**:
1544```rust 1644```rust
1545mod partial_oid_tests { 1645pub async fn sync_identifier<C: SyncContext>(
1646 ctx: &C,
1647 identifier: &str,
1648 throttle_manager: &Arc<ThrottleManager>,
1649) -> bool; // true if complete, false if pending
1650```
1651
1652**Unit Tests** (2 tests):
1653```rust
1654#[cfg(test)]
1655mod tests {
1546 #[tokio::test] 1656 #[tokio::test]
1547 async fn aggregates_oids_from_multiple_servers() { 1657 async fn tries_multiple_urls_until_complete() {
1548 // Server A has oid1, Server B has oid2 1658 // Tries URL1 (partial), URL2 (partial), URL3 (complete) → returns true
1549 let ctx = MockSyncContext::new()
1550 .with_urls(&["https://a.com/r.git", "https://b.com/r.git"])
1551 .with_needed_oids(&["oid1", "oid2"])
1552 .url_provides("https://a.com/r.git", &["oid1"])
1553 .url_provides("https://b.com/r.git", &["oid2"]);
1554
1555 let throttle = Arc::new(ThrottleManager::new(5, 30));
1556 let complete = sync_identifier(&ctx, "test-repo", &throttle).await;
1557
1558 assert!(complete);
1559 assert!(ctx.needed_oids.borrow().is_empty());
1560 } 1659 }
1561 1660
1562 #[tokio::test] 1661 #[tokio::test]
1563 async fn incomplete_when_oids_unavailable() { 1662 async fn enqueues_throttled_domains_when_incomplete() {
1564 // No server has oid2 1663 // When URLs remain but are throttled, enqueues and returns false
1565 let ctx = MockSyncContext::new()
1566 .with_urls(&["https://a.com/r.git", "https://b.com/r.git"])
1567 .with_needed_oids(&["oid1", "oid2"])
1568 .url_provides("https://a.com/r.git", &["oid1"])
1569 .url_provides("https://b.com/r.git", &["oid1"]); // Same OID, missing oid2
1570
1571 let throttle = Arc::new(ThrottleManager::new(5, 30));
1572 let complete = sync_identifier(&ctx, "test-repo", &throttle).await;
1573
1574 assert!(!complete);
1575 assert!(ctx.needed_oids.borrow().contains("oid2"));
1576 } 1664 }
1577} 1665}
1578``` 1666```
1579 1667
1580#### Test Module 5: Burst Debouncing 1668**Success Criteria**:
1669- [ ] Loops through available URLs until sync complete or all tried
1670- [ ] Enqueues with throttled domains when OIDs still needed
1671- [ ] Returns `true` when all OIDs fetched, `false` otherwise
1672- [ ] Both unit tests pass
1673
1674---
1675
1676### Phase 7: Purgatory Sync Queue Integration
1677
1678**Goal**: Add sync queue to Purgatory and implement `enqueue_sync`.
1581 1679
1582Tests that rapid event arrivals are debounced: 1680**Files**:
1681- `src/purgatory/mod.rs` (extend)
1583 1682
1683**Deliverables**:
1584```rust 1684```rust
1585mod debounce_tests { 1685impl Purgatory {
1586 #[tokio::test] 1686 // New field: sync_queue: Arc<DashMap<String, SyncQueueEntry>>
1587 async fn burst_events_debounced() { 1687
1588 tokio::time::pause(); 1688 pub fn enqueue_sync(&self, identifier: &str, delay: Duration);
1589 1689 pub fn has_pending_events(&self, identifier: &str) -> bool;
1590 let queue = SyncQueue::new(); 1690}
1591 1691```
1592 // Simulate 10 events arriving in 100ms 1692
1593 for i in 0..10 { 1693**Unit Tests** (1 test):
1594 queue.enqueue("test-repo", Duration::from_millis(500)); 1694```rust
1595 tokio::time::advance(Duration::from_millis(10)).await; 1695#[cfg(test)]
1596 } 1696mod tests {
1597 1697 #[test]
1598 // Should only have one entry (debounced) 1698 fn enqueue_sync_debounces_rapid_calls() {
1599 assert_eq!(queue.len(), 1); 1699 // Multiple enqueue_sync calls within delay window result in single entry
1600
1601 // Entry should be ready after 500ms from last event
1602 tokio::time::advance(Duration::from_millis(400)).await;
1603 assert!(!queue.get("test-repo").unwrap().is_ready());
1604
1605 tokio::time::advance(Duration::from_millis(100)).await;
1606 assert!(queue.get("test-repo").unwrap().is_ready());
1607 } 1700 }
1608} 1701}
1609``` 1702```
1610 1703
1611### Integration Tests (tests/purgatory_sync.rs) 1704**Success Criteria**:
1705- [ ] `enqueue_sync` adds/updates entry in sync_queue
1706- [ ] Rapid calls debounce (don't create multiple entries)
1707- [ ] `has_pending_events` checks both state_events and pr_events
1708- [ ] Unit test passes
1709
1710---
1711
1712### Phase 8: Main Sync Loop
1612 1713
1613Integration tests verify end-to-end behavior with real relay instances. Keep these minimal and focused on outcomes. 1714**Goal**: Implement the background sync loop that processes ready identifiers.
1614 1715
1716**Files**:
1717- `src/purgatory/sync/loop.rs` (new)
1718
1719**Deliverables**:
1615```rust 1720```rust
1616//! Purgatory Sync Integration Tests 1721impl Purgatory {
1617//! 1722 pub fn start_sync_loop(
1618//! These tests verify that state and PR events sync correctly between 1723 self: Arc<Self>,
1619//! ngit-grasp instances, including git data fetching. 1724 ctx: Arc<dyn SyncContext>,
1725 throttle_manager: Arc<ThrottleManager>,
1726 ) -> JoinHandle<()>;
1727}
1728```
1620 1729
1621mod common; 1730**Unit Tests** (0 tests):
1622use common::{TestRelay, TestRepo, create_state_event, create_pr_event}; 1731- The sync loop is tested via integration tests; unit testing async loops is fragile
1623 1732
1624/// Core test: State event syncs with git data 1733**Success Criteria**:
1625/// 1734- [ ] Loop runs every 1 second
1626/// Verifies the complete purgatory workflow: 1735- [ ] Finds ready identifiers and spawns sync tasks
1627/// 1. State event arrives at relay B (no git data) 1736- [ ] Applies backoff on incomplete syncs
1628/// 2. Event enters purgatory 1737- [ ] Removes completed identifiers from queue
1629/// 3. Git data fetched from relay A 1738
1630/// 4. Event released and served 1739---
1740
1741### Phase 9: RealSyncContext Implementation
1742
1743**Goal**: Implement the production `SyncContext` that connects to real systems.
1744
1745**Files**:
1746- `src/purgatory/sync/context.rs` (extend)
1747
1748**Deliverables**:
1749```rust
1750pub struct RealSyncContext {
1751 purgatory: Purgatory,
1752 database: SharedDatabase,
1753 git_data_path: PathBuf,
1754 our_domain: Option<String>,
1755 local_relay: Option<LocalRelay>,
1756}
1757
1758impl SyncContext for RealSyncContext { ... }
1759```
1760
1761**Unit Tests** (0 tests):
1762- `RealSyncContext` is tested via integration tests
1763
1764**Success Criteria**:
1765- [ ] All `SyncContext` methods implemented
1766- [ ] Connects to real database, git, and relay
1767- [ ] `process_satisfiable_events` releases events from purgatory
1768
1769---
1770
1771### Phase 10: Integration Tests
1772
1773**Goal**: Verify end-to-end sync behavior with real relay instances.
1774
1775**Files**:
1776- `tests/purgatory_sync.rs` (new)
1777
1778**Integration Tests** (4 tests):
1779```rust
1631#[tokio::test] 1780#[tokio::test]
1632async fn state_event_syncs_from_remote() { 1781async fn state_event_syncs_from_remote() {
1633 // Setup: Two relays, relay A has repo with git data 1782 // State event enters purgatory, git data fetched, event released
1634 let relay_a = TestRelay::start().await;
1635 let repo = TestRepo::create_with_commits(&relay_a, 3).await;
1636
1637 // Relay B configured to sync from A
1638 let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await;
1639
1640 // Publish state event to relay B (git data not present)
1641 let state = create_state_event(&repo);
1642 relay_b.publish(&state).await;
1643
1644 // Wait for sync (with timeout)
1645 let synced = relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)).await;
1646 assert!(synced, "State event should be served after git sync");
1647
1648 // Verify git data is now available
1649 assert!(relay_b.can_clone(&repo.identifier()).await);
1650} 1783}
1651 1784
1652/// Core test: PR event syncs with commit data
1653#[tokio::test] 1785#[tokio::test]
1654async fn pr_event_syncs_from_remote() { 1786async fn pr_event_syncs_from_remote() {
1655 let relay_a = TestRelay::start().await; 1787 // PR event enters purgatory, commit fetched, event released
1656 let repo = TestRepo::create_with_commits(&relay_a, 1).await;
1657
1658 // Create PR with a new commit
1659 let pr_commit = repo.create_pr_commit().await;
1660 let pr_event = create_pr_event(&repo, &pr_commit);
1661 relay_a.publish(&pr_event).await;
1662 relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await;
1663
1664 // Relay B syncs
1665 let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await;
1666 relay_b.publish(&pr_event).await;
1667
1668 let synced = relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)).await;
1669 assert!(synced, "PR event should be served after git sync");
1670} 1788}
1671 1789
1672/// Edge case: Concurrent state and PR events for same repo
1673///
1674/// Verifies that both event types sync correctly when arriving together.
1675#[tokio::test] 1790#[tokio::test]
1676async fn concurrent_state_and_pr_sync() { 1791async fn concurrent_state_and_pr_sync() {
1677 let relay_a = TestRelay::start().await; 1792 // Both event types sync correctly when arriving together
1678 let repo = TestRepo::create_with_commits(&relay_a, 2).await;
1679
1680 let state = create_state_event(&repo);
1681 let pr_commit = repo.create_pr_commit().await;
1682 let pr_event = create_pr_event(&repo, &pr_commit);
1683
1684 relay_a.publish(&state).await;
1685 relay_a.publish(&pr_event).await;
1686 relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await;
1687
1688 let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await;
1689
1690 // Publish both simultaneously
1691 tokio::join!(
1692 relay_b.publish(&state),
1693 relay_b.publish(&pr_event),
1694 );
1695
1696 // Both should sync
1697 let (state_synced, pr_synced) = tokio::join!(
1698 relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)),
1699 relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)),
1700 );
1701
1702 assert!(state_synced && pr_synced, "Both events should sync");
1703} 1793}
1704 1794
1705/// Edge case: Server has subset of required OIDs
1706///
1707/// Verifies aggregation from multiple sources when no single
1708/// server has all required OIDs.
1709#[tokio::test] 1795#[tokio::test]
1710async fn partial_oid_aggregation() { 1796async fn partial_oid_aggregation_from_multiple_servers() {
1711 // Relay A has commits 1-2, Relay B has commits 2-3 1797 // OIDs aggregated when no single server has all
1712 let relay_a = TestRelay::start().await;
1713 let relay_b = TestRelay::start().await;
1714
1715 let repo_a = TestRepo::create_with_commits(&relay_a, 2).await;
1716 let repo_b = TestRepo::create_with_commits(&relay_b, 2).await;
1717 repo_b.push_additional_commits(1).await; // Commit 3 only on B
1718
1719 // State references commits 1, 2, and 3
1720 let state = create_state_event_with_commits(&[
1721 repo_a.commit(0),
1722 repo_a.commit(1),
1723 repo_b.commit(2),
1724 ]);
1725
1726 // Relay C syncs from both A and B
1727 let relay_c = TestRelay::start_with_sync_sources(&[relay_a.url(), relay_b.url()]).await;
1728 relay_c.publish(&state).await;
1729
1730 let synced = relay_c.wait_for_event_served(&state.id, Duration::from_secs(15)).await;
1731 assert!(synced, "Should aggregate OIDs from multiple sources");
1732} 1798}
1733``` 1799```
1734 1800
1735### Test Summary 1801**Success Criteria**:
1736 1802- [ ] All 4 integration tests pass
1737| Category | Test Count | Purpose | 1803- [ ] State events release after git sync
1738|----------|------------|---------| 1804- [ ] PR events release after commit sync
1739| Backoff | 2 | Verify exponential backoff timing and reset | 1805- [ ] Partial OID scenarios handled correctly
1740| Throttle | 4 | Verify rate limiting, concurrency, round-robin | 1806
1741| Retry | 3 | Verify URL iteration and throttle-skip behavior | 1807---
1742| Partial OIDs | 2 | Verify OID aggregation from multiple servers | 1808
1743| Debounce | 1 | Verify burst event handling | 1809### Phase 11: Cleanup
1744| **Unit Total** | **12** | | 1810
1745| Integration | 4 | End-to-end sync verification | 1811**Goal**: Remove old `start_state_sync` code and wire up new system.
1746| **Grand Total** | **16** | | 1812
1747 1813**Files**:
1748This is a focused set that covers the key behaviors without redundant tests. 1814- `src/purgatory/mod.rs` (modify)
1749 1815- `src/main.rs` (modify)
1750## Migration Path 1816
1751 1817**Deliverables**:
17521. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) 1818- Remove `start_state_sync` method
17532. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests 1819- Wire `start_sync_loop` into application startup
17543. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` 1820- Update `add_state` to call `enqueue_sync`
17554. **Phase 4**: Implement ThrottleManager trigger-based processing 1821
17565. **Phase 5**: Add PR event syncing 1822**Success Criteria**:
17576. **Phase 6**: Remove old `start_state_sync` code 1823- [ ] Old sync code removed
1824- [ ] New sync loop starts on application boot
1825- [ ] All existing tests still pass
1826- [ ] All new tests pass
1827
1828---
1829
1830## Test Summary
1831
1832| Phase | Unit Tests | Integration Tests | Total |
1833|-------|------------|-------------------|-------|
1834| 1. SyncQueueEntry | 2 | - | 2 |
1835| 2. DomainThrottle | 4 | - | 4 |
1836| 3. ThrottleManager | 1 | - | 1 |
1837| 4. SyncContext | 0 | - | 0 |
1838| 5. Core Functions | 3 | - | 3 |
1839| 6. sync_identifier | 2 | - | 2 |
1840| 7. Queue Integration | 1 | - | 1 |
1841| 8. Sync Loop | 0 | - | 0 |
1842| 9. RealSyncContext | 0 | - | 0 |
1843| 10. Integration | - | 4 | 4 |
1844| 11. Cleanup | 0 | - | 0 |
1845| **Total** | **13** | **4** | **17** |
1758 1846
1759## Configuration 1847## Configuration
1760 1848