From ecdc68f7d38b63ec411b48fac05cd2d98303cab0 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 09:15:40 +0000 Subject: docs: purgatory sync implementation plan --- docs/explanation/purgatory-sync-redesign.md | 742 ++++++++++++++++------------ 1 file changed, 415 insertions(+), 327 deletions(-) (limited to 'docs') diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 8eaed6c..54e279a 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md @@ -1331,430 +1331,518 @@ impl Purgatory { ## Testing Strategy -The testing strategy follows a clear separation: **unit tests** verify the core sync mechanics (retry, backoff, throttling, round-robin) using mocks, while **integration tests** verify end-to-end behavior with real relay instances. +Tests are created **only** as part of each implementation phase. See [Implementation Phases](#implementation-phases) for the complete test plan. ### Design Principles -1. **Unit tests for mechanics**: Test retry logic, backoff timing, throttle behavior, and queue management in isolation -2. **Integration tests for outcomes**: Verify that events sync correctly, not the internal mechanics -3. **Avoid testing implementation details**: Don't test every code path; test observable behaviors -4. **Focus on edge cases that matter**: Partial OID availability, server failures, concurrent events +1. **Tests accompany code**: Each phase specifies exactly which tests to create +2. **Unit tests for mechanics**: Test backoff, throttle, retry logic in isolation using mocks +3. **Integration tests for outcomes**: Verify events sync correctly end-to-end +4. **No speculative tests**: Don't create tests for code that doesn't exist yet -### Unit Tests (src/purgatory/sync.rs) +### MockSyncContext -Unit tests use `MockSyncContext` to test sync logic without I/O. 
The mock is simple: +Phases 4-6 use `MockSyncContext` to test sync logic without I/O: ```rust /// Mock context for testing sync logic -struct MockSyncContext { - /// URLs available for fetching, grouped by domain - urls_by_domain: HashMap>, - /// Which OIDs each URL can provide - url_provides_oids: HashMap>, +#[cfg(test)] +pub struct MockSyncContext { + /// Repository data to return + repo_data: RepositoryData, /// OIDs still needed (decremented when "fetched") needed_oids: RefCell>, + /// Which OIDs each URL can provide + url_provides_oids: HashMap>, /// Track fetch attempts for assertions - fetch_log: RefCell)>>, // (url, oids_requested) + fetch_log: RefCell>, + /// Whether there are pending events + has_pending: RefCell, +} + +impl MockSyncContext { + pub fn new() -> Self; + pub fn with_urls(self, urls: &[&str]) -> Self; + pub fn with_needed_oids(self, oids: &[&str]) -> Self; + pub fn url_provides(self, url: &str, oids: &[&str]) -> Self; } ``` -#### Test Module 1: Backoff Behavior +### Test Locations + +| Test Type | Location | Created In | +|-----------|----------|------------| +| SyncQueueEntry | `src/purgatory/sync/queue.rs` | Phase 1 | +| DomainThrottle | `src/purgatory/sync/throttle.rs` | Phase 2 | +| ThrottleManager | `src/purgatory/sync/throttle.rs` | Phase 3 | +| Core sync functions | `src/purgatory/sync/functions.rs` | Phase 5-6 | +| Queue integration | `src/purgatory/mod.rs` | Phase 7 | +| Integration tests | `tests/purgatory_sync.rs` | Phase 10 | + +## Implementation Phases + +Each phase has clear deliverables, unit tests, and success criteria. Unit tests are created **only** for the code built in that phase. + +--- + +### Phase 1: SyncQueueEntry with Backoff + +**Goal**: Implement the sync queue entry struct with backoff calculation. 
+ +**Files**: +- `src/purgatory/sync/queue.rs` (new) -Tests the `SyncQueueEntry` backoff calculation and state transitions: +**Deliverables**: +```rust +pub struct SyncQueueEntry { + pub next_attempt: Instant, + pub attempt_count: u32, + pub in_progress: bool, +} +impl SyncQueueEntry { + pub fn new(delay: Duration) -> Self; + pub fn backoff(&self) -> Duration; + pub fn is_ready(&self) -> bool; + pub fn on_new_event(&mut self, delay: Duration); + pub fn on_sync_complete(&mut self); +} +``` + +**Unit Tests** (2 tests): ```rust -mod backoff_tests { +#[cfg(test)] +mod tests { #[test] fn backoff_doubles_up_to_cap() { - // Verify: 20s → 40s → 80s → 120s (capped) - let mut entry = SyncQueueEntry::new(Duration::ZERO); - - entry.on_sync_complete(); - assert_eq!(entry.backoff(), Duration::from_secs(20)); - - entry.on_sync_complete(); - assert_eq!(entry.backoff(), Duration::from_secs(40)); - - entry.on_sync_complete(); - assert_eq!(entry.backoff(), Duration::from_secs(80)); - - entry.on_sync_complete(); - assert_eq!(entry.backoff(), Duration::from_secs(120)); - - // Stays capped - entry.on_sync_complete(); - assert_eq!(entry.backoff(), Duration::from_secs(120)); + // 20s → 40s → 80s → 120s → 120s (capped) } #[test] - fn new_event_resets_backoff() { - // New event for same identifier resets attempt_count - let mut entry = SyncQueueEntry::new(Duration::ZERO); - entry.on_sync_complete(); - entry.on_sync_complete(); - assert!(entry.attempt_count > 0); - - entry.on_new_event(Duration::from_millis(500)); - assert_eq!(entry.attempt_count, 0); + fn new_event_resets_attempt_count() { + // on_new_event() resets attempt_count to 0 } } ``` -#### Test Module 2: Throttle & Rate Limiting +**Success Criteria**: +- [ ] `SyncQueueEntry::new()` creates entry with given delay +- [ ] `backoff()` returns 20s, 40s, 80s, 120s, 120s for attempts 1-5 +- [ ] `on_new_event()` resets `attempt_count` to 0 +- [ ] `on_sync_complete()` increments `attempt_count` and updates `next_attempt` +- [ ] Both unit 
tests pass + +--- + +### Phase 2: DomainThrottle with Rate Limiting and Round-Robin + +**Goal**: Implement per-domain throttling with concurrent/rate limits and fair queue processing. + +**Files**: +- `src/purgatory/sync/throttle.rs` (new) + +**Deliverables**: +```rust +pub struct DomainThrottle { + domain: String, + in_flight: u32, + request_times: VecDeque, + queue: IndexMap, + round_robin_index: usize, + max_concurrent: u32, + max_per_minute: u32, +} -Tests `DomainThrottle` capacity checks and rate limiting: +impl DomainThrottle { + pub fn new(domain: String, max_concurrent: u32, max_per_minute: u32) -> Self; + pub fn has_capacity(&self) -> bool; + pub fn start_request(&mut self); + pub fn complete_request(&mut self); + pub fn enqueue_identifier(&mut self, identifier: String, tried_urls: HashSet); + pub fn next_ready_identifier(&mut self) -> Option; + pub fn mark_identifier_not_in_progress(&mut self, identifier: &str); + pub fn remove_identifier(&mut self, identifier: &str); +} +``` +**Unit Tests** (4 tests): ```rust -mod throttle_tests { +#[cfg(test)] +mod tests { #[test] - fn concurrent_limit_enforced() { - let mut throttle = DomainThrottle::new("example.com".into(), 2, 100); - - assert!(throttle.has_capacity()); - throttle.start_request(); - assert!(throttle.has_capacity()); - throttle.start_request(); - assert!(!throttle.has_capacity()); // At limit - - throttle.complete_request(); - assert!(throttle.has_capacity()); // Capacity freed + fn concurrent_limit_blocks_when_saturated() { + // has_capacity() returns false when in_flight >= max_concurrent } #[test] - fn rate_limit_enforced() { - let mut throttle = DomainThrottle::new("example.com".into(), 100, 2); - - // Make 2 requests (at rate limit) - throttle.start_request(); - throttle.complete_request(); - throttle.start_request(); - throttle.complete_request(); - - assert!(!throttle.has_capacity()); // Rate limited - - // After 60s window passes, capacity returns - // (In real tests, use 
tokio::time::pause/advance) + fn rate_limit_blocks_when_window_full() { + // has_capacity() returns false when requests in last 60s >= max_per_minute + // Use deterministic time (pass Instant or mock clock) } #[test] - fn round_robin_fairness() { - let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); - - // Enqueue 3 identifiers - throttle.enqueue_identifier("A".into(), HashSet::new()); - throttle.enqueue_identifier("B".into(), HashSet::new()); - throttle.enqueue_identifier("C".into(), HashSet::new()); - - // Process in round-robin order - let mut order = vec![]; - for _ in 0..6 { - if let Some(id) = throttle.next_ready_identifier() { - order.push(id.clone()); - throttle.mark_identifier_not_in_progress(&id); - } - } - - assert_eq!(order, vec!["A", "B", "C", "A", "B", "C"]); + fn round_robin_processes_identifiers_fairly() { + // Enqueue A, B, C → next_ready returns A, B, C, A, B, C... } #[test] fn skips_in_progress_identifiers() { - let mut throttle = DomainThrottle::new("example.com".into(), 5, 30); - - throttle.enqueue_identifier("A".into(), HashSet::new()); - throttle.enqueue_identifier("B".into(), HashSet::new()); - - // Get A (now in_progress) - assert_eq!(throttle.next_ready_identifier(), Some("A".into())); - - // Next skips A, returns B - assert_eq!(throttle.next_ready_identifier(), Some("B".into())); - - // Both in progress - assert_eq!(throttle.next_ready_identifier(), None); + // next_ready skips identifiers where in_progress=true + } +} +``` + +**Success Criteria**: +- [ ] Concurrent limit enforced (blocks at max_concurrent) +- [ ] Rate limit enforced (blocks at max_per_minute within 60s window) +- [ ] Round-robin ordering maintained across calls +- [ ] In-progress identifiers skipped +- [ ] All 4 unit tests pass + +--- + +### Phase 3: ThrottleManager + +**Goal**: Implement the manager that owns all domain throttles and provides the sync interface. 
+ +**Files**: +- `src/purgatory/sync/throttle.rs` (extend) + +**Deliverables**: +```rust +pub struct ThrottleManager { + throttles: DashMap, + max_concurrent_per_domain: u32, + max_per_minute_per_domain: u32, +} + +impl ThrottleManager { + pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self; + pub fn is_throttled(&self, domain: &str) -> bool; + pub fn start_request(&self, domain: &str); + pub fn complete_request(&self, domain: &str); + pub fn enqueue_identifier(&self, domain: &str, identifier: String, tried_urls: HashSet); +} +``` + +**Unit Tests** (1 test): +```rust +#[cfg(test)] +mod tests { + #[test] + fn is_throttled_reflects_domain_capacity() { + // is_throttled returns true when domain has no capacity } } ``` -#### Test Module 3: Retry & URL Selection +**Success Criteria**: +- [ ] `is_throttled()` correctly reflects domain capacity +- [ ] `start_request()`/`complete_request()` delegate to correct domain +- [ ] `enqueue_identifier()` creates domain throttle if needed +- [ ] Unit test passes + +--- + +### Phase 4: SyncContext Trait and MockSyncContext + +**Goal**: Define the abstraction for sync operations and create the test mock. + +**Files**: +- `src/purgatory/sync/context.rs` (new) + +**Deliverables**: +```rust +#[async_trait] +pub trait SyncContext: Send + Sync { + async fn fetch_repository_data(&self, identifier: &str) -> Result; + fn collect_needed_oids(&self, identifier: &str) -> HashSet; + async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result>; + async fn process_satisfiable_events(&self, identifier: &str) -> Result; + fn has_pending_events(&self, identifier: &str) -> bool; + fn find_target_repo(&self, data: &RepositoryData) -> Option; + fn our_domain(&self) -> Option<&str>; +} + +// Test support +#[cfg(test)] +pub struct MockSyncContext { ... 
} +``` + +**Unit Tests** (0 tests): +- This phase creates infrastructure only; tests come in Phase 5 + +**Success Criteria**: +- [ ] `SyncContext` trait compiles with all required methods +- [ ] `MockSyncContext` implements `SyncContext` +- [ ] Mock supports builder pattern for test setup + +--- + +### Phase 5: Core Sync Functions -Tests `sync_identifier_next_url` behavior: +**Goal**: Implement `sync_identifier_next_url` and `sync_identifier_from_url`. +**Files**: +- `src/purgatory/sync/functions.rs` (new) + +**Deliverables**: +```rust +pub async fn sync_identifier_next_url( + ctx: &C, + identifier: &str, + domain: Option<&str>, + tried_urls: &HashSet, + throttle_manager: &ThrottleManager, +) -> Option; + +pub async fn sync_identifier_from_url( + ctx: &C, + identifier: &str, + url: &str, + throttle_manager: &Arc, +) -> usize; +``` + +**Unit Tests** (3 tests): ```rust -mod retry_tests { +#[cfg(test)] +mod tests { #[tokio::test] - async fn tries_urls_until_success() { - let ctx = MockSyncContext::new() - .with_urls(&["https://fail1.com/r.git", "https://fail2.com/r.git", "https://ok.com/r.git"]) - .with_needed_oids(&["abc123"]) - .url_provides("https://ok.com/r.git", &["abc123"]); - - let throttle = Arc::new(ThrottleManager::new(5, 30)); - sync_identifier(&ctx, "test-repo", &throttle).await; - - // Should have tried all 3 URLs - assert_eq!(ctx.fetch_log.borrow().len(), 3); - // OID should now be satisfied - assert!(ctx.needed_oids.borrow().is_empty()); + async fn next_url_skips_throttled_domains() { + // When domain is throttled, next_url returns URL from different domain } #[tokio::test] - async fn skips_throttled_domains() { - let ctx = MockSyncContext::new() - .with_urls(&["https://throttled.com/r.git", "https://ok.com/r.git"]) - .with_needed_oids(&["abc123"]) - .url_provides("https://ok.com/r.git", &["abc123"]); - - let throttle = Arc::new(ThrottleManager::new(5, 30)); - - // Saturate throttled.com - for _ in 0..5 { - throttle.start_request("throttled.com"); - } - 
- sync_identifier(&ctx, "test-repo", &throttle).await; - - // Should only try ok.com (throttled.com skipped) - let urls_tried: Vec<_> = ctx.fetch_log.borrow().iter().map(|(u, _)| u.clone()).collect(); - assert!(!urls_tried.contains(&"https://throttled.com/r.git".to_string())); - assert!(urls_tried.contains(&"https://ok.com/r.git".to_string())); + async fn next_url_skips_tried_urls() { + // URLs in tried_urls set are not returned } #[tokio::test] - async fn enqueues_with_throttled_domains() { - let ctx = MockSyncContext::new() - .with_urls(&["https://throttled.com/r.git"]) - .with_needed_oids(&["abc123"]); - - let throttle = Arc::new(ThrottleManager::new(1, 30)); - throttle.start_request("throttled.com"); // Saturate - - sync_identifier(&ctx, "test-repo", &throttle).await; - - // Should be enqueued with throttled.com for later - assert!(throttle.has_queued_identifier("throttled.com", "test-repo")); + async fn from_url_fetches_and_processes_on_success() { + // Successful fetch triggers process_satisfiable_events } } ``` -#### Test Module 4: Partial OID Fetching +**Success Criteria**: +- [ ] `sync_identifier_next_url` returns non-throttled, untried URL +- [ ] `sync_identifier_next_url` returns `None` when all URLs tried or throttled +- [ ] `sync_identifier_from_url` calls `fetch_oids` and `process_satisfiable_events` +- [ ] All 3 unit tests pass -Tests behavior when servers have different subsets of OIDs: +--- +### Phase 6: sync_identifier Orchestration + +**Goal**: Implement the main sync loop for a single identifier. 
+ +**Files**: +- `src/purgatory/sync/functions.rs` (extend) + +**Deliverables**: ```rust -mod partial_oid_tests { +pub async fn sync_identifier<C: SyncContext>( + ctx: &C, + identifier: &str, + throttle_manager: &Arc<ThrottleManager>, +) -> bool; // true if complete, false if pending +``` + +**Unit Tests** (2 tests): +```rust +#[cfg(test)] +mod tests { #[tokio::test] - async fn aggregates_oids_from_multiple_servers() { - // Server A has oid1, Server B has oid2 - let ctx = MockSyncContext::new() - .with_urls(&["https://a.com/r.git", "https://b.com/r.git"]) - .with_needed_oids(&["oid1", "oid2"]) - .url_provides("https://a.com/r.git", &["oid1"]) - .url_provides("https://b.com/r.git", &["oid2"]); - - let throttle = Arc::new(ThrottleManager::new(5, 30)); - let complete = sync_identifier(&ctx, "test-repo", &throttle).await; - - assert!(complete); - assert!(ctx.needed_oids.borrow().is_empty()); + async fn tries_multiple_urls_until_complete() { + // Tries URL1 (partial), URL2 (partial), URL3 (complete) → returns true } #[tokio::test] - async fn incomplete_when_oids_unavailable() { - // No server has oid2 - let ctx = MockSyncContext::new() - .with_urls(&["https://a.com/r.git", "https://b.com/r.git"]) - .with_needed_oids(&["oid1", "oid2"]) - .url_provides("https://a.com/r.git", &["oid1"]) - .url_provides("https://b.com/r.git", &["oid1"]); // Same OID, missing oid2 - - let throttle = Arc::new(ThrottleManager::new(5, 30)); - let complete = sync_identifier(&ctx, "test-repo", &throttle).await; - - assert!(!complete); - assert!(ctx.needed_oids.borrow().contains("oid2")); + async fn enqueues_throttled_domains_when_incomplete() { + // When URLs remain but are throttled, enqueues and returns false } } ``` -#### Test Module 5: Burst Debouncing +**Success Criteria**: +- [ ] Loops through available URLs until sync complete or all tried +- [ ] Enqueues with throttled domains when OIDs still needed +- [ ] Returns `true` when all OIDs fetched, `false` otherwise +- [ ] Both unit tests pass + +--- + +### Phase 7: Purgatory 
Sync Queue Integration + +**Goal**: Add sync queue to Purgatory and implement `enqueue_sync`. -Tests that rapid event arrivals are debounced: +**Files**: +- `src/purgatory/mod.rs` (extend) +**Deliverables**: ```rust -mod debounce_tests { - #[tokio::test] - async fn burst_events_debounced() { - tokio::time::pause(); - - let queue = SyncQueue::new(); - - // Simulate 10 events arriving in 100ms - for i in 0..10 { - queue.enqueue("test-repo", Duration::from_millis(500)); - tokio::time::advance(Duration::from_millis(10)).await; - } - - // Should only have one entry (debounced) - assert_eq!(queue.len(), 1); - - // Entry should be ready after 500ms from last event - tokio::time::advance(Duration::from_millis(400)).await; - assert!(!queue.get("test-repo").unwrap().is_ready()); - - tokio::time::advance(Duration::from_millis(100)).await; - assert!(queue.get("test-repo").unwrap().is_ready()); +impl Purgatory { + // New field: sync_queue: Arc> + + pub fn enqueue_sync(&self, identifier: &str, delay: Duration); + pub fn has_pending_events(&self, identifier: &str) -> bool; +} +``` + +**Unit Tests** (1 test): +```rust +#[cfg(test)] +mod tests { + #[test] + fn enqueue_sync_debounces_rapid_calls() { + // Multiple enqueue_sync calls within delay window result in single entry } } ``` -### Integration Tests (tests/purgatory_sync.rs) +**Success Criteria**: +- [ ] `enqueue_sync` adds/updates entry in sync_queue +- [ ] Rapid calls debounce (don't create multiple entries) +- [ ] `has_pending_events` checks both state_events and pr_events +- [ ] Unit test passes + +--- + +### Phase 8: Main Sync Loop -Integration tests verify end-to-end behavior with real relay instances. Keep these minimal and focused on outcomes. +**Goal**: Implement the background sync loop that processes ready identifiers. +**Files**: +- `src/purgatory/sync/loop.rs` (new) + +**Deliverables**: ```rust -//! Purgatory Sync Integration Tests -//! -//! These tests verify that state and PR events sync correctly between -//! 
ngit-grasp instances, including git data fetching. +impl Purgatory { + pub fn start_sync_loop( + self: Arc, + ctx: Arc, + throttle_manager: Arc, + ) -> JoinHandle<()>; +} +``` -mod common; -use common::{TestRelay, TestRepo, create_state_event, create_pr_event}; +**Unit Tests** (0 tests): +- The sync loop is tested via integration tests; unit testing async loops is fragile -/// Core test: State event syncs with git data -/// -/// Verifies the complete purgatory workflow: -/// 1. State event arrives at relay B (no git data) -/// 2. Event enters purgatory -/// 3. Git data fetched from relay A -/// 4. Event released and served +**Success Criteria**: +- [ ] Loop runs every 1 second +- [ ] Finds ready identifiers and spawns sync tasks +- [ ] Applies backoff on incomplete syncs +- [ ] Removes completed identifiers from queue + +--- + +### Phase 9: RealSyncContext Implementation + +**Goal**: Implement the production `SyncContext` that connects to real systems. + +**Files**: +- `src/purgatory/sync/context.rs` (extend) + +**Deliverables**: +```rust +pub struct RealSyncContext { + purgatory: Purgatory, + database: SharedDatabase, + git_data_path: PathBuf, + our_domain: Option, + local_relay: Option, +} + +impl SyncContext for RealSyncContext { ... } +``` + +**Unit Tests** (0 tests): +- `RealSyncContext` is tested via integration tests + +**Success Criteria**: +- [ ] All `SyncContext` methods implemented +- [ ] Connects to real database, git, and relay +- [ ] `process_satisfiable_events` releases events from purgatory + +--- + +### Phase 10: Integration Tests + +**Goal**: Verify end-to-end sync behavior with real relay instances. 
+ +**Files**: +- `tests/purgatory_sync.rs` (new) + +**Integration Tests** (4 tests): +```rust #[tokio::test] async fn state_event_syncs_from_remote() { - // Setup: Two relays, relay A has repo with git data - let relay_a = TestRelay::start().await; - let repo = TestRepo::create_with_commits(&relay_a, 3).await; - - // Relay B configured to sync from A - let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; - - // Publish state event to relay B (git data not present) - let state = create_state_event(&repo); - relay_b.publish(&state).await; - - // Wait for sync (with timeout) - let synced = relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)).await; - assert!(synced, "State event should be served after git sync"); - - // Verify git data is now available - assert!(relay_b.can_clone(&repo.identifier()).await); + // State event enters purgatory, git data fetched, event released } -/// Core test: PR event syncs with commit data #[tokio::test] async fn pr_event_syncs_from_remote() { - let relay_a = TestRelay::start().await; - let repo = TestRepo::create_with_commits(&relay_a, 1).await; - - // Create PR with a new commit - let pr_commit = repo.create_pr_commit().await; - let pr_event = create_pr_event(&repo, &pr_commit); - relay_a.publish(&pr_event).await; - relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await; - - // Relay B syncs - let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; - relay_b.publish(&pr_event).await; - - let synced = relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)).await; - assert!(synced, "PR event should be served after git sync"); + // PR event enters purgatory, commit fetched, event released } -/// Edge case: Concurrent state and PR events for same repo -/// -/// Verifies that both event types sync correctly when arriving together. 
#[tokio::test] async fn concurrent_state_and_pr_sync() { - let relay_a = TestRelay::start().await; - let repo = TestRepo::create_with_commits(&relay_a, 2).await; - - let state = create_state_event(&repo); - let pr_commit = repo.create_pr_commit().await; - let pr_event = create_pr_event(&repo, &pr_commit); - - relay_a.publish(&state).await; - relay_a.publish(&pr_event).await; - relay_a.push_pr_ref(&repo, &pr_event.id, &pr_commit).await; - - let relay_b = TestRelay::start_with_sync_source(relay_a.url()).await; - - // Publish both simultaneously - tokio::join!( - relay_b.publish(&state), - relay_b.publish(&pr_event), - ); - - // Both should sync - let (state_synced, pr_synced) = tokio::join!( - relay_b.wait_for_event_served(&state.id, Duration::from_secs(10)), - relay_b.wait_for_event_served(&pr_event.id, Duration::from_secs(10)), - ); - - assert!(state_synced && pr_synced, "Both events should sync"); + // Both event types sync correctly when arriving together } -/// Edge case: Server has subset of required OIDs -/// -/// Verifies aggregation from multiple sources when no single -/// server has all required OIDs. 
#[tokio::test] -async fn partial_oid_aggregation() { - // Relay A has commits 1-2, Relay B has commits 2-3 - let relay_a = TestRelay::start().await; - let relay_b = TestRelay::start().await; - - let repo_a = TestRepo::create_with_commits(&relay_a, 2).await; - let repo_b = TestRepo::create_with_commits(&relay_b, 2).await; - repo_b.push_additional_commits(1).await; // Commit 3 only on B - - // State references commits 1, 2, and 3 - let state = create_state_event_with_commits(&[ - repo_a.commit(0), - repo_a.commit(1), - repo_b.commit(2), - ]); - - // Relay C syncs from both A and B - let relay_c = TestRelay::start_with_sync_sources(&[relay_a.url(), relay_b.url()]).await; - relay_c.publish(&state).await; - - let synced = relay_c.wait_for_event_served(&state.id, Duration::from_secs(15)).await; - assert!(synced, "Should aggregate OIDs from multiple sources"); +async fn partial_oid_aggregation_from_multiple_servers() { + // OIDs aggregated when no single server has all } ``` -### Test Summary - -| Category | Test Count | Purpose | -|----------|------------|---------| -| Backoff | 2 | Verify exponential backoff timing and reset | -| Throttle | 4 | Verify rate limiting, concurrency, round-robin | -| Retry | 3 | Verify URL iteration and throttle-skip behavior | -| Partial OIDs | 2 | Verify OID aggregation from multiple servers | -| Debounce | 1 | Verify burst event handling | -| **Unit Total** | **12** | | -| Integration | 4 | End-to-end sync verification | -| **Grand Total** | **16** | | - -This is a focused set that covers the key behaviors without redundant tests. - -## Migration Path - -1. **Phase 1**: Add new data structures (SyncQueueEntry, ThrottleManager, DomainThrottle, SyncContext trait) -2. **Phase 2**: Implement `sync_identifier_next_url` and `sync_identifier_from_url` with unit tests -3. **Phase 3**: Implement `sync_identifier` and main sync loop alongside existing `start_state_sync` -4. **Phase 4**: Implement ThrottleManager trigger-based processing -5. 
**Phase 5**: Add PR event syncing -6. **Phase 6**: Remove old `start_state_sync` code +**Success Criteria**: +- [ ] All 4 integration tests pass +- [ ] State events release after git sync +- [ ] PR events release after commit sync +- [ ] Partial OID scenarios handled correctly + +--- + +### Phase 11: Cleanup + +**Goal**: Remove old `start_state_sync` code and wire up new system. + +**Files**: +- `src/purgatory/mod.rs` (modify) +- `src/main.rs` (modify) + +**Deliverables**: +- Remove `start_state_sync` method +- Wire `start_sync_loop` into application startup +- Update `add_state` to call `enqueue_sync` + +**Success Criteria**: +- [ ] Old sync code removed +- [ ] New sync loop starts on application boot +- [ ] All existing tests still pass +- [ ] All new tests pass + +--- + +## Test Summary + +| Phase | Unit Tests | Integration Tests | Total | +|-------|------------|-------------------|-------| +| 1. SyncQueueEntry | 2 | - | 2 | +| 2. DomainThrottle | 4 | - | 4 | +| 3. ThrottleManager | 1 | - | 1 | +| 4. SyncContext | 0 | - | 0 | +| 5. Core Functions | 3 | - | 3 | +| 6. sync_identifier | 2 | - | 2 | +| 7. Queue Integration | 1 | - | 1 | +| 8. Sync Loop | 0 | - | 0 | +| 9. RealSyncContext | 0 | - | 0 | +| 10. Integration | - | 4 | 4 | +| 11. Cleanup | 0 | - | 0 | +| **Total** | **13** | **4** | **17** | ## Configuration -- cgit v1.2.3