diff options
Diffstat (limited to 'docs/explanation/purgatory-sync-redesign.md')
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 742 |
1 files changed, 415 insertions, 327 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 8eaed6c..54e279a 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -1331,430 +1331,518 @@ impl Purgatory { | |||
| 1331 | 1331 | ||
| 1332 | ## Testing Strategy | 1332 | ## Testing Strategy |
| 1333 | 1333 | ||
| 1334 | The testing strategy follows a clear separation: **unit tests** verify the core sync mechanics (retry, backoff, throttling, round-robin) using mocks, while **integration tests** verify end-to-end behavior with real relay instances. | 1334 | Tests are created **only** as part of each implementation phase. See [Implementation Phases](#implementation-phases) for the complete test plan. |
| 1335 | 1335 | ||
| 1336 | ### Design Principles | 1336 | ### Design Principles |
| 1337 | 1337 | ||
| 1338 | 1. **Unit tests for mechanics**: Test retry logic, backoff timing, throttle behavior, and queue management in isolation | 1338 | 1. **Tests accompany code**: Each phase specifies exactly which tests to create |
| 1339 | 2. **Integration tests for outcomes**: Verify that events sync correctly, not the internal mechanics | 1339 | 2. **Unit tests for mechanics**: Test backoff, throttle, retry logic in isolation using mocks |
| 1340 | 3. **Avoid testing implementation details**: Don't test every code path; test observable behaviors | 1340 | 3. **Integration tests for outcomes**: Verify events sync correctly end-to-end |
| 1341 | 4. **Focus on edge cases that matter**: Partial OID availability, server failures, concurrent events | 1341 | 4. **No speculative tests**: Don't create tests for code that doesn't exist yet |
| 1342 | 1342 | ||
| 1343 | ### Unit Tests (src/purgatory/sync.rs) | 1343 | ### MockSyncContext |
| 1344 | 1344 | ||
| 1345 | Unit tests use `MockSyncContext` to test sync logic without I/O. The mock is simple: | 1345 | Phase 4 introduces `MockSyncContext`; Phases 5-6 use it to test sync logic without I/O: |
| 1346 | 1346 | ||
| 1347 | ```rust | 1347 | ```rust |
| 1348 | /// Mock context for testing sync logic | 1348 | /// Mock context for testing sync logic |
| 1349 | struct MockSyncContext { | 1349 | #[cfg(test)] |
| 1350 | /// URLs available for fetching, grouped by domain | 1350 | pub struct MockSyncContext { |
| 1351 | urls_by_domain: HashMap<String, Vec<String>>, | 1351 | /// Repository data to return |
| 1352 | /// Which OIDs each URL can provide | 1352 | repo_data: RepositoryData, |
| 1353 | url_provides_oids: HashMap<String, HashSet<String>>, | ||
| 1354 | /// OIDs still needed (decremented when "fetched") | 1353 | /// OIDs still needed (decremented when "fetched") |
| 1355 | needed_oids: RefCell<HashSet<String>>, | 1354 | needed_oids: RefCell<HashSet<String>>, |
| 1355 | /// Which OIDs each URL can provide | ||
| 1356 | url_provides_oids: HashMap<String, HashSet<String>>, | ||
| 1356 | /// Track fetch attempts for assertions | 1357 | /// Track fetch attempts for assertions |
| 1357 | fetch_log: RefCell<Vec<(String, Vec<String>)>>, // (url, oids_requested) | 1358 | fetch_log: RefCell<Vec<String>>, |
| 1359 | /// Whether there are pending events | ||
| 1360 | has_pending: RefCell<bool>, | ||
| 1361 | } | ||
| 1362 | |||
| 1363 | impl MockSyncContext { | ||
| 1364 | pub fn new() -> Self; | ||
| 1365 | pub fn with_urls(self, urls: &[&str]) -> Self; | ||
| 1366 | pub fn with_needed_oids(self, oids: &[&str]) -> Self; | ||
| 1367 | pub fn url_provides(self, url: &str, oids: &[&str]) -> Self; | ||
| 1358 | } | 1368 | } |
| 1359 | ``` | 1369 | ``` |
| 1360 | 1370 | ||
| 1361 | #### Test Module 1: Backoff Behavior | 1371 | ### Test Locations |
| 1372 | |||
| 1373 | | Test Type | Location | Created In | | ||
| 1374 | |-----------|----------|------------| | ||
| 1375 | | SyncQueueEntry | `src/purgatory/sync/queue.rs` | Phase 1 | | ||
| 1376 | | DomainThrottle | `src/purgatory/sync/throttle.rs` | Phase 2 | | ||
| 1377 | | ThrottleManager | `src/purgatory/sync/throttle.rs` | Phase 3 | | ||
| 1378 | | Core sync functions | `src/purgatory/sync/functions.rs` | Phase 5-6 | | ||
| 1379 | | Queue integration | `src/purgatory/mod.rs` | Phase 7 | | ||
| 1380 | | Integration tests | `tests/purgatory_sync.rs` | Phase 10 | | ||
| 1381 | |||
| 1382 | ## Implementation Phases | ||
| 1383 | |||
| 1384 | Each phase has clear deliverables, unit tests where applicable, and success criteria. Unit tests are created **only** for the code built in that phase. | ||
| 1385 | |||
| 1386 | --- | ||
| 1387 | |||
| 1388 | ### Phase 1: SyncQueueEntry with Backoff | ||
| 1389 | |||
| 1390 | **Goal**: Implement the sync queue entry struct with backoff calculation. | ||
| 1391 | |||
| 1392 | **Files**: | ||
| 1393 | - `src/purgatory/sync/queue.rs` (new) | ||
| 1362 | 1394 | ||
| 1363 | Tests the `SyncQueueEntry` backoff calculation and state transitions: | 1395 | **Deliverables**: |
| 1396 | ```rust | ||
| 1397 | pub struct SyncQueueEntry { | ||
| 1398 | pub next_attempt: Instant, | ||
| 1399 | pub attempt_count: u32, | ||
| 1400 | pub in_progress: bool, | ||
| 1401 | } | ||
| 1364 | 1402 | ||
| 1403 | impl SyncQueueEntry { | ||
| 1404 | pub fn new(delay: Duration) -> Self; | ||
| 1405 | pub fn backoff(&self) -> Duration; | ||
| 1406 | pub fn is_ready(&self) -> bool; | ||
| 1407 | pub fn on_new_event(&mut self, delay: Duration); | ||
| 1408 | pub fn on_sync_complete(&mut self); | ||
| 1409 | } | ||
| 1410 | ``` | ||
| 1411 | |||
| 1412 | **Unit Tests** (2 tests): | ||
| 1365 | ```rust | 1413 | ```rust |
| 1366 | mod backoff_tests { | 1414 | #[cfg(test)] |
| 1415 | mod tests { | ||
| 1367 | #[test] | 1416 | #[test] |
| 1368 | fn backoff_doubles_up_to_cap() { | 1417 | fn backoff_doubles_up_to_cap() { |
| 1369 | // Verify: 20s → 40s → 80s → 120s (capped) | 1418 | // 20s → 40s → 80s → 120s → 120s (capped) |
| 1370 | let mut entry = SyncQueueEntry::new(Duration::ZERO); | ||
| 1371 | |||
| 1372 | entry.on_sync_complete(); | ||
| 1373 | assert_eq!(entry.backoff(), Duration::from_secs(20)); | ||
| 1374 | |||
| 1375 | entry.on_sync_complete(); | ||
| 1376 | assert_eq!(entry.backoff(), Duration::from_secs(40)); | ||
| 1377 | |||
| 1378 | entry.on_sync_complete(); | ||
| 1379 | assert_eq!(entry.backoff(), Duration::from_secs(80)); | ||
| 1380 | |||
| 1381 | entry.on_sync_complete(); | ||
| 1382 | assert_eq!(entry.backoff(), Duration::from_secs(120)); | ||
| 1383 | |||
| 1384 | // Stays capped | ||
| 1385 | entry.on_sync_complete(); | ||
| 1386 | assert_eq!(entry.backoff(), Duration::from_secs(120)); | ||
| 1387 | } | 1419 | } |
| 1388 | 1420 | ||
| 1389 | #[test] | 1421 | #[test] |
| 1390 | fn new_event_resets_backoff() { | 1422 | fn new_event_resets_attempt_count() { |
| 1391 | // New event for same identifier resets attempt_count | 1423 | // on_new_event() resets attempt_count to 0 |
| 1392 | let mut entry = SyncQueueEntry::new(Duration::ZERO); | ||
| 1393 | entry.on_sync_complete(); | ||
| 1394 | entry.on_sync_complete(); | ||
| 1395 | assert!(entry.attempt_count > 0); | ||
| 1396 | |||
| 1397 | entry.on_new_event(Duration::from_millis(500)); | ||
| 1398 | assert_eq!(entry.attempt_count, 0); | ||
| 1399 | } | 1424 | } |
| 1400 | } | 1425 | } |
| 1401 | ``` | 1426 | ``` |
| 1402 | 1427 | ||
| 1403 | #### Test Module 2: Throttle & Rate Limiting | 1428 | **Success Criteria**: |
| 1429 | - [ ] `SyncQueueEntry::new()` creates entry with given delay | ||
| 1430 | - [ ] `backoff()` returns 20s, 40s, 80s, 120s, 120s for attempts 1-5 | ||
| 1431 | - [ ] `on_new_event()` resets `attempt_count` to 0 | ||
| 1432 | - [ ] `on_sync_complete()` increments `attempt_count` and updates `next_attempt` | ||
| 1433 | - [ ] Both unit tests pass | ||
| 1434 | |||
| 1435 | --- | ||
| 1436 | |||
| 1437 | ### Phase 2: DomainThrottle with Rate Limiting and Round-Robin | ||
| 1438 | |||
| 1439 | **Goal**: Implement per-domain throttling with concurrent/rate limits and fair queue processing. | ||
| 1440 | |||
| 1441 | **Files**: | ||
| 1442 | - `src/purgatory/sync/throttle.rs` (new) | ||
| 1443 | |||
| 1444 | **Deliverables**: | ||
| 1445 | ```rust | ||
| 1446 | pub struct DomainThrottle { | ||
| 1447 | domain: String, | ||
| 1448 | in_flight: u32, | ||
| 1449 | request_times: VecDeque<Instant>, | ||
| 1450 | queue: IndexMap<String, IdentifierQueueState>, | ||
| 1451 | round_robin_index: usize, | ||
| 1452 | max_concurrent: u32, | ||
| 1453 | max_per_minute: u32, | ||
| 1454 | } | ||
| 1404 | 1455 | ||
| 1405 | Tests `DomainThrottle` capacity checks and rate limiting: | 1456 | impl DomainThrottle { |
| 1457 | pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self; | ||
| 1458 | pub fn has_capacity(&self) -> bool; | ||
| 1459 | pub fn start_request(&mut self); | ||
| 1460 | pub fn complete_request(&mut self); | ||
| 1461 | pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet<String>); | ||
| 1462 | pub fn next_ready_identifier(&mut self) -> Option<String>; | ||
| 1463 | pub fn mark_identifier_not_in_progress(&mut self, identifier: &str); | ||
| 1464 | pub fn remove_identifier(&mut self, identifier: &str); | ||
| 1465 | } | ||
| 1466 | ``` | ||
| 1406 | 1467 | ||
| 1468 | **Unit Tests** (4 tests): | ||
| 1407 | ```rust | 1469 | ```rust |
| 1408 | mod throttle_tests { | 1470 | #[cfg(test)] |
| 1471 | mod tests { | ||
| 1409 | #[test] | 1472 | #[test] |
| 1410 | fn concurrent_limit_enforced() { | 1473 | fn concurrent_limit_blocks_when_saturated() { |
| 1411 | let mut throttle = DomainThrottle::new("example.com".into(), 2, 100); | 1474 | // has_capacity() returns false when in_flight >= max_concurrent |
| 1412 | |||
| 1413 | assert!(throttle.has_capacity()); | ||
| 1414 | throttle.start_request(); | ||
| 1415 | assert!(throttle.has_capacity()); | ||
| 1416 | throttle.start_request(); | ||
| 1417 | assert!(!throttle.has_capacity()); // At limit | ||
| 1418 | |||
| 1419 | throttle.complete_request(); | ||
| 1420 | assert!(throttle.has_capacity()); // Capacity freed | ||
| 1421 | } | 1475 | } |
| 1422 | 1476 | ||
| 1423 | #[test] | 1477 | #[test] |
| 1424 | fn rate_limit_enforced() { | 1478 | fn rate_limit_blocks_when_window_full() { |
| 1425 | let mut throttle = DomainThrottle::new("example.com".into(), 100, 2); | 1479 | // has_capacity() returns false when requests in last 60s >= max_per_minute |
| 1426 | 1480 | // Use deterministic time (pass Instant or mock clock) | |
| 1427 | // Make 2 requests (at rate limit) | ||
| 1428 | throttle.start_request(); | ||
| 1429 | throttle.complete_request(); | ||
| 1430 | throttle.start_request(); | ||
| 1431 | throttle.complete_request(); | ||
| 1432 | |||
| 1433 | assert!(!throttle.has_capacity()); // Rate limited | ||
| 1434 | |||
| 1435 | // After 60s window passes, capacity returns | ||
| 1436 | // (In real tests, use tokio::time::pause/advance) | ||
| 1437 | } | 1481 | } |
| 1438 | 1482 | ||
| 1439 | #[test] | 1483 | #[test] |
| 1440 | fn round_robin_fairness() { | 1484 | fn round_robin_processes_identifiers_fairly() { |
| 1441 | let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); | 1485 | // Enqueue A, B, C → next_ready returns A, B, C, A, B, C... |
| 1442 | |||
| 1443 | // Enqueue 3 identifiers | ||
| 1444 | throttle.enqueue_identifier("A".into(), HashSet::new()); | ||
| 1445 | throttle.enqueue_identifier("B".into(), HashSet::new()); | ||
| 1446 | throttle.enqueue_identifier("C".into(), HashSet::new()); | ||
| 1447 | |||
| 1448 | // Process in round-robin order | ||
| 1449 | let mut order = vec![]; | ||
| 1450 | for _ in 0..6 { | ||
| 1451 | if let Some(id) = throttle.next_ready_identifier() { | ||
| 1452 | order.push(id.clone()); | ||
| 1453 | throttle.mark_identifier_not_in_progress(&id); | ||
| 1454 | } | ||
| 1455 | } | ||
| 1456 | |||
| 1457 | assert_eq!(order, vec!["A", "B", "C", "A", "B", "C"]); | ||
| 1458 | } | 1486 | } |
| 1459 | 1487 | ||
| 1460 | #[test] | 1488 | #[test] |
| 1461 | fn skips_in_progress_identifiers() { | 1489 | fn skips_in_progress_identifiers() { |
| 1462 | let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); | 1490 | // next_ready skips identifiers where in_progress=true |
| 1463 | 1491 | } | |
| 1464 | throttle.enqueue_identifier("A".into(), HashSet::new()); | 1492 | } |
| 1465 | throttle.enqueue_identifier("B".into(), HashSet::new()); | 1493 | ``` |
| 1466 | 1494 | ||
| 1467 | // Get A (now in_progress) | 1495 | **Success Criteria**: |
| 1468 | assert_eq!(throttle.next_ready_identifier(), Some("A".into())); | 1496 | - [ ] Concurrent limit enforced (blocks at max_concurrent) |
| 1469 | 1497 | - [ ] Rate limit enforced (blocks at max_per_minute within 60s window) | |
| 1470 | // Next skips A, returns B | 1498 | - [ ] Round-robin ordering maintained across calls |
| 1471 | assert_eq!(throttle.next_ready_identifier(), Some("B".into())); | 1499 | - [ ] In-progress identifiers skipped |
| 1472 | 1500 | - [ ] All 4 unit tests pass | |
| 1473 | // Both in progress | 1501 | |
| 1474 | assert_eq!(throttle.next_ready_identifier(), None); | 1502 | --- |
| 1503 | |||
| 1504 | ### Phase 3: ThrottleManager | ||
| 1505 | |||
| 1506 | **Goal**: Implement the manager that owns all domain throttles and provides the sync interface. | ||
| 1507 | |||
| 1508 | **Files**: | ||
| 1509 | - `src/purgatory/sync/throttle.rs` (extend) | ||
| 1510 | |||
| 1511 | **Deliverables**: | ||
| 1512 | ```rust | ||
| 1513 | pub struct ThrottleManager { | ||
| 1514 | throttles: DashMap<String, DomainThrottle>, | ||
| 1515 | max_concurrent_per_domain: u32, | ||
| 1516 | max_per_minute_per_domain: u32, | ||
| 1517 | } | ||
| 1518 | |||
| 1519 | impl ThrottleManager { | ||
| 1520 | pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self; | ||
| 1521 | pub fn is_throttled(&self, domain: &str) -> bool; | ||
| 1522 | pub fn start_request(&self, domain: &str); | ||
| 1523 | pub fn complete_request(&self, domain: &str); | ||
| 1524 | pub fn enqueue_identifier(&self, domain: &str, identifier: String, tried_urls: HashSet<String>); | ||
| 1525 | } | ||
| 1526 | ``` | ||
| 1527 | |||
| 1528 | **Unit Tests** (1 test): | ||
| 1529 | ```rust | ||
| 1530 | #[cfg(test)] | ||
| 1531 | mod tests { | ||
| 1532 | #[test] | ||
| 1533 | fn is_throttled_reflects_domain_capacity() { | ||
| 1534 | // is_throttled returns true when domain has no capacity | ||
| 1475 | } | 1535 | } |
| 1476 | } | 1536 | } |
| 1477 | ``` | 1537 | ``` |
| 1478 | 1538 | ||
| 1479 | #### Test Module 3: Retry & URL Selection | 1539 | **Success Criteria**: |
| 1540 | - [ ] `is_throttled()` correctly reflects domain capacity | ||
| 1541 | - [ ] `start_request()`/`complete_request()` delegate to correct domain | ||
| 1542 | - [ ] `enqueue_identifier()` creates domain throttle if needed | ||
| 1543 | - [ ] Unit test passes | ||
| 1544 | |||
| 1545 | --- | ||
| 1546 | |||
| 1547 | ### Phase 4: SyncContext Trait and MockSyncContext | ||
| 1548 | |||
| 1549 | **Goal**: Define the abstraction for sync operations and create the test mock. | ||
| 1550 | |||
| 1551 | **Files**: | ||
| 1552 | - `src/purgatory/sync/context.rs` (new) | ||
| 1553 | |||
| 1554 | **Deliverables**: | ||
| 1555 | ```rust | ||
| 1556 | #[async_trait] | ||
| 1557 | pub trait SyncContext: Send + Sync { | ||
| 1558 | async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData>; | ||
| 1559 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String>; | ||
| 1560 | async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>; | ||
| 1561 | async fn process_satisfiable_events(&self, identifier: &str) -> Result<ProcessResult>; | ||
| 1562 | fn has_pending_events(&self, identifier: &str) -> bool; | ||
| 1563 | fn find_target_repo(&self, data: &RepositoryData) -> Option<PathBuf>; | ||
| 1564 | fn our_domain(&self) -> Option<&str>; | ||
| 1565 | } | ||
| 1566 | |||
| 1567 | // Test support | ||
| 1568 | #[cfg(test)] | ||
| 1569 | pub struct MockSyncContext { ... } | ||
| 1570 | ``` | ||
| 1571 | |||
| 1572 | **Unit Tests** (0 tests): | ||
| 1573 | - This phase creates infrastructure only; tests come in Phase 5 | ||
| 1574 | |||
| 1575 | **Success Criteria**: | ||
| 1576 | - [ ] `SyncContext` trait compiles with all required methods | ||
| 1577 | - [ ] `MockSyncContext` implements `SyncContext` | ||
| 1578 | - [ ] Mock supports builder pattern for test setup | ||
| 1579 | |||
| 1580 | --- | ||
| 1581 | |||
| 1582 | ### Phase 5: Core Sync Functions | ||
| 1480 | 1583 | ||
| 1481 | Tests `sync_identifier_next_url` behavior: | 1584 | **Goal**: Implement `sync_identifier_next_url` and `sync_identifier_from_url`. |
| 1482 | 1585 | ||
| 1586 | **Files**: | ||
| 1587 | - `src/purgatory/sync/functions.rs` (new) | ||
| 1588 | |||
| 1589 | **Deliverables**: | ||
| 1590 | ```rust | ||
| 1591 | pub async fn sync_identifier_next_url<C: SyncContext>( | ||
| 1592 | ctx: &C, | ||
| 1593 | identifier: &str, | ||
| 1594 | domain: Option<&str>, | ||
| 1595 | tried_urls: &HashSet<String>, | ||
| 1596 | throttle_manager: &ThrottleManager, | ||
| 1597 | ) -> Option<String>; | ||
| 1598 | |||
| 1599 | pub async fn sync_identifier_from_url<C: SyncContext>( | ||
| 1600 | ctx: &C, | ||
| 1601 | identifier: &str, | ||
| 1602 | url: &str, | ||
| 1603 | throttle_manager: &Arc<ThrottleManager>, | ||
| 1604 | ) -> usize; | ||
| 1605 | ``` | ||
| 1606 | |||
| 1607 | **Unit Tests** (3 tests): | ||
| 1483 | ```rust | 1608 | ```rust |
| 1484 | mod retry_tests { | 1609 | #[cfg(test)] |
| 1610 | mod tests { | ||
| 1485 | #[tokio::test] | 1611 | #[tokio::test] |
| 1486 | async fn tries_urls_until_success() { | 1612 | async fn next_url_skips_throttled_domains() { |
| 1487 | let ctx = MockSyncContext::new() | 1613 | // When domain is throttled, next_url returns URL from different domain |
| 1488 | .with_urls(&["https://fail1.com/r.git", "https://fail2.com/r.git", "https://ok.com/r.git"]) | ||
| 1489 | .with_needed_oids(&["abc123"]) | ||
| 1490 | .url_provides("https://ok.com/r.git", &["abc123"]); | ||
| 1491 | |||
| 1492 | let throttle = Arc::new(ThrottleManager::new(5, 30)); | ||
| 1493 | sync_identifier(&ctx, "test-repo", &throttle).await; | ||
| 1494 | |||
| 1495 | // Should have tried all 3 URLs | ||
| 1496 | assert_eq!(ctx.fetch_log.borrow().len(), 3); | ||
| 1497 | // OID should now be satisfied | ||
| 1498 | assert!(ctx.needed_oids.borrow().is_empty()); | ||
| 1499 | } | 1614 | } |
| 1500 | 1615 | ||
| 1501 | #[tokio::test] | 1616 | #[tokio::test] |
| 1502 | async fn skips_throttled_domains() { | 1617 | async fn next_url_skips_tried_urls() { |
| 1503 | let ctx = MockSyncContext::new() | 1618 | // URLs in tried_urls set are not returned |
| 1504 | .with_urls(&["https://throttled.com/r.git", "https://ok.com/r.git"]) | ||
| 1505 | .with_needed_oids(&["abc123"]) | ||
| 1506 | .url_provides("https://ok.com/r.git", &["abc123"]); | ||
| 1507 | |||
| 1508 | let throttle = Arc::new(ThrottleManager::new(5, 30)); | ||
| 1509 | |||
| 1510 | // Saturate throttled.com | ||
| 1511 | for _ in 0..5 { | ||
| 1512 | throttle.start_request("throttled.com"); | ||
| 1513 | } | ||
| 1514 | |||
| 1515 | sync_identifier(&ctx, "test-repo", &throttle).await; | ||
| 1516 | |||
| 1517 | // Should only try ok.com (throttled.com skipped) | ||
| 1518 | let urls_tried: Vec<_> = ctx.fetch_log.borrow().iter().map(|(u, _)| u.clone()).collect(); | ||
| 1519 | assert!(!urls_tried.contains(&"https://throttled.com/r.git".to_string())); | ||
| 1520 | assert!(urls_tried.contains(&"https://ok.com/r.git".to_string())); | ||
| 1521 | } | 1619 | } |
| 1522 | 1620 | ||
| 1523 | #[tokio::test] | 1621 | #[tokio::test] |
| 1524 | async fn enqueues_with_throttled_domains() { | 1622 | async fn from_url_fetches_and_processes_on_success() { |
| 1525 | let ctx = MockSyncContext::new() | 1623 | // Successful fetch triggers process_satisfiable_events |
| 1526 | .with_urls(&["https://throttled.com/r.git"]) | ||
| 1527 | .with_needed_oids(&["abc123"]); | ||
| 1528 | |||
| 1529 | let throttle = Arc::new(ThrottleManager::new(1, 30)); | ||
| 1530 | throttle.start_request("throttled.com"); // Saturate | ||
| 1531 | |||
| 1532 | sync_identifier(&ctx, "test-repo", &throttle).await; | ||
| 1533 | |||
| 1534 | // Should be enqueued with throttled.com for later | ||
| 1535 | assert!(throttle.has_queued_identifier("throttled.com", "test-repo")); | ||
| 1536 | } | 1624 | } |
| 1537 | } | 1625 | } |
| 1538 | ``` | 1626 | ``` |
| 1539 | 1627 | ||
| 1540 | #### Test Module 4: Partial OID Fetching | 1628 | **Success Criteria**: |
| 1629 | - [ ] `sync_identifier_next_url` returns non-throttled, untried URL | ||
| 1630 | - [ ] `sync_identifier_next_url` returns `None` when all URLs tried or throttled | ||
| 1631 | - [ ] `sync_identifier_from_url` calls `fetch_oids` and `process_satisfiable_events` | ||
| 1632 | - [ ] All 3 unit tests pass | ||
| 1541 | 1633 | ||
| 1542 | Tests behavior when servers have different subsets of OIDs: | 1634 | --- |
| 1543 | 1635 | ||
| 1636 | ### Phase 6: sync_identifier Orchestration | ||
| 1637 | |||
| 1638 | **Goal**: Implement the main sync loop for a single identifier. | ||
| 1639 | |||
| 1640 | **Files**: | ||
| 1641 | - `src/purgatory/sync/functions.rs` (extend) | ||
| 1642 | |||
| 1643 | **Deliverables**: | ||
| 1544 | ```rust | 1644 | ```rust |
| 1545 | mod partial_oid_tests { | 1645 | pub async fn sync_identifier<C: SyncContext>( |
| 1646 | ctx: &C, | ||
| 1647 | identifier: &str, | ||
| 1648 | throttle_manager: &Arc<ThrottleManager>, | ||
| 1649 | ) -> bool; // true if complete, false if pending | ||
| 1650 | ``` | ||
| 1651 | |||
| 1652 | **Unit Tests** (2 tests): | ||
| 1653 | ```rust | ||
| 1654 | #[cfg(test)] | ||
| 1655 | mod tests { | ||
| 1546 | #[tokio::test] | 1656 | #[tokio::test] |
| 1547 | async fn aggregates_oids_from_multiple_servers() { | 1657 | async fn tries_multiple_urls_until_complete() { |
| 1548 | // Server A has oid1, Server B has oid2 | 1658 | // Tries URL1 (partial), URL2 (partial), URL3 (complete) → returns true |
| 1549 | let ctx = MockSyncContext::new() | ||
| 1550 | .with_urls(&["https://a.com/r.git", "https://b.com/r.git"]) | ||
| 1551 | .with_needed_oids(&["oid1", "oid2"]) | ||
| 1552 | .url_provides("https://a.com/r.git", &["oid1"]) | ||
| 1553 | .url_provides("https://b.com/r.git", &["oid2"]); | ||
| 1554 | |||
| 1555 | let throttle = Arc::new(ThrottleManager::new(5, 30)); | ||
| 1556 | let complete = sync_identifier(&ctx, "test-repo", &throttle).await; | ||
| 1557 | |||
| 1558 | assert!(complete); | ||
| 1559 | assert!(ctx.needed_oids.borrow().is_empty()); | ||
| 1560 | } | 1659 | } |
| 1561 | 1660 | ||
| 1562 | #[tokio::test] | 1661 | #[tokio::test] |
| 1563 | async fn incomplete_when_oids_unavailable() { | 1662 | async fn enqueues_throttled_domains_when_incomplete() { |
| 1564 | // No server has oid2 | 1663 | // When URLs remain but are throttled, enqueues and returns false |
| 1565 | let ctx = MockSyncContext::new() | ||
| 1566 | .with_urls(&["https://a.com/r.git", "https://b.com/r.git"]) | ||
| 1567 | .with_needed_oids(&["oid1", "oid2"]) | ||
| 1568 | .url_provides("https://a.com/r.git", &["oid1"]) | ||
| 1569 | .url_provides("https://b.com/r.git", &["oid1"]); // Same OID, missing oid2 | ||
| 1570 | |||
| 1571 | let throttle = Arc::new(ThrottleManager::new(5, 30)); | ||
| 1572 | let complete = sync_identifier(&ctx, "test-repo", &throttle).await; | ||
| 1573 | |||
| 1574 | assert!(!complete); | ||
| 1575 | assert!(ctx.needed_oids.borrow().contains("oid2")); | ||
| 1576 | } | 1664 | } |
| 1577 | } | 1665 | } |
| 1578 | ``` | 1666 | ``` |
| 1579 | 1667 | ||
| 1580 | #### Test Module 5: Burst Debouncing | 1668 | **Success Criteria**: |
| 1669 | - [ ] Loops through available URLs until sync complete or all tried | ||
| 1670 | - [ ] Enqueues with throttled domains when OIDs still needed | ||
| 1671 | - [ ] Returns `true` when all OIDs fetched, `false` otherwise | ||
| 1672 | - [ ] Both unit tests pass | ||
| 1673 | |||
| 1674 | --- | ||
| 1675 | |||
| 1676 | ### Phase 7: Purgatory Sync Queue Integration | ||
| 1677 | |||
| 1678 | **Goal**: Add sync queue to Purgatory and implement `enqueue_sync`. | ||
| 1581 | 1679 | ||
| 1582 | Tests that rapid event arrivals are debounced: | 1680 | **Files**: |
| 1681 | - `src/purgatory/mod.rs` (extend) | ||
| 1583 | 1682 | ||
| 1683 | **Deliverables**: | ||
| 1584 | ```rust | 1684 | ```rust |
| 1585 | mod debounce_tests { | 1685 | impl Purgatory { |
| 1586 | #[tokio::test] | 1686 | // New field: sync_queue: Arc<DashMap<String, SyncQueueEntry>> |
| 1587 | async fn burst_events_debounced() { | 1687 | |
| 1588 | tokio::time::pause(); | 1688 | pub fn enqueue_sync(&self, identifier: &str, delay: Duration); |
| 1589 | 1689 | pub fn has_pending_events(&self, identifier: &str) -> bool; | |
| 1590 | let queue = SyncQueue::new(); | 1690 | } |
| 1591 | 1691 | ``` | |
| 1592 | // Simulate 10 events arriving in 100ms | 1692 | |
| 1593 | for i in 0..10 { | 1693 | **Unit Tests** (1 test): |
| 1594 | queue.enqueue("test-repo", Duration::from_millis(500)); | 1694 | ```rust |
| 1595 | tokio::time::advance(Duration::from_millis(10)).await; | 1695 | #[cfg(test)] |
| 1596 | } | 1696 | mod tests { |
| 1597 | 1697 | #[test] | |
| 1598 | // Should only have one entry (debounced) | 1698 | fn enqueue_sync_debounces_rapid_calls() { |
| 1599 | assert_eq!(queue.len(), 1); | 1699 | // Multiple enqueue_sync calls within delay window result in single entry |
| 1600 | |||
| 1601 | // Entry should be ready after 500ms from last event | ||
| 1602 | tokio::time::advance(Duration::from_millis(400)).await; | ||
| 1603 | assert!(!queue.get("test-repo").unwrap().is_ready()); | ||
| 1604 | |||
| 1605 | tokio::time::advance(Duration::from_millis(100)).await; | ||
| 1606 | assert!(queue.get("test-repo").unwrap().is_ready()); | ||
| 1607 | } | 1700 | } |
| 1608 | } | 1701 | } |
| 1609 | ``` | 1702 | ``` |
| 1610 | 1703 | ||
| 1611 | ### Integration Tests (tests/purgatory_sync.rs) | 1704 | **Success Criteria**: |
| 1705 | - [ ] `enqueue_sync` adds/updates entry in sync_queue | ||
| 1706 | - [ ] Rapid calls debounce (don't create multiple entries) | ||
| 1707 | - [ ] `has_pending_events` checks both state_events and pr_events | ||
| 1708 | - [ ] Unit test passes | ||
| 1709 | |||
| 1710 | --- | ||
| 1711 | |||
| 1712 | ### Phase 8: Main Sync Loop | ||
| 1612 | 1713 | ||
| 1613 | Integration tests verify end-to-end behavior with real relay instances. Keep these minimal and focused on outcomes. | 1714 | **Goal**: Implement the background sync loop that processes ready identifiers. |
| 1614 | 1715 | ||
| 1716 | **Files**: | ||
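| 1717 | - `src/purgatory/sync/loop.rs` (new; note that `loop` is a reserved Rust keyword, so declare the module as `mod r#loop;` or pick a non-keyword file name such as `sync_loop.rs`) | ||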
| 1718 | |||
| 1719 | **Deliverables**: | ||
| 1615 | ```rust | 1720 | ```rust |
| 1616 | //! Purgatory Sync Integration Tests | 1721 | impl Purgatory { |
| 1617 | //! | 1722 | pub fn start_sync_loop( |
| 1618 | //! These tests verify that state and PR events sync correctly between | 1723 | self: Arc<Self>, |
| 1619 | //! ngit-grasp instances, including git data fetching. | 1724 | ctx: Arc<dyn SyncContext>, |
| 1725 | throttle_manager: Arc<ThrottleManager>, | ||
| 1726 | ) -> JoinHandle<()>; | ||
| 1727 | } | ||
| 1728 | ``` | ||
| 1620 | 1729 | ||
| 1621 | mod common; | 1730 | **Unit Tests** (0 tests): |
| 1622 | use common::{TestRelay, TestRepo, create_state_event, create_pr_event}; | 1731 | - The sync loop is tested via integration tests; unit testing async loops is fragile |
| 1623 | 1732 | ||
| 1624 | /// Core test: State event syncs with git data | 1733 | **Success Criteria**: |
| 1625 | /// | 1734 | - [ ] Loop runs every 1 second |
| 1626 | /// Verifies the complete purgatory workflow: | 1735 | - [ ] Finds ready identifiers and spawns sync tasks |
| 1627 | /// 1. State event arrives at relay B (no git data) | 1736 | - [ ] Applies backoff on incomplete syncs |
| 1628 | /// 2. Event enters purgatory | 1737 | - [ ] Removes completed identifiers from queue |
| 1629 | /// 3. Git data fetched from relay A | 1738 | |
| 1630 | /// 4. Event released and served | 1739 | --- |
| 1740 | |||
| 1741 | ### Phase 9: RealSyncContext Implementation | ||
| 1742 | |||
| 1743 | **Goal**: Implement the production `SyncContext` that connects to real systems. | ||
| 1744 | |||
| 1745 | **Files**: | ||
| 1746 | - `src/purgatory/sync/context.rs` (extend) | ||
| 1747 | |||
| 1748 | **Deliverables**: | ||
| 1749 | ```rust | ||
| 1750 | pub struct RealSyncContext { | ||
| 1751 | purgatory: Purgatory, | ||
| 1752 | database: SharedDatabase, | ||
| 1753 | git_data_path: PathBuf, | ||
| 1754 | our_domain: Option<String>, | ||
| 1755 | local_relay: Option<LocalRelay>, | ||
| 1756 | } | ||
| 1757 | |||
| 1758 | impl SyncContext for RealSyncContext { ... } | ||
| 1759 | ``` | ||
| 1760 | |||
| 1761 | **Unit Tests** (0 tests): | ||
| 1762 | - `RealSyncContext` is tested via integration tests | ||
| 1763 | |||
| 1764 | **Success Criteria**: | ||
| 1765 | - [ ] All `SyncContext` methods implemented | ||
| 1766 | - [ ] Connects to real database, git, and relay | ||
| 1767 | - [ ] `process_satisfiable_events` releases events from purgatory | ||
| 1768 | |||
| 1769 | --- | ||
| 1770 | |||
| 1771 | ### Phase 10: Integration Tests | ||
| 1772 | |||
| 1773 | **Goal**: Verify end-to-end sync behavior with real relay instances. | ||
| 1774 | |||
| 1775 | **Files**: | ||
| 1776 | - `tests/purgatory_sync.rs` (new) | ||
| 1777 | |||
| 1778 | **Integration Tests** (4 tests): | ||
| 1779 | ```rust | ||
| 1631 | #[tokio::test] | 1780 | #[tokio::test] |
| 1632 | async fn state_event_syncs_from_remote() { | 1781 | async fn state_event_syncs_from_remote() { |
| 1633 | // Setup: Two relays, relay A has repo with git data | 1782 | // State event enters purgatory, git data fetched, event released |
| 1634 | let relay_a = TestRelay::start().await; | ||
| 1635 | let repo = TestRepo::create_with_commits(&relay_a, 3).await; | ||
| 1636 | |||
| 1637 | // Relay B configured to sync from A | ||
| 1638 | let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; | ||
| 1639 | |||
| 1640 | // Publish state event to relay B (git data not present) | ||
| 1641 | let state = create_state_event(&repo); | ||
| 1642 | relay_b.publish(&state).await; | ||
| 1643 | |||
| 1644 | // Wait for sync (with timeout) | ||
| 1645 | let synced = relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)).await; | ||
| 1646 | assert!(synced, "State event should be served after git sync"); | ||
| 1647 | |||
| 1648 | // Verify git data is now available | ||
| 1649 | assert!(relay_b.can_clone(&repo.identifier()).await); | ||
| 1650 | } | 1783 | } |
| 1651 | 1784 | ||
| 1652 | /// Core test: PR event syncs with commit data | ||
| 1653 | #[tokio::test] | 1785 | #[tokio::test] |
| 1654 | async fn pr_event_syncs_from_remote() { | 1786 | async fn pr_event_syncs_from_remote() { |
| 1655 | let relay_a = TestRelay::start().await; | 1787 | // PR event enters purgatory, commit fetched, event released |
| 1656 | let repo = TestRepo::create_with_commits(&relay_a, 1).await; | ||
| 1657 | |||
| 1658 | // Create PR with a new commit | ||
| 1659 | let pr_commit = repo.create_pr_commit().await; | ||
| 1660 | let pr_event = create_pr_event(&repo, &pr_commit); | ||
| 1661 | relay_a.publish(&pr_event).await; | ||
| 1662 | relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await; | ||
| 1663 | |||
| 1664 | // Relay B syncs | ||
| 1665 | let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; | ||
| 1666 | relay_b.publish(&pr_event).await; | ||
| 1667 | |||
| 1668 | let synced = relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)).await; | ||
| 1669 | assert!(synced, "PR event should be served after git sync"); | ||
| 1670 | } | 1788 | } |
| 1671 | 1789 | ||
| 1672 | /// Edge case: Concurrent state and PR events for same repo | ||
| 1673 | /// | ||
| 1674 | /// Verifies that both event types sync correctly when arriving together. | ||
| 1675 | #[tokio::test] | 1790 | #[tokio::test] |
| 1676 | async fn concurrent_state_and_pr_sync() { | 1791 | async fn concurrent_state_and_pr_sync() { |
| 1677 | let relay_a = TestRelay::start().await; | 1792 | // Both event types sync correctly when arriving together |
| 1678 | let repo = TestRepo::create_with_commits(&relay_a, 2).await; | ||
| 1679 | |||
| 1680 | let state = create_state_event(&repo); | ||
| 1681 | let pr_commit = repo.create_pr_commit().await; | ||
| 1682 | let pr_event = create_pr_event(&repo, &pr_commit); | ||
| 1683 | |||
| 1684 | relay_a.publish(&state).await; | ||
| 1685 | relay_a.publish(&pr_event).await; | ||
| 1686 | relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await; | ||
| 1687 | |||
| 1688 | let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; | ||
| 1689 | |||
| 1690 | // Publish both simultaneously | ||
| 1691 | tokio::join!( | ||
| 1692 | relay_b.publish(&state), | ||
| 1693 | relay_b.publish(&pr_event), | ||
| 1694 | ); | ||
| 1695 | |||
| 1696 | // Both should sync | ||
| 1697 | let (state_synced, pr_synced) = tokio::join!( | ||
| 1698 | relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)), | ||
| 1699 | relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)), | ||
| 1700 | ); | ||
| 1701 | |||
| 1702 | assert!(state_synced && pr_synced, "Both events should sync"); | ||
| 1703 | } | 1793 | } |
| 1704 | 1794 | ||
| 1705 | /// Edge case: Server has subset of required OIDs | ||
| 1706 | /// | ||
| 1707 | /// Verifies aggregation from multiple sources when no single | ||
| 1708 | /// server has all required OIDs. | ||
| 1709 | #[tokio::test] | 1795 | #[tokio::test] |
| 1710 | async fn partial_oid_aggregation() { | 1796 | async fn partial_oid_aggregation_from_multiple_servers() { |
| 1711 | // Relay A has commits 1-2, Relay B has commits 2-3 | 1797 | // OIDs are aggregated across servers when no single server has all of them |
| 1712 | let relay_a = TestRelay::start().await; | ||
| 1713 | let relay_b = TestRelay::start().await; | ||
| 1714 | |||
| 1715 | let repo_a = TestRepo::create_with_commits(&relay_a, 2).await; | ||
| 1716 | let repo_b = TestRepo::create_with_commits(&relay_b, 2).await; | ||
| 1717 | repo_b.push_additional_commits(1).await; // Commit 3 only on B | ||
| 1718 | |||
| 1719 | // State references commits 1, 2, and 3 | ||
| 1720 | let state = create_state_event_with_commits(&[ | ||
| 1721 | repo_a.commit(0), | ||
| 1722 | repo_a.commit(1), | ||
| 1723 | repo_b.commit(2), | ||
| 1724 | ]); | ||
| 1725 | |||
| 1726 | // Relay C syncs from both A and B | ||
| 1727 | let relay_c = TestRelay::start_with_sync_sources(&[relay_a.url(), relay_b.url()]).await; | ||
| 1728 | relay_c.publish(&state).await; | ||
| 1729 | |||
| 1730 | let synced = relay_c.wait_for_event_served(&state.id, Duration::from_secs(15)).await; | ||
| 1731 | assert!(synced, "Should aggregate OIDs from multiple sources"); | ||
| 1732 | } | 1798 | } |
| 1733 | ``` | 1799 | ``` |
| 1734 | 1800 | ||
| 1735 | ### Test Summary | 1801 | **Success Criteria**: |
| 1736 | 1802 | - [ ] All 4 integration tests pass | |
| 1737 | | Category | Test Count | Purpose | | 1803 | - [ ] State events release after git sync |
| 1738 | |----------|------------|---------| | 1804 | - [ ] PR events release after commit sync |
| 1739 | | Backoff | 2 | Verify exponential backoff timing and reset | | 1805 | - [ ] Partial OID scenarios handled correctly |
| 1740 | | Throttle | 4 | Verify rate limiting, concurrency, round-robin | | 1806 | |
| 1741 | | Retry | 3 | Verify URL iteration and throttle-skip behavior | | 1807 | --- |
| 1742 | | Partial OIDs | 2 | Verify OID aggregation from multiple servers | | 1808 | |
| 1743 | | Debounce | 1 | Verify burst event handling | | 1809 | ### Phase 11: Cleanup |
| 1744 | | **Unit Total** | **12** | | | 1810 | |
| 1745 | | Integration | 4 | End-to-end sync verification | | 1811 | **Goal**: Remove the old `start_state_sync` code and wire up the new system. |
| 1746 | | **Grand Total** | **16** | | | 1812 | |
| 1747 | 1813 | **Files**: | |
| 1748 | This is a focused set that covers the key behaviors without redundant tests. | 1814 | - `src/purgatory/mod.rs` (modify) |
| 1749 | 1815 | - `src/main.rs` (modify) | |
| 1750 | ## Migration Path | 1816 | |
| 1751 | 1817 | **Deliverables**: | |
| 1752 | 1. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) | 1818 | - Remove `start_state_sync` method |
| 1753 | 2. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests | 1819 | - Wire `start_sync_loop` into application startup |
| 1754 | 3. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` | 1820 | - Update `add_state` to call `enqueue_sync` |
| 1755 | 4. **Phase 4**: Implement ThrottleManager trigger-based processing | 1821 | |
| 1756 | 5. **Phase 5**: Add PR event syncing | 1822 | **Success Criteria**: |
| 1757 | 6. **Phase 6**: Remove old `start_state_sync` code | 1823 | - [ ] Old sync code removed |
| 1824 | - [ ] New sync loop starts on application boot | ||
| 1825 | - [ ] All existing tests still pass | ||
| 1826 | - [ ] All new tests pass | ||
| 1827 | |||
| 1828 | --- | ||
| 1829 | |||
| 1830 | ## Test Summary | ||
| 1831 | |||
| 1832 | | Phase | Unit Tests | Integration Tests | Total | | ||
| 1833 | |-------|------------|-------------------|-------| | ||
| 1834 | | 1. SyncQueueEntry | 2 | - | 2 | | ||
| 1835 | | 2. DomainThrottle | 4 | - | 4 | | ||
| 1836 | | 3. ThrottleManager | 1 | - | 1 | | ||
| 1837 | | 4. SyncContext | 0 | - | 0 | | ||
| 1838 | | 5. Core Functions | 3 | - | 3 | | ||
| 1839 | | 6. sync_identifier | 2 | - | 2 | | ||
| 1840 | | 7. Queue Integration | 1 | - | 1 | | ||
| 1841 | | 8. Sync Loop | 0 | - | 0 | | ||
| 1842 | | 9. RealSyncContext | 0 | - | 0 | | ||
| 1843 | | 10. Integration | - | 4 | 4 | | ||
| 1844 | | 11. Cleanup | 0 | - | 0 | | ||
| 1845 | | **Total** | **13** | **4** | **17** | | ||
| 1758 | 1846 | ||
| 1759 | ## Configuration | 1847 | ## Configuration |
| 1760 | 1848 | ||