diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 09:04:50 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 09:04:50 +0000 |
| commit | 6a9f3b1219edba6af4ac0556e86c9b06f1f8233e (patch) | |
| tree | 0acd485e22b5748bebb6393792784483b6e037d4 | |
| parent | da0f173bf4b68928c6b4e3e720562d0b4c0775ac (diff) | |
docs: purgatory design improve testing strategy
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 647 |
1 files changed, 358 insertions, 289 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 6ff94bb..8eaed6c 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -1331,352 +1331,421 @@ impl Purgatory { | |||
| 1331 | 1331 | ||
| 1332 | ## Testing Strategy | 1332 | ## Testing Strategy |
| 1333 | 1333 | ||
| 1334 | ### Unit Tests for Sync Logic | 1334 | The testing strategy follows a clear separation: **unit tests** verify the core sync mechanics (retry, backoff, throttling, round-robin) using mocks, while **integration tests** verify end-to-end behavior with real relay instances. |
| 1335 | 1335 | ||
| 1336 | The `SyncContext` trait enables pure unit tests without any I/O: | 1336 | ### Design Principles |
| 1337 | |||
| 1338 | 1. **Unit tests for mechanics**: Test retry logic, backoff timing, throttle behavior, and queue management in isolation | ||
| 1339 | 2. **Integration tests for outcomes**: Verify that events sync correctly, not the internal mechanics | ||
| 1340 | 3. **Avoid testing implementation details**: Don't test every code path; test observable behaviors | ||
| 1341 | 4. **Focus on edge cases that matter**: Partial OID availability, server failures, concurrent events | ||
| 1342 | |||
| 1343 | ### Unit Tests (src/purgatory/sync.rs) | ||
| 1344 | |||
| 1345 | Unit tests use `MockSyncContext` to test sync logic without I/O. The mock is simple: | ||
| 1337 | 1346 | ||
| 1338 | ```rust | 1347 | ```rust |
| 1339 | #[cfg(test)] | 1348 | /// Mock context for testing sync logic |
| 1340 | mod tests { | 1349 | struct MockSyncContext { |
| 1341 | use super::*; | 1350 | /// URLs available for fetching, grouped by domain |
| 1342 | 1351 | urls_by_domain: HashMap<String, Vec<String>>, | |
| 1343 | /// Mock context for testing sync logic | 1352 | /// Which OIDs each URL can provide |
| 1344 | struct MockSyncContext { | 1353 | url_provides_oids: HashMap<String, HashSet<String>>, |
| 1345 | pending_events: RefCell<bool>, | 1354 | /// OIDs still needed (each OID is removed once "fetched") |
| 1346 | needed_oids: RefCell<HashSet<String>>, | 1355 | needed_oids: RefCell<HashSet<String>>, |
| 1347 | available_urls: Vec<String>, | 1356 | /// Track fetch attempts for assertions |
| 1348 | fetch_results: RefCell<HashMap<String, Vec<String>>>, | 1357 | fetch_log: RefCell<Vec<(String, Vec<String>)>>, // (url, oids_requested) |
| 1349 | processed_count: RefCell<usize>, | 1358 | } |
| 1350 | } | 1359 | ``` |
| 1351 | 1360 | ||
| 1352 | #[async_trait] | 1361 | #### Test Module 1: Backoff Behavior |
| 1353 | impl SyncContext for MockSyncContext { | 1362 | |
| 1354 | async fn fetch_repository_data(&self, _id: &str) -> Result<RepositoryData> { | 1363 | Tests the `SyncQueueEntry` backoff calculation and state transitions: |
| 1355 | Ok(RepositoryData { | 1364 | |
| 1356 | announcements: vec![MockAnnouncement { | 1365 | ```rust |
| 1357 | clone_urls: self.available_urls.clone(), | 1366 | mod backoff_tests { |
| 1358 | ..Default::default() | 1367 | #[test] |
| 1359 | }], | 1368 | fn backoff_doubles_up_to_cap() { |
| 1360 | ..Default::default() | 1369 | // Verify: 20s → 40s → 80s → 120s (capped) |
| 1361 | }) | 1370 | let mut entry = SyncQueueEntry::new(Duration::ZERO); |
| 1362 | } | ||
| 1363 | |||
| 1364 | fn collect_needed_oids(&self, _id: &str) -> HashSet<String> { | ||
| 1365 | self.needed_oids.borrow().clone() | ||
| 1366 | } | ||
| 1367 | |||
| 1368 | fn oid_exists(&self, _path: &Path, oid: &str) -> bool { | ||
| 1369 | !self.needed_oids.borrow().contains(oid) | ||
| 1370 | } | ||
| 1371 | |||
| 1372 | async fn fetch_oids(&self, _path: &Path, url: &str, _oids: &[String]) -> Result<Vec<String>> { | ||
| 1373 | Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) | ||
| 1374 | } | ||
| 1375 | 1371 | ||
| 1376 | async fn process_satisfiable_events(&self, _id: &str) -> Result<ProcessResult> { | 1372 | entry.on_sync_complete(); |
| 1377 | *self.processed_count.borrow_mut() += 1; | 1373 | assert_eq!(entry.backoff(), Duration::from_secs(20)); |
| 1378 | Ok(ProcessResult::default()) | ||
| 1379 | } | ||
| 1380 | 1374 | ||
| 1381 | fn has_pending_events(&self, _id: &str) -> bool { | 1375 | entry.on_sync_complete(); |
| 1382 | *self.pending_events.borrow() | 1376 | assert_eq!(entry.backoff(), Duration::from_secs(40)); |
| 1383 | } | ||
| 1384 | 1377 | ||
| 1385 | fn find_target_repo(&self, _data: &RepositoryData) -> Option<PathBuf> { | 1378 | entry.on_sync_complete(); |
| 1386 | Some(PathBuf::from("/tmp/test-repo")) | 1379 | assert_eq!(entry.backoff(), Duration::from_secs(80)); |
| 1387 | } | ||
| 1388 | 1380 | ||
| 1389 | fn our_domain(&self) -> Option<&str> { | 1381 | entry.on_sync_complete(); |
| 1390 | None | 1382 | assert_eq!(entry.backoff(), Duration::from_secs(120)); |
| 1391 | } | ||
| 1392 | } | ||
| 1393 | |||
| 1394 | #[tokio::test] | ||
| 1395 | async fn test_next_url_no_pending_events() { | ||
| 1396 | let ctx = MockSyncContext { | ||
| 1397 | pending_events: RefCell::new(false), | ||
| 1398 | needed_oids: RefCell::new(HashSet::new()), | ||
| 1399 | available_urls: vec!["https://example.com/repo.git".to_string()], | ||
| 1400 | ..Default::default() | ||
| 1401 | }; | ||
| 1402 | let throttle_manager = ThrottleManager::new(5, 30); | ||
| 1403 | let tried = HashSet::new(); | ||
| 1404 | 1383 | ||
| 1405 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; | 1384 | // Stays capped |
| 1406 | assert!(result.is_none()); | 1385 | entry.on_sync_complete(); |
| 1386 | assert_eq!(entry.backoff(), Duration::from_secs(120)); | ||
| 1407 | } | 1387 | } |
| 1408 | 1388 | ||
| 1409 | #[tokio::test] | 1389 | #[test] |
| 1410 | async fn test_next_url_no_oids_needed() { | 1390 | fn new_event_resets_backoff() { |
| 1411 | let ctx = MockSyncContext { | 1391 | // New event for same identifier resets attempt_count |
| 1412 | pending_events: RefCell::new(true), | 1392 | let mut entry = SyncQueueEntry::new(Duration::ZERO); |
| 1413 | needed_oids: RefCell::new(HashSet::new()), // Empty = no OIDs needed | 1393 | entry.on_sync_complete(); |
| 1414 | available_urls: vec!["https://example.com/repo.git".to_string()], | 1394 | entry.on_sync_complete(); |
| 1415 | ..Default::default() | 1395 | assert!(entry.attempt_count > 0); |
| 1416 | }; | ||
| 1417 | let throttle_manager = ThrottleManager::new(5, 30); | ||
| 1418 | let tried = HashSet::new(); | ||
| 1419 | 1396 | ||
| 1420 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; | 1397 | entry.on_new_event(Duration::from_millis(500)); |
| 1421 | assert!(result.is_none()); // No URL needed, sync is complete | 1398 | assert_eq!(entry.attempt_count, 0); |
| 1422 | } | 1399 | } |
| 1423 | 1400 | } | |
| 1424 | #[tokio::test] | 1401 | ``` |
| 1425 | async fn test_next_url_returns_non_throttled() { | 1402 | |
| 1426 | let mut needed = HashSet::new(); | 1403 | #### Test Module 2: Throttle & Rate Limiting |
| 1427 | needed.insert("abc123".to_string()); | 1404 | |
| 1428 | 1405 | Tests `DomainThrottle` capacity checks and rate limiting: | |
| 1429 | let ctx = MockSyncContext { | 1406 | |
| 1430 | pending_events: RefCell::new(true), | 1407 | ```rust |
| 1431 | needed_oids: RefCell::new(needed), | 1408 | mod throttle_tests { |
| 1432 | available_urls: vec!["https://example.com/repo.git".to_string()], | 1409 | #[test] |
| 1433 | ..Default::default() | 1410 | fn concurrent_limit_enforced() { |
| 1434 | }; | 1411 | let mut throttle = DomainThrottle::new("example.com".into(), 2, 100); |
| 1435 | let throttle_manager = ThrottleManager::new(5, 30); | 1412 | |
| 1436 | let tried = HashSet::new(); | 1413 | assert!(throttle.has_capacity()); |
| 1437 | 1414 | throttle.start_request(); | |
| 1438 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; | 1415 | assert!(throttle.has_capacity()); |
| 1439 | assert_eq!(result, Some("https://example.com/repo.git".to_string())); | 1416 | throttle.start_request(); |
| 1417 | assert!(!throttle.has_capacity()); // At limit | ||
| 1418 | |||
| 1419 | throttle.complete_request(); | ||
| 1420 | assert!(throttle.has_capacity()); // Capacity freed | ||
| 1440 | } | 1421 | } |
| 1441 | 1422 | ||
| 1442 | #[tokio::test] | 1423 | #[test] |
| 1443 | async fn test_next_url_skips_tried() { | 1424 | fn rate_limit_enforced() { |
| 1444 | let mut needed = HashSet::new(); | 1425 | let mut throttle = DomainThrottle::new("example.com".into(), 100, 2); |
| 1445 | needed.insert("abc123".to_string()); | ||
| 1446 | 1426 | ||
| 1447 | let ctx = MockSyncContext { | 1427 | // Make 2 requests (at rate limit) |
| 1448 | pending_events: RefCell::new(true), | 1428 | throttle.start_request(); |
| 1449 | needed_oids: RefCell::new(needed), | 1429 | throttle.complete_request(); |
| 1450 | available_urls: vec![ | 1430 | throttle.start_request(); |
| 1451 | "https://example.com/repo.git".to_string(), | 1431 | throttle.complete_request(); |
| 1452 | "https://other.com/repo.git".to_string(), | ||
| 1453 | ], | ||
| 1454 | ..Default::default() | ||
| 1455 | }; | ||
| 1456 | let throttle_manager = ThrottleManager::new(5, 30); | ||
| 1457 | 1432 | ||
| 1458 | let mut tried = HashSet::new(); | 1433 | assert!(!throttle.has_capacity()); // Rate limited |
| 1459 | tried.insert("https://example.com/repo.git".to_string()); | ||
| 1460 | 1434 | ||
| 1461 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; | 1435 | // After 60s window passes, capacity returns |
| 1462 | assert_eq!(result, Some("https://other.com/repo.git".to_string())); | 1436 | // (In real tests, use tokio::time::pause/advance) |
| 1463 | } | 1437 | } |
| 1464 | 1438 | ||
| 1465 | #[tokio::test] | 1439 | #[test] |
| 1466 | async fn test_next_url_specific_domain() { | 1440 | fn round_robin_fairness() { |
| 1467 | let mut needed = HashSet::new(); | 1441 | let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); |
| 1468 | needed.insert("abc123".to_string()); | 1442 | |
| 1469 | 1443 | // Enqueue 3 identifiers | |
| 1470 | let ctx = MockSyncContext { | 1444 | throttle.enqueue_identifier("A".into(), HashSet::new()); |
| 1471 | pending_events: RefCell::new(true), | 1445 | throttle.enqueue_identifier("B".into(), HashSet::new()); |
| 1472 | needed_oids: RefCell::new(needed), | 1446 | throttle.enqueue_identifier("C".into(), HashSet::new()); |
| 1473 | available_urls: vec![ | 1447 | |
| 1474 | "https://example.com/repo.git".to_string(), | 1448 | // Process in round-robin order |
| 1475 | "https://other.com/repo.git".to_string(), | 1449 | let mut order = vec![]; |
| 1476 | ], | 1450 | for _ in 0..6 { |
| 1477 | ..Default::default() | 1451 | if let Some(id) = throttle.next_ready_identifier() { |
| 1478 | }; | 1452 | order.push(id.clone()); |
| 1479 | let throttle_manager = ThrottleManager::new(5, 30); | 1453 | throttle.mark_identifier_not_in_progress(&id); |
| 1480 | let tried = HashSet::new(); | 1454 | } |
| 1455 | } | ||
| 1481 | 1456 | ||
| 1482 | // Request specific domain | 1457 | assert_eq!(order, vec!["A", "B", "C", "A", "B", "C"]); |
| 1483 | let result = sync_identifier_next_url( | ||
| 1484 | &ctx, "test", Some("other.com"), &tried, &throttle_manager | ||
| 1485 | ).await; | ||
| 1486 | assert_eq!(result, Some("https://other.com/repo.git".to_string())); | ||
| 1487 | } | 1458 | } |
| 1488 | 1459 | ||
| 1489 | #[tokio::test] | 1460 | #[test] |
| 1490 | async fn test_next_url_none_when_all_tried() { | 1461 | fn skips_in_progress_identifiers() { |
| 1491 | let mut needed = HashSet::new(); | 1462 | let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); |
| 1492 | needed.insert("abc123".to_string()); | ||
| 1493 | 1463 | ||
| 1494 | let ctx = MockSyncContext { | 1464 | throttle.enqueue_identifier("A".into(), HashSet::new()); |
| 1495 | pending_events: RefCell::new(true), | 1465 | throttle.enqueue_identifier("B".into(), HashSet::new()); |
| 1496 | needed_oids: RefCell::new(needed), | 1466 | |
| 1497 | available_urls: vec!["https://example.com/repo.git".to_string()], | 1467 | // Get A (now in_progress) |
| 1498 | ..Default::default() | 1468 | assert_eq!(throttle.next_ready_identifier(), Some("A".into())); |
| 1499 | }; | ||
| 1500 | let throttle_manager = ThrottleManager::new(5, 30); | ||
| 1501 | 1469 | ||
| 1502 | let mut tried = HashSet::new(); | 1470 | // Next skips A, returns B |
| 1503 | tried.insert("https://example.com/repo.git".to_string()); | 1471 | assert_eq!(throttle.next_ready_identifier(), Some("B".into())); |
| 1504 | 1472 | ||
| 1505 | let result = sync_identifier_next_url(&ctx, "test", None, &tried, &throttle_manager).await; | 1473 | // Both in progress |
| 1506 | assert!(result.is_none()); | 1474 | assert_eq!(throttle.next_ready_identifier(), None); |
| 1507 | } | 1475 | } |
| 1508 | 1476 | } | |
| 1477 | ``` | ||
| 1478 | |||
| 1479 | #### Test Module 3: Retry & URL Selection | ||
| 1480 | |||
| 1481 | Tests `sync_identifier_next_url` behavior, exercised through the `sync_identifier` entry point: | ||
| 1482 | |||
| 1483 | ```rust | ||
| 1484 | mod retry_tests { | ||
| 1509 | #[tokio::test] | 1485 | #[tokio::test] |
| 1510 | async fn test_from_url_fetches_and_processes() { | 1486 | async fn tries_urls_until_success() { |
| 1511 | let mut needed = HashSet::new(); | 1487 | let ctx = MockSyncContext::new() |
| 1512 | needed.insert("abc123".to_string()); | 1488 | .with_urls(&["https://fail1.com/r.git", "https://fail2.com/r.git", "https://ok.com/r.git"]) |
| 1513 | 1489 | .with_needed_oids(&["abc123"]) | |
| 1514 | let mut fetch_results = HashMap::new(); | 1490 | .url_provides("https://ok.com/r.git", &["abc123"]); |
| 1515 | fetch_results.insert( | 1491 | |
| 1516 | "https://example.com/repo.git".to_string(), | 1492 | let throttle = Arc::new(ThrottleManager::new(5, 30)); |
| 1517 | vec!["abc123".to_string()], | 1493 | sync_identifier(&ctx, "test-repo", &throttle).await; |
| 1518 | ); | 1494 | |
| 1519 | 1495 | // Should have tried all 3 URLs | |
| 1520 | let ctx = MockSyncContext { | 1496 | assert_eq!(ctx.fetch_log.borrow().len(), 3); |
| 1521 | pending_events: RefCell::new(true), | 1497 | // OID should now be satisfied |
| 1522 | needed_oids: RefCell::new(needed), | 1498 | assert!(ctx.needed_oids.borrow().is_empty()); |
| 1523 | available_urls: vec!["https://example.com/repo.git".to_string()], | ||
| 1524 | fetch_results: RefCell::new(fetch_results), | ||
| 1525 | processed_count: RefCell::new(0), | ||
| 1526 | }; | ||
| 1527 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | ||
| 1528 | |||
| 1529 | let oids_fetched = sync_identifier_from_url( | ||
| 1530 | &ctx, "test", "https://example.com/repo.git", &throttle_manager | ||
| 1531 | ).await; | ||
| 1532 | |||
| 1533 | assert_eq!(oids_fetched, 1); | ||
| 1534 | assert_eq!(*ctx.processed_count.borrow(), 1); | ||
| 1535 | } | 1499 | } |
| 1536 | 1500 | ||
| 1537 | #[tokio::test] | 1501 | #[tokio::test] |
| 1538 | async fn test_full_sync_with_throttled_domains() { | 1502 | async fn skips_throttled_domains() { |
| 1539 | let mut needed = HashSet::new(); | 1503 | let ctx = MockSyncContext::new() |
| 1540 | needed.insert("abc123".to_string()); | 1504 | .with_urls(&["https://throttled.com/r.git", "https://ok.com/r.git"]) |
| 1541 | 1505 | .with_needed_oids(&["abc123"]) | |
| 1542 | let mut fetch_results = HashMap::new(); | 1506 | .url_provides("https://ok.com/r.git", &["abc123"]); |
| 1543 | fetch_results.insert( | ||
| 1544 | "https://server1.com/repo.git".to_string(), | ||
| 1545 | vec![], // First server doesn't have the OID | ||
| 1546 | ); | ||
| 1547 | fetch_results.insert( | ||
| 1548 | "https://server2.com/repo.git".to_string(), | ||
| 1549 | vec!["abc123".to_string()], // Second server has it | ||
| 1550 | ); | ||
| 1551 | |||
| 1552 | let ctx = MockSyncContext { | ||
| 1553 | pending_events: RefCell::new(true), | ||
| 1554 | needed_oids: RefCell::new(needed.clone()), | ||
| 1555 | available_urls: vec![ | ||
| 1556 | "https://server1.com/repo.git".to_string(), | ||
| 1557 | "https://server2.com/repo.git".to_string(), | ||
| 1558 | ], | ||
| 1559 | fetch_results: RefCell::new(fetch_results), | ||
| 1560 | processed_count: RefCell::new(0), | ||
| 1561 | }; | ||
| 1562 | 1507 | ||
| 1563 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | 1508 | let throttle = Arc::new(ThrottleManager::new(5, 30)); |
| 1564 | 1509 | ||
| 1565 | // Manually throttle server2.com to test enqueueing | 1510 | // Saturate throttled.com |
| 1566 | // (In real code, this would happen due to rate limits) | 1511 | for _ in 0..5 { |
| 1567 | // For this test, we just verify the sync tries available URLs | 1512 | throttle.start_request("throttled.com"); |
| 1513 | } | ||
| 1568 | 1514 | ||
| 1569 | let complete = sync_identifier(&ctx, "test", &throttle_manager).await; | 1515 | sync_identifier(&ctx, "test-repo", &throttle).await; |
| 1570 | 1516 | ||
| 1571 | // Should have processed events (found OID from server2) | 1517 | // Should only try ok.com (throttled.com skipped) |
| 1572 | assert!(*ctx.processed_count.borrow() >= 1); | 1518 | let urls_tried: Vec<_> = ctx.fetch_log.borrow().iter().map(|(u, _)| u.clone()).collect(); |
| 1519 | assert!(!urls_tried.contains(&"https://throttled.com/r.git".to_string())); | ||
| 1520 | assert!(urls_tried.contains(&"https://ok.com/r.git".to_string())); | ||
| 1573 | } | 1521 | } |
| 1574 | 1522 | ||
| 1575 | #[tokio::test] | 1523 | #[tokio::test] |
| 1576 | async fn test_domain_throttle_queue_round_robin() { | 1524 | async fn enqueues_with_throttled_domains() { |
| 1577 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | 1525 | let ctx = MockSyncContext::new() |
| 1578 | 1526 | .with_urls(&["https://throttled.com/r.git"]) | |
| 1579 | // Enqueue three identifiers | 1527 | .with_needed_oids(&["abc123"]); |
| 1580 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | ||
| 1581 | throttle.enqueue_identifier("id2".to_string(), HashSet::new()); | ||
| 1582 | throttle.enqueue_identifier("id3".to_string(), HashSet::new()); | ||
| 1583 | 1528 | ||
| 1584 | // Should get them in round-robin order | 1529 | let throttle = Arc::new(ThrottleManager::new(1, 30)); |
| 1585 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | 1530 | throttle.start_request("throttled.com"); // Saturate |
| 1586 | throttle.mark_identifier_not_in_progress("id1"); | ||
| 1587 | 1531 | ||
| 1588 | assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); | 1532 | sync_identifier(&ctx, "test-repo", &throttle).await; |
| 1589 | throttle.mark_identifier_not_in_progress("id2"); | ||
| 1590 | 1533 | ||
| 1591 | assert_eq!(throttle.next_ready_identifier(), Some("id3".to_string())); | 1534 | // Should be enqueued with throttled.com for later |
| 1592 | throttle.mark_identifier_not_in_progress("id3"); | 1535 | assert!(throttle.has_queued_identifier("throttled.com", "test-repo")); |
| 1593 | |||
| 1594 | // Back to id1 | ||
| 1595 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1596 | } | 1536 | } |
| 1597 | 1537 | } | |
| 1538 | ``` | ||
| 1539 | |||
| 1540 | #### Test Module 4: Partial OID Fetching | ||
| 1541 | |||
| 1542 | Tests behavior when servers have different subsets of OIDs: | ||
| 1543 | |||
| 1544 | ```rust | ||
| 1545 | mod partial_oid_tests { | ||
| 1598 | #[tokio::test] | 1546 | #[tokio::test] |
| 1599 | async fn test_domain_throttle_skips_in_progress() { | 1547 | async fn aggregates_oids_from_multiple_servers() { |
| 1600 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | 1548 | // Server A has oid1, Server B has oid2 |
| 1601 | 1549 | let ctx = MockSyncContext::new() | |
| 1602 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | 1550 | .with_urls(&["https://a.com/r.git", "https://b.com/r.git"]) |
| 1603 | throttle.enqueue_identifier("id2".to_string(), HashSet::new()); | 1551 | .with_needed_oids(&["oid1", "oid2"]) |
| 1604 | 1552 | .url_provides("https://a.com/r.git", &["oid1"]) | |
| 1605 | // Get id1 (marks it in_progress) | 1553 | .url_provides("https://b.com/r.git", &["oid2"]); |
| 1606 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | 1554 | |
| 1607 | 1555 | let throttle = Arc::new(ThrottleManager::new(5, 30)); | |
| 1608 | // Next should skip id1 and return id2 | 1556 | let complete = sync_identifier(&ctx, "test-repo", &throttle).await; |
| 1609 | assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); | 1557 | |
| 1610 | 1558 | assert!(complete); | |
| 1611 | // Both in progress, should return None | 1559 | assert!(ctx.needed_oids.borrow().is_empty()); |
| 1612 | assert_eq!(throttle.next_ready_identifier(), None); | ||
| 1613 | |||
| 1614 | // Mark id1 not in progress | ||
| 1615 | throttle.mark_identifier_not_in_progress("id1"); | ||
| 1616 | |||
| 1617 | // Now id1 should be available again | ||
| 1618 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | ||
| 1619 | } | 1560 | } |
| 1620 | 1561 | ||
| 1621 | #[tokio::test] | 1562 | #[tokio::test] |
| 1622 | async fn test_domain_throttle_remove_adjusts_index() { | 1563 | async fn incomplete_when_oids_unavailable() { |
| 1623 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | 1564 | // No server has oid2 |
| 1624 | 1565 | let ctx = MockSyncContext::new() | |
| 1625 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | 1566 | .with_urls(&["https://a.com/r.git", "https://b.com/r.git"]) |
| 1626 | throttle.enqueue_identifier("id2".to_string(), HashSet::new()); | 1567 | .with_needed_oids(&["oid1", "oid2"]) |
| 1627 | throttle.enqueue_identifier("id3".to_string(), HashSet::new()); | 1568 | .url_provides("https://a.com/r.git", &["oid1"]) |
| 1628 | 1569 | .url_provides("https://b.com/r.git", &["oid1"]); // Same OID, missing oid2 | |
| 1629 | // Advance to id2 | 1570 | |
| 1630 | assert_eq!(throttle.next_ready_identifier(), Some("id1".to_string())); | 1571 | let throttle = Arc::new(ThrottleManager::new(5, 30)); |
| 1631 | throttle.mark_identifier_not_in_progress("id1"); | 1572 | let complete = sync_identifier(&ctx, "test-repo", &throttle).await; |
| 1632 | 1573 | ||
| 1633 | // Remove id1 (before current index) | 1574 | assert!(!complete); |
| 1634 | throttle.remove_identifier("id1"); | 1575 | assert!(ctx.needed_oids.borrow().contains("oid2")); |
| 1635 | |||
| 1636 | // Should continue with id2 (not skip to id3) | ||
| 1637 | assert_eq!(throttle.next_ready_identifier(), Some("id2".to_string())); | ||
| 1638 | } | 1576 | } |
| 1639 | 1577 | } | |
| 1578 | ``` | ||
| 1579 | |||
| 1580 | #### Test Module 5: Burst Debouncing | ||
| 1581 | |||
| 1582 | Tests that rapid event arrivals are debounced: | ||
| 1583 | |||
| 1584 | ```rust | ||
| 1585 | mod debounce_tests { | ||
| 1640 | #[tokio::test] | 1586 | #[tokio::test] |
| 1641 | async fn test_domain_throttle_has_queued_work() { | 1587 | async fn burst_events_debounced() { |
| 1642 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | 1588 | tokio::time::pause(); |
| 1643 | |||
| 1644 | assert!(!throttle.has_queued_work()); | ||
| 1645 | 1589 | ||
| 1646 | throttle.enqueue_identifier("id1".to_string(), HashSet::new()); | 1590 | let queue = SyncQueue::new(); |
| 1647 | assert!(throttle.has_queued_work()); | ||
| 1648 | 1591 | ||
| 1649 | throttle.remove_identifier("id1"); | 1592 | // Simulate 10 events arriving in 100ms |
| 1650 | assert!(!throttle.has_queued_work()); | 1593 | for i in 0..10 { |
| 1651 | } | 1594 | queue.enqueue("test-repo", Duration::from_millis(500)); |
| 1652 | 1595 | tokio::time::advance(Duration::from_millis(10)).await; | |
| 1653 | #[tokio::test] | 1596 | } |
| 1654 | async fn test_domain_throttle_tried_urls_merge() { | ||
| 1655 | let mut throttle = DomainThrottle::new("example.com".to_string(), 5, 30); | ||
| 1656 | 1597 | ||
| 1657 | let mut urls1 = HashSet::new(); | 1598 | // Should only have one entry (debounced) |
| 1658 | urls1.insert("url1".to_string()); | 1599 | assert_eq!(queue.len(), 1); |
| 1659 | throttle.enqueue_identifier("id1".to_string(), urls1); | ||
| 1660 | 1600 | ||
| 1661 | // Enqueue again with different tried URLs - should merge | 1601 | // Entry should be ready after 500ms from last event |
| 1662 | let mut urls2 = HashSet::new(); | 1602 | tokio::time::advance(Duration::from_millis(400)).await; |
| 1663 | urls2.insert("url2".to_string()); | 1603 | assert!(!queue.get("test-repo").unwrap().is_ready()); |
| 1664 | throttle.enqueue_identifier("id1".to_string(), urls2); | ||
| 1665 | 1604 | ||
| 1666 | let tried = throttle.get_tried_urls("id1"); | 1605 | tokio::time::advance(Duration::from_millis(100)).await; |
| 1667 | assert!(tried.contains("url1")); | 1606 | assert!(queue.get("test-repo").unwrap().is_ready()); |
| 1668 | assert!(tried.contains("url2")); | ||
| 1669 | } | 1607 | } |
| 1670 | } | 1608 | } |
| 1671 | ``` | 1609 | ``` |
| 1672 | 1610 | ||
| 1673 | ### Integration Tests | 1611 | ### Integration Tests (tests/purgatory_sync.rs) |
| 1612 | |||
| 1613 | Integration tests verify end-to-end behavior with real relay instances. Keep these minimal and focused on outcomes. | ||
| 1614 | |||
| 1615 | ```rust | ||
| 1616 | //! Purgatory Sync Integration Tests | ||
| 1617 | //! | ||
| 1618 | //! These tests verify that state and PR events sync correctly between | ||
| 1619 | //! ngit-grasp instances, including git data fetching. | ||
| 1620 | |||
| 1621 | mod common; | ||
| 1622 | use common::{TestRelay, TestRepo, create_state_event, create_pr_event}; | ||
| 1623 | |||
| 1624 | /// Core test: State event syncs with git data | ||
| 1625 | /// | ||
| 1626 | /// Verifies the complete purgatory workflow: | ||
| 1627 | /// 1. State event arrives at relay B (no git data) | ||
| 1628 | /// 2. Event enters purgatory | ||
| 1629 | /// 3. Git data fetched from relay A | ||
| 1630 | /// 4. Event released and served | ||
| 1631 | #[tokio::test] | ||
| 1632 | async fn state_event_syncs_from_remote() { | ||
| 1633 | // Setup: Two relays, relay A has repo with git data | ||
| 1634 | let relay_a = TestRelay::start().await; | ||
| 1635 | let repo = TestRepo::create_with_commits(&relay_a, 3).await; | ||
| 1636 | |||
| 1637 | // Relay B configured to sync from A | ||
| 1638 | let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; | ||
| 1639 | |||
| 1640 | // Publish state event to relay B (git data not present) | ||
| 1641 | let state = create_state_event(&repo); | ||
| 1642 | relay_b.publish(&state).await; | ||
| 1643 | |||
| 1644 | // Wait for sync (with timeout) | ||
| 1645 | let synced = relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)).await; | ||
| 1646 | assert!(synced, "State event should be served after git sync"); | ||
| 1647 | |||
| 1648 | // Verify git data is now available | ||
| 1649 | assert!(relay_b.can_clone(&repo.identifier()).await); | ||
| 1650 | } | ||
| 1651 | |||
| 1652 | /// Core test: PR event syncs with commit data | ||
| 1653 | #[tokio::test] | ||
| 1654 | async fn pr_event_syncs_from_remote() { | ||
| 1655 | let relay_a = TestRelay::start().await; | ||
| 1656 | let repo = TestRepo::create_with_commits(&relay_a, 1).await; | ||
| 1657 | |||
| 1658 | // Create PR with a new commit | ||
| 1659 | let pr_commit = repo.create_pr_commit().await; | ||
| 1660 | let pr_event = create_pr_event(&repo, &pr_commit); | ||
| 1661 | relay_a.publish(&pr_event).await; | ||
| 1662 | relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await; | ||
| 1663 | |||
| 1664 | // Relay B syncs | ||
| 1665 | let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; | ||
| 1666 | relay_b.publish(&pr_event).await; | ||
| 1667 | |||
| 1668 | let synced = relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)).await; | ||
| 1669 | assert!(synced, "PR event should be served after git sync"); | ||
| 1670 | } | ||
| 1671 | |||
| 1672 | /// Edge case: Concurrent state and PR events for same repo | ||
| 1673 | /// | ||
| 1674 | /// Verifies that both event types sync correctly when arriving together. | ||
| 1675 | #[tokio::test] | ||
| 1676 | async fn concurrent_state_and_pr_sync() { | ||
| 1677 | let relay_a = TestRelay::start().await; | ||
| 1678 | let repo = TestRepo::create_with_commits(&relay_a, 2).await; | ||
| 1679 | |||
| 1680 | let state = create_state_event(&repo); | ||
| 1681 | let pr_commit = repo.create_pr_commit().await; | ||
| 1682 | let pr_event = create_pr_event(&repo, &pr_commit); | ||
| 1683 | |||
| 1684 | relay_a.publish(&state).await; | ||
| 1685 | relay_a.publish(&pr_event).await; | ||
| 1686 | relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await; | ||
| 1687 | |||
| 1688 | let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; | ||
| 1689 | |||
| 1690 | // Publish both simultaneously | ||
| 1691 | tokio::join!( | ||
| 1692 | relay_b.publish(&state), | ||
| 1693 | relay_b.publish(&pr_event), | ||
| 1694 | ); | ||
| 1695 | |||
| 1696 | // Both should sync | ||
| 1697 | let (state_synced, pr_synced) = tokio::join!( | ||
| 1698 | relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)), | ||
| 1699 | relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)), | ||
| 1700 | ); | ||
| 1701 | |||
| 1702 | assert!(state_synced && pr_synced, "Both events should sync"); | ||
| 1703 | } | ||
| 1704 | |||
| 1705 | /// Edge case: Server has subset of required OIDs | ||
| 1706 | /// | ||
| 1707 | /// Verifies aggregation from multiple sources when no single | ||
| 1708 | /// server has all required OIDs. | ||
| 1709 | #[tokio::test] | ||
| 1710 | async fn partial_oid_aggregation() { | ||
| 1711 | // Relay A has 2 commits; Relay B has 2 commits plus a third that exists only on B | ||
| 1712 | let relay_a = TestRelay::start().await; | ||
| 1713 | let relay_b = TestRelay::start().await; | ||
| 1714 | |||
| 1715 | let repo_a = TestRepo::create_with_commits(&relay_a, 2).await; | ||
| 1716 | let repo_b = TestRepo::create_with_commits(&relay_b, 2).await; | ||
| 1717 | repo_b.push_additional_commits(1).await; // Commit 3 only on B | ||
| 1718 | |||
| 1719 | // State references commits 1, 2, and 3 | ||
| 1720 | let state = create_state_event_with_commits(&[ | ||
| 1721 | repo_a.commit(0), | ||
| 1722 | repo_a.commit(1), | ||
| 1723 | repo_b.commit(2), | ||
| 1724 | ]); | ||
| 1725 | |||
| 1726 | // Relay C syncs from both A and B | ||
| 1727 | let relay_c = TestRelay::start_with_sync_sources(&[relay_a.url(), relay_b.url()]).await; | ||
| 1728 | relay_c.publish(&state).await; | ||
| 1729 | |||
| 1730 | let synced = relay_c.wait_for_event_served(&state.id, Duration::from_secs(15)).await; | ||
| 1731 | assert!(synced, "Should aggregate OIDs from multiple sources"); | ||
| 1732 | } | ||
| 1733 | ``` | ||
| 1734 | |||
| 1735 | ### Test Summary | ||
| 1736 | |||
| 1737 | | Category | Test Count | Purpose | | ||
| 1738 | |----------|------------|---------| | ||
| 1739 | | Backoff | 2 | Verify exponential backoff timing and reset | | ||
| 1740 | | Throttle | 4 | Verify rate limiting, concurrency, round-robin | | ||
| 1741 | | Retry | 3 | Verify URL iteration and throttle-skip behavior | | ||
| 1742 | | Partial OIDs | 2 | Verify OID aggregation from multiple servers | | ||
| 1743 | | Debounce | 1 | Verify burst event handling | | ||
| 1744 | | **Unit Total** | **12** | | | ||
| 1745 | | Integration | 4 | End-to-end sync verification | | ||
| 1746 | | **Grand Total** | **16** | | | ||
| 1674 | 1747 | ||
| 1675 | 1. **Sync against own implementation**: Two ngit-grasp instances syncing | 1748 | This is a focused set that covers the key behaviors without redundant tests. |
| 1676 | 2. **Burst handling**: 10 events in 100ms, verify debounce | ||
| 1677 | 3. **Backoff behavior**: Unreachable URLs, verify timing | ||
| 1678 | 4. **Rate limiting**: Verify 30 req/min and 5 concurrent limits | ||
| 1679 | 5. **Parallel identifiers**: 5 identifiers sync in parallel | ||
| 1680 | 1749 | ||
| 1681 | ## Migration Path | 1750 | ## Migration Path |
| 1682 | 1751 | ||