From 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 09:07:54 +0000 Subject: improve sync design --- .../state-structure-redesign-proposal.md | 373 +++++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 docs/explanation/state-structure-redesign-proposal.md (limited to 'docs/explanation/state-structure-redesign-proposal.md') diff --git a/docs/explanation/state-structure-redesign-proposal.md b/docs/explanation/state-structure-redesign-proposal.md new file mode 100644 index 0000000..0a27cf4 --- /dev/null +++ b/docs/explanation/state-structure-redesign-proposal.md @@ -0,0 +1,373 @@ +# State Structure Redesign Proposal v2 + +## The Core Problem + +We need to transform: +- **Repo Announcements** (30617) that list relays +- **Root Events** (1617/1618/1619/1621) that tag repos + +Into: +- **Per-relay subscriptions**: which repos and root events to sync from each relay + +And generate **RelayActions** when this mapping changes. + +--- + +## Proposed Data Model + +### 1. RepoIndex (Primary Source of Truth) + +```rust +/// Everything we know about repos we're tracking +/// Key: repo addressable ref ("30617:pubkey:identifier") +pub type RepoIndex = Arc>>; + +#[derive(Debug, Clone, Default)] +pub struct RepoInfo { + /// Relay URLs listed in the repo's announcement + pub relays: HashSet, + /// Root event IDs that reference this repo + pub root_events: HashSet, +} +``` + +**Updated by:** Database init, batch processing of new announcements/root events + +### 2. RelayIndex (Applied State) + +```rust +/// What we've told each relay to sync +/// Key: relay URL +pub type RelayIndex = Arc>>; + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct SyncTarget { + /// Repos we're syncing for this relay + pub repos: HashSet, + /// Root events we're tracking + pub root_events: HashSet, +} +``` + +**Updated by:** SyncManager after RelayActions are applied + +--- + +## The Transformation + +```mermaid +flowchart LR + subgraph Input + RA[Repo Announcements] + RE[Root Events] + end + + subgraph RepoIndex + R1[repo_a: relays=X,Y events=1,2] + R2[repo_b: relays=Y,Z events=3] + end + + subgraph Derived Target + T1[relay_X: repos=a events=1,2] + T2[relay_Y: repos=a,b events=1,2,3] + T3[relay_Z: repos=b events=3] + end + + subgraph RelayIndex Applied + A1[relay_X: repos=a events=1,2] + A2[relay_Y: repos=a events=1,2] + end + + RA --> R1 + RA --> R2 + RE --> R1 + RE --> R2 + + R1 --> T1 + R1 --> T2 + R2 --> T2 + R2 --> T3 +``` + +The **diff** between Derived Target and RelayIndex produces RelayActions: +- relay_Y needs AddFilters for repo_b and event 3 +- relay_Z needs SpawnRelay + +--- + +## Algorithm: derive_target_from_repo_index + +```rust +/// Derive what we SHOULD be syncing from the repo data +fn derive_relay_targets(repo_index: &HashMap) -> HashMap { + let mut targets: HashMap = HashMap::new(); + + for (repo_ref, info) in repo_index { + // For each relay that lists this repo + for relay_url in &info.relays { + let target = targets.entry(relay_url.clone()).or_default(); + target.repos.insert(repo_ref.clone()); + target.root_events.extend(info.root_events.iter().cloned()); + } + } + + targets +} +``` + +--- + +## Algorithm: process_batch + +```rust +async fn process_batch(&self, pending: &mut PendingUpdates) { + // ============================================ + // STEP 1: Update RepoIndex from batch + // ============================================ + + let mut repo_index = self.repo_index.write().await; + + // 1a. Process root events - add to repo's root_events set + for event in pending.root_events.drain(..) { + for repo_ref in extract_repo_refs(&event) { + repo_index.entry(repo_ref) + .or_default() + .root_events + .insert(event.id); + } + } + + // 1b. Process announcements - update repo's relay set + for event in pending.announcements.drain(..) { + if !lists_our_service(&event) { + continue; + } + let repo_ref = build_repo_ref(&event); + let relay_urls: HashSet = extract_relay_urls(&event) + .into_iter() + .filter(|url| !is_own_relay(url)) + .collect(); + + // Replace relay set (handles updates that change relays) + repo_index.entry(repo_ref) + .or_default() + .relays = relay_urls; + } + + // ============================================ + // STEP 2: Derive target state from RepoIndex + // ============================================ + + let target = derive_relay_targets(&repo_index); + drop(repo_index); // Release write lock + + // ============================================ + // STEP 3: Diff target vs applied (RelayIndex) + // ============================================ + + let applied = self.relay_index.read().await; + let actions = compute_relay_actions(&target, &applied); + drop(applied); // Release read lock + + // ============================================ + // STEP 4: Send actions & update RelayIndex + // ============================================ + + for action in actions { + match &action { + RelayAction::SpawnRelay { relay_url, repos_and_root_events } => { + // Update RelayIndex with new relay + let mut applied = self.relay_index.write().await; + applied.insert(relay_url.clone(), SyncTarget { + repos: repos_and_root_events.keys().cloned().collect(), + root_events: repos_and_root_events.values() + .flat_map(|e| e.iter().cloned()) + .collect(), + }); + } + RelayAction::AddFilters { relay_url, repos_and_new_root_event } => { + // Update RelayIndex with additions + let mut applied = self.relay_index.write().await; + if let Some(target) = applied.get_mut(relay_url) { + for (repo, events) in repos_and_new_root_event { + target.repos.insert(repo.clone()); + target.root_events.extend(events.iter().cloned()); + } + } + } + } + + // Send action to SyncManager + let _ = self.action_tx.send(action).await; + } +} +``` + +--- + +## Algorithm: compute_relay_actions + +```rust +fn compute_relay_actions( + target: &HashMap, + applied: &HashMap, +) -> Vec { + let mut actions = Vec::new(); + + for (relay_url, target_state) in target { + match applied.get(relay_url) { + None => { + // New relay - spawn it + let mut repos_and_events = HashMap::new(); + for repo in &target_state.repos { + // Get events for this specific repo + let events = target_state.root_events.clone(); // simplified + repos_and_events.insert(repo.clone(), events); + } + actions.push(RelayAction::SpawnRelay { + relay_url: relay_url.clone(), + repos_and_root_events: repos_and_events, + }); + } + Some(applied_state) => { + // Existing relay - check for new repos/events + let new_repos: HashSet<_> = target_state.repos + .difference(&applied_state.repos) + .cloned() + .collect(); + let new_events: HashSet<_> = target_state.root_events + .difference(&applied_state.root_events) + .cloned() + .collect(); + + if !new_repos.is_empty() || !new_events.is_empty() { + let mut repos_and_events = HashMap::new(); + for repo in &new_repos { + repos_and_events.insert(repo.clone(), new_events.clone()); + } + // Also handle new events for existing repos + if !new_events.is_empty() && new_repos.is_empty() { + for repo in &applied_state.repos { + repos_and_events.insert(repo.clone(), new_events.clone()); + } + } + + actions.push(RelayAction::AddFilters { + relay_url: relay_url.clone(), + repos_and_new_root_event: repos_and_events, + }); + } + } + } + } + + // Future: detect relay removal (in applied but not in target) + + actions +} +``` + +--- + +## Handling Announcement Updates + +When an announcement is **updated** and changes its relay list: + +```mermaid +flowchart TD + A[repo_a announcement updated] --> B[Old: relays X,Y] + B --> C[New: relays Y,Z] + C --> D[RepoIndex updated: repo_a.relays = Y,Z] + D --> E[derive_relay_targets] + E --> F[Target: X=empty, Y=repo_a, Z=repo_a] + F --> G[Diff with Applied: X=repo_a, Y=repo_a] + G --> H1[X: repo_a removed - future RemoveFilters] + G --> H2[Z: new relay - SpawnRelay] +``` + +The current RelayAction types only support growth (SpawnRelay, AddFilters). Removal would need a new `RemoveFilters` action type - this is a future enhancement. + +--- + +## Name Mappings + +| Current | Proposed | Semantics | +|---------|----------|-----------| +| `FollowingRepoRootEvents` | `RepoIndex` | Per-repo: relays + root events | +| `SyncRelays` | `RelayIndex` | Per-relay: what we're syncing (applied state) | +| - | `SyncTarget` | Struct for repos + events | +| - | `RepoInfo` | Struct for relay set + event set | + +--- + +## Data Flow Summary + +```mermaid +flowchart TB + subgraph Batch Input + RA[30617 Announcements] + RE[Root Events 1617-1621] + end + + subgraph Step 1: Update Source + RI[RepoIndex] + end + + subgraph Step 2: Derive Target + DT[derive_relay_targets] + TGT[Target HashMap] + end + + subgraph Step 3: Diff + RLI[RelayIndex - Applied] + DIFF[compute_relay_actions] + end + + subgraph Step 4: Apply + ACT[RelayActions] + SM[SyncManager] + end + + RA --> RI + RE --> RI + RI --> DT + DT --> TGT + TGT --> DIFF + RLI --> DIFF + DIFF --> ACT + ACT --> SM + ACT --> |update| RLI +``` + +--- + +## Files to Modify + +| File | Changes | +|------|---------| +| [`src/sync/mod.rs`](src/sync/mod.rs) | Replace type aliases with RepoIndex/RelayIndex + structs | +| [`src/sync/self_subscriber.rs`](src/sync/self_subscriber.rs) | Rewrite process_batch with new algorithm | + +--- + +## Questions for Approval + +1. **Naming**: Are `RepoIndex`/`RelayIndex` and `RepoInfo`/`SyncTarget` clear enough? + +2. **When to update RelayIndex**: Should we: + - (a) Update immediately when generating action (optimistic) ← proposed above + - (b) Update only after SyncManager confirms action succeeded + +3. **Bootstrap relay**: Keep special-casing it in RelayIndex (always present)? + +4. **Future work**: Add `RemoveFilters` action for relay removal, or defer? + +--- + +## Benefits + +1. **Logical flow**: Source → Derived → Diff → Actions +2. **Single source of truth**: RepoIndex is the authoritative data +3. **Clear transformation**: `derive_relay_targets()` is a pure function +4. **Handles updates**: Replacing `repo.relays` naturally handles announcement changes +5. **Testable**: Each step can be unit tested independently \ No newline at end of file -- cgit v1.2.3