From 852eddcc33b59ed027e06d30456f6b9e3b9a31cb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 11:18:19 +0000 Subject: docs: purgatory tweak implementation plan --- docs/explanation/purgatory-sync-redesign.md | 68 +++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 14 deletions(-) (limited to 'docs') diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index dd59870..629c2ff 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md @@ -745,6 +745,17 @@ This separation enables: - DomainThrottle to process queued identifiers when capacity frees - Clean testability with mocked SyncContext +### Helper: extract_domain + +```rust +/// Extract domain from a URL (e.g., "https://github.com/foo/bar.git" → "github.com") +fn extract_domain(url: &str) -> Option { + url::Url::parse(url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) +} +``` + ### sync_identifier_next_url ```rust @@ -1286,8 +1297,11 @@ Each phase has clear deliverables, unit tests, and success criteria. Unit tests **Goal**: Implement the sync queue entry struct with backoff calculation. **Files**: +- `src/purgatory/sync/mod.rs` (new - module declaration) - `src/purgatory/sync/queue.rs` (new) +**Note**: This creates a new `sync` submodule under `src/purgatory/`. The current purgatory structure is flat (`mod.rs`, `helpers.rs`, `types.rs`). Add `pub mod sync;` to `src/purgatory/mod.rs`. + **Deliverables**: ```rust pub struct SyncQueueEntry { @@ -1397,9 +1411,9 @@ mod tests { --- -### Phase 3: ThrottleManager +### Phase 3: ThrottleManager (Rate Limiting Only) -**Goal**: Implement the manager that owns all domain throttles and provides the sync interface. +**Goal**: Implement the manager that owns all domain throttles and provides rate limiting. Queue processing methods (`set_context`, `process_queued_identifier`) are added in Phase 6 after `SyncContext` exists. **Files**: - `src/purgatory/sync/throttle.rs` (extend) @@ -1410,14 +1424,15 @@ pub struct ThrottleManager { throttles: DashMap, max_concurrent_per_domain: u32, max_per_minute_per_domain: u32, + // Note: ctx: OnceLock> added in Phase 6 } impl ThrottleManager { pub fn new(max_concurrent: u32, max_per_minute: u32) -> Self; pub fn is_throttled(&self, domain: &str) -> bool; pub fn start_request(&self, domain: &str); - pub fn complete_request(&self, domain: &str); - pub fn enqueue_identifier(&self, domain: &str, identifier: String, tried_urls: HashSet); + pub fn complete_request(&self, domain: &str); // Trigger logic added in Phase 6 + pub fn enqueue_identifier(&self, domain: &str, identifier: String, tried_urls: HashSet); // Trigger logic added in Phase 6 } ``` @@ -1438,6 +1453,8 @@ mod tests { - [ ] `enqueue_identifier()` creates domain throttle if needed - [ ] Unit test passes +**Note**: The `complete_request()` and `enqueue_identifier()` methods in this phase only update state. The trigger-based processing (spawning tasks when capacity frees) is added in Phase 6 after `SyncContext` is available. + --- ### Phase 4: SyncContext Trait and MockSyncContext @@ -1533,20 +1550,36 @@ mod tests { --- -### Phase 6: sync_identifier Orchestration +### Phase 6: sync_identifier Orchestration + ThrottleManager Queue Processing -**Goal**: Implement the main sync loop for a single identifier. +**Goal**: Implement the main sync loop for a single identifier AND add trigger-based queue processing to ThrottleManager. **Files**: - `src/purgatory/sync/functions.rs` (extend) +- `src/purgatory/sync/throttle.rs` (extend - add queue processing triggers) **Deliverables**: ```rust +// In functions.rs pub async fn sync_identifier( ctx: &C, identifier: &str, throttle_manager: &Arc, ) -> bool; // true if complete, false if pending + +// In throttle.rs - add to ThrottleManager +impl ThrottleManager { + /// Set the sync context (called once at startup) + pub fn set_context(&self, ctx: Arc); + + /// Try to process the next queued identifier for a domain (internal) + fn try_process_next(self: &Arc, domain: &str); + + /// Process a single identifier from a domain's queue (internal) + async fn process_queued_identifier(self: &Arc, domain: &str, identifier: &str); +} + +// Update complete_request and enqueue_identifier to trigger processing ``` **Unit Tests** (2 tests): @@ -1569,6 +1602,8 @@ mod tests { - [ ] Loops through available URLs until sync complete or all tried - [ ] Enqueues with throttled domains when OIDs still needed - [ ] Returns `true` when all OIDs fetched, `false` otherwise +- [ ] `complete_request()` triggers `try_process_next()` when capacity available +- [ ] `enqueue_identifier()` triggers `try_process_next()` when capacity available - [ ] Both unit tests pass --- @@ -1627,11 +1662,13 @@ impl Purgatory { } ``` +**Note**: The sync loop interval is hardcoded to 1 second. No configuration option needed. + **Unit Tests** (0 tests): - The sync loop is tested via integration tests; unit testing async loops is fragile **Success Criteria**: -- [ ] Loop runs every 1 second +- [ ] Loop runs every 1 second (hardcoded) - [ ] Finds ready identifiers and spawns sync tasks - [ ] Applies backoff on incomplete syncs - [ ] Removes completed identifiers from queue @@ -1647,8 +1684,9 @@ This is the core function described in [Unified Git Data Sync](unify-git-data-sy - `RealSyncContext::process_newly_available_git_data` after purgatory sync fetches OIDs **Files**: -- `src/git/sync.rs` (new) -- `src/purgatory/mod.rs` (extend - add secondary index for PR events by identifier) +- `src/git/sync.rs` (extend - add `ProcessResult` alongside existing types) + +**Note**: `src/git/sync.rs` already exists with `sync_to_owner_repos`, `align_repository_with_state`, etc. This phase extends it with the unified processing function. **Deliverables**: ```rust @@ -1684,7 +1722,8 @@ fn extract_identifier_from_pr_event(event: &Event) -> Option; // Purgatory additions impl Purgatory { - /// Find all PR events for an identifier (uses secondary index) + /// Find all PR events for an identifier. + /// Filters pr_events entries where the identifier matches (no secondary index needed). pub fn find_prs_for_identifier(&self, identifier: &str) -> Vec; } ``` @@ -1714,7 +1753,7 @@ mod tests { - [ ] `process_newly_available_git_data` discovers satisfiable events from purgatory - [ ] State events: syncs OIDs to owner repos, aligns refs, sets HEAD, saves to DB, notifies WS, removes from purgatory - [ ] PR events: syncs commit to owner repos, creates refs/nostr/, saves to DB, notifies WS, removes from purgatory -- [ ] Secondary index `pr_events_by_identifier` maintained correctly +- [ ] `find_prs_for_identifier` filters pr_events by identifier correctly - [ ] All 3 unit tests pass --- @@ -1879,10 +1918,10 @@ async fn push_triggers_unified_processing() { |-------|------------|-------------------|-------| | 1. SyncQueueEntry | 2 | - | 2 | | 2. DomainThrottle | 4 | - | 4 | -| 3. ThrottleManager | 1 | - | 1 | +| 3. ThrottleManager (rate limiting) | 1 | - | 1 | | 4. SyncContext | 0 | - | 0 | | 5. Core Functions | 3 | - | 3 | -| 6. sync_identifier | 2 | - | 2 | +| 6. sync_identifier + queue triggers | 2 | - | 2 | | 7. Queue Integration | 1 | - | 1 | | 8. Sync Loop | 0 | - | 0 | | 9. Unified Function | 3 | - | 3 | @@ -1896,12 +1935,13 @@ async fn push_triggers_unified_processing() { | Option | CLI Flag | Environment Variable | Default | |--------|----------|---------------------|---------| -| Sync loop interval | `--sync-loop-interval-ms` | `NGIT_SYNC_LOOP_INTERVAL_MS` | `1000` | | Domain concurrent limit | `--sync-domain-concurrent` | `NGIT_SYNC_DOMAIN_CONCURRENT` | `5` | | Domain rate limit | `--sync-domain-rate-limit` | `NGIT_SYNC_DOMAIN_RATE_LIMIT` | `30` | | Default sync delay | `--sync-default-delay-secs` | `NGIT_SYNC_DEFAULT_DELAY_SECS` | `180` | | Immediate sync delay | `--sync-immediate-delay-ms` | `NGIT_SYNC_IMMEDIATE_DELAY_MS` | `500` | +**Note**: Sync loop interval is hardcoded to 1 second (not configurable). + ## Observability ### Metrics -- cgit v1.2.3