From 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 09:07:54 +0000 Subject: improve sync design --- docs/explanation/grasp-02-proactive-sync-v3.md | 871 ++++++++++++++ docs/explanation/grasp-02-proactive-sync-v4.md | 1229 ++++++++++++++++++++ .../state-structure-redesign-proposal.md | 373 ++++++ 3 files changed, 2473 insertions(+) create mode 100644 docs/explanation/grasp-02-proactive-sync-v3.md create mode 100644 docs/explanation/grasp-02-proactive-sync-v4.md create mode 100644 docs/explanation/state-structure-redesign-proposal.md (limited to 'docs') diff --git a/docs/explanation/grasp-02-proactive-sync-v3.md b/docs/explanation/grasp-02-proactive-sync-v3.md new file mode 100644 index 0000000..30b3102 --- /dev/null +++ b/docs/explanation/grasp-02-proactive-sync-v3.md @@ -0,0 +1,871 @@ +# GRASP-02: Proactive Sync v3 - Event-Driven Design + +## Overview + +This document presents v3 of the proactive sync design. Key principles: + +1. **Self-subscription as the only mechanism** - No database initialization at startup +2. **Batch-based pending tracking** - Each batch confirms independently +3. **Single action type** - AddFilters only, auto-spawn connections +4. **Three-way state model** - RepoSyncIndex (want) → PendingSyncIndex (in-flight) → RelaySyncIndex (confirmed) + +--- + +## Data Model + +### RepoSyncIndex (Source of Truth) + +```rust +/// What we WANT to sync - derived from events received via self-subscription. +/// Updated immediately when self-subscriber batch fires. 
+/// Key: repo addressable ref ("30617:pubkey:identifier") +pub type RepoSyncIndex = Arc>>; + +#[derive(Debug, Clone, Default)] +pub struct RepoSyncNeeds { + /// Relay URLs listed in this repo's 30617 announcement + pub relays: HashSet, + /// Root event IDs (1617/1618/1619/1621) that reference this repo + pub root_events: HashSet, +} +``` + +### RelaySyncIndex (Confirmed State + Connection) + +```rust +/// What we've CONFIRMED syncing - includes connection state for integrated lifecycle. +/// Key: relay URL +pub type RelaySyncIndex = Arc>>; + +/// Connection status for a relay +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionStatus { + /// Not currently connected + Disconnected, + /// Connection attempt in progress + Connecting, + /// Successfully connected and subscribed + Connected, +} + +/// Complete state for a single relay - combines sync needs with connection lifecycle +#[derive(Debug)] +pub struct RelayState { + /// Repos we've confirmed syncing from this relay + pub repos: HashSet, + /// Root events we've confirmed tracking + pub root_events: HashSet, + /// If true, never disconnect this relay + pub is_bootstrap: bool, + /// Current connection status + pub connection_status: ConnectionStatus, + /// When we last successfully connected (for since filter on reconnect) + pub last_connected: Option, + /// When we disconnected (for 15-minute state retention rule) + pub disconnected_at: Option, + /// The active connection (None if disconnected) + pub connection: Option, +} + +impl RelayState { + /// Check if state should be cleared based on 15-minute rule + pub fn should_clear_state(&self) -> bool { + match self.disconnected_at { + Some(disconnected) => { + let now = Timestamp::now(); + now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes + } + None => false, // Still connected or never connected + } + } + + /// Clear repos and root_events (called when reconnect takes > 15 minutes) + pub fn clear_sync_state(&mut self) { + 
self.repos.clear(); + self.root_events.clear(); + } +} +``` + +### PendingSyncIndex (In-Flight Batches) + +```rust +/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. +/// Each batch has its own ID and can confirm independently. +/// Key: relay URL +pub type PendingSyncIndex = Arc>>>; + +#[derive(Debug, Clone)] +pub struct PendingBatch { + /// Unique ID for this batch (for debugging/logging) + pub batch_id: u64, + /// The items this batch is syncing + pub items: PendingItems, + /// Subscription IDs that must ALL receive EOSE before confirming + pub outstanding_subs: HashSet, +} + +#[derive(Debug, Clone, Default)] +pub struct PendingItems { + pub repos: HashSet, + pub root_events: HashSet, +} +``` + +--- + +## State Flow + +```mermaid +flowchart TB + subgraph Input + SS[SelfSubscriber] + OWN[Own Relay] + end + + subgraph RepoSyncIndex - Want + RSI[HashMap of Repo to Relays+Events] + end + + subgraph Derived Target + DT[derive_relay_targets fn] + TGT[Per-relay: repos + events we should sync] + end + + subgraph PendingSyncIndex - In Flight + PSI[Vec of PendingBatch per relay] + end + + subgraph RelaySyncIndex - State + Connection + RLI[RelayState per relay] + CONN[connection: Option of RelayConnection] + STATUS[connection_status: Connected/Disconnected/Connecting] + REPOS[repos + root_events] + end + + SS -->|subscribe| OWN + OWN -->|events| SS + SS -->|batch fires| RSI + RSI --> DT + DT --> TGT + TGT -->|diff: target - pending - confirmed| DIFF[Compute new items] + PSI --> DIFF + RLI --> DIFF + DIFF -->|skip if disconnected| CHECK{Connected?} + CHECK -->|yes| AF[AddFilters] + CHECK -->|no| QUEUE[Queued in RelayState.repos] + AF -->|subscribe| CONN + AF -->|create batch| PSI + CONN -->|EOSE| PSI + PSI -->|batch complete| REPOS + CONN -->|disconnect event| DISC[Mark Disconnected + set disconnected_at] + DISC -->|reconnect| RECONN[On Reconnect] + RECONN -->|check 15min rule| RULE{disconnected > 15min?} + RULE -->|yes| CLEAR[Clear repos/root_events] + 
RULE -->|no| RETAIN[Keep retained state] + CLEAR --> REGEN[Regenerate AddFilters from RepoSyncIndex] + RETAIN --> RESUB[Resubscribe with since filter] +``` + +### Connection Lifecycle Integration + +The `RelayState` struct now owns both the connection and sync state: + +```rust +// On disconnect (detected via RelayPoolNotification::Shutdown or handle_notifications returning) +fn handle_disconnect(&mut self, relay_url: &str) { + if let Some(state) = self.relay_sync_index.write().await.get_mut(relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + state.disconnected_at = Some(Timestamp::now()); + state.connection = None; + + // Clear any pending batches for this relay + self.pending_sync_index.write().await.remove(relay_url); + } +} + +// On reconnect +async fn handle_reconnect(&mut self, relay_url: &str) -> Result<(), Error> { + let mut index = self.relay_sync_index.write().await; + let state = index.get_mut(relay_url).ok_or("Relay not in index")?; + + // Apply 15-minute state retention rule + if state.should_clear_state() { + tracing::info!("Reconnect after >15min for {}, clearing state", relay_url); + state.clear_sync_state(); + } + + // Create new connection + state.connection_status = ConnectionStatus::Connecting; + let connection = RelayConnection::new(relay_url.to_string()); + + // Connect with since filter if we have last_connected + let since = state.last_connected.map(|ts| { + Timestamp::from(ts.as_u64().saturating_sub(900)) // -15 min buffer + }); + + connection.connect_and_subscribe_with_since(since).await?; + + state.connection = Some(connection); + state.connection_status = ConnectionStatus::Connected; + state.last_connected = Some(Timestamp::now()); + state.disconnected_at = None; + + drop(index); // Release lock + + // Regenerate AddFilters from current state (either retained or fresh from RepoSyncIndex) + self.regenerate_filters_for_relay(relay_url).await; + + Ok(()) +} + +/// Regenerate AddFilters for a relay after reconnection 
+async fn regenerate_filters_for_relay(&mut self, relay_url: &str) { + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + + if let Some(target) = targets.get(relay_url) { + // Build filters for everything this relay should sync + let filters = build_filters(&target.repos, &target.root_events); + + // Create and process AddFilters action + let action = AddFilters { + relay_url: relay_url.to_string(), + repos: target.repos.clone(), + root_events: target.root_events.clone(), + filters, + }; + + self.handle_add_filters(action).await; + } +} +``` + +--- + +## Action Type + +```rust +/// Action sent from SelfSubscriber to SyncManager. +/// SyncManager auto-spawns relay connections if they don't exist. +pub struct AddFilters { + pub relay_url: String, + /// Items this action covers (for pending tracking) + pub repos: HashSet, + pub root_events: HashSet, + /// Pre-batched filters (each with <= 100 tags) + pub filters: Vec, +} +``` + +--- + +## Core Algorithms + +### 1. derive_relay_targets + +Transform RepoSyncIndex into per-relay sync targets: + +```rust +fn derive_relay_targets( + repo_index: &HashMap +) -> HashMap { + let mut targets: HashMap = HashMap::new(); + + for (repo_ref, needs) in repo_index { + for relay_url in &needs.relays { + let target = targets.entry(relay_url.clone()).or_default(); + target.repos.insert(repo_ref.clone()); + target.root_events.extend(needs.root_events.iter().cloned()); + } + } + + targets +} +``` + +### 2. 
compute_actions (Three-Way Diff) + +```rust +fn compute_actions( + targets: &HashMap, + pending: &HashMap>, + confirmed: &HashMap, +) -> Vec { + let mut actions = Vec::new(); + + for (relay_url, target) in targets { + // Skip disconnected relays - they'll get AddFilters on reconnect + if let Some(state) = confirmed.get(relay_url) { + if state.connection_status != ConnectionStatus::Connected { + continue; + } + } + + // Collect all pending items for this relay + let pending_repos: HashSet<_> = pending.get(relay_url) + .map(|batches| batches.iter() + .flat_map(|b| b.items.repos.iter().cloned()) + .collect()) + .unwrap_or_default(); + let pending_events: HashSet<_> = pending.get(relay_url) + .map(|batches| batches.iter() + .flat_map(|b| b.items.root_events.iter().cloned()) + .collect()) + .unwrap_or_default(); + + // Collect confirmed items for this relay + let confirmed_repos = confirmed.get(relay_url) + .map(|c| &c.repos) + .unwrap_or(&HashSet::new()); + let confirmed_events = confirmed.get(relay_url) + .map(|c| &c.root_events) + .unwrap_or(&HashSet::new()); + + // New = target - pending - confirmed + let new_repos: HashSet<_> = target.repos.iter() + .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) + .cloned() + .collect(); + let new_events: HashSet<_> = target.root_events.iter() + .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) + .cloned() + .collect(); + + if !new_repos.is_empty() || !new_events.is_empty() { + let filters = build_filters(&new_repos, &new_events); + actions.push(AddFilters { + relay_url: relay_url.clone(), + repos: new_repos, + root_events: new_events, + filters, + }); + } + } + + actions +} +``` + +### 3. 
handle_add_filters (SyncManager) + +```rust +impl SyncManager { + async fn handle_add_filters(&mut self, action: AddFilters) { + let AddFilters { relay_url, repos, root_events, filters } = action; + + // Auto-spawn connection if needed + if !self.connections.contains_key(&relay_url) { + self.spawn_connection(&relay_url).await; + } + + let conn = self.connections.get(&relay_url).unwrap(); + + // Subscribe and collect subscription IDs + // nostr-sdk 0.44: subscribe returns Output> + // since we're only subscribed to one relay per connection + let mut sub_ids = HashSet::new(); + for filter in filters { + // cloned filter for each subscription call + match conn.client.subscribe(filter, None).await { + Ok(output) => { + // Output contains subscription IDs for each relay + for sub_id in output.val { + sub_ids.insert(sub_id); + } + } + Err(e) => { + tracing::warn!("Failed to subscribe: {}", e); + } + } + } + + // Create pending batch + let batch = PendingBatch { + batch_id: self.next_batch_id(), + items: PendingItems { repos, root_events }, + outstanding_subs: sub_ids, + }; + + // Add to pending index + self.pending_sync_index.write().await + .entry(relay_url) + .or_default() + .push(batch); + } +} +``` + +### 4. 
handle_eose (Batch Completion) + +```rust +impl SyncManager { + async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { + let mut pending = self.pending_sync_index.write().await; + + if let Some(batches) = pending.get_mut(relay_url) { + // Find which batch this subscription belongs to + for batch in batches.iter_mut() { + if batch.outstanding_subs.remove(&sub_id) { + // Check if batch is now complete + if batch.outstanding_subs.is_empty() { + // Move items to confirmed + let items = batch.items.clone(); + drop(pending); // Release lock before acquiring another + + let mut confirmed = self.relay_sync_index.write().await; + let relay_confirmed = confirmed + .entry(relay_url.to_string()) + .or_default(); + relay_confirmed.repos.extend(items.repos); + relay_confirmed.root_events.extend(items.root_events); + + tracing::info!( + "Batch {} complete for {} - confirmed {} repos, {} events", + batch.batch_id, relay_url, + items.repos.len(), items.root_events.len() + ); + } + break; + } + } + + // Clean up completed batches + if let Some(batches) = pending.get_mut(relay_url) { + batches.retain(|b| !b.outstanding_subs.is_empty()); + } + } + } +} +``` + +--- + +## Self-Subscriber Flow + +### State Tracking + +```rust +pub struct SelfSubscriber { + own_relay_url: String, + relay_domain: String, + repo_sync_index: RepoSyncIndex, + pending_sync_index: PendingSyncIndex, + relay_sync_index: RelaySyncIndex, + action_tx: mpsc::Sender<AddFilters>, + /// Timestamp of last successful connection - used for since filter on reconnection + last_connected: Option<Timestamp>, + /// Is this the first connection attempt since startup? 
is_initial_connect: bool, +} +``` + +### On Startup + +```rust +impl SelfSubscriber { + async fn run(mut self) { + // Connect to own relay + let client = Client::new(Keys::generate()); + client.add_relay(&self.own_relay_url).await?; + client.connect().await; + + // Track connection time + self.last_connected = Some(Timestamp::now()); + + // Subscribe WITHOUT since filter (get all historical) on first connect + let filter = Filter::new().kinds([ + Kind::Custom(30617), // Repository announcements + Kind::GitPatch, // 1617 + Kind::Custom(1618), // PRs + Kind::Custom(1619), // PR updates + Kind::GitIssue, // 1621 + ]); + + client.subscribe(filter, None).await?; + self.is_initial_connect = false; + + // Run event loop with batching + self.event_loop(&client).await; + } +} +``` + +### On Reconnection + +```rust +impl SelfSubscriber { + async fn reconnect(&mut self, client: &Client) -> Result<(), Error> { + // Reconnect to own relay + client.connect().await; + + // On reconnection ONLY, use since filter based on last_connected + let since = match self.last_connected { + Some(ts) => Timestamp::from(ts.as_u64().saturating_sub(900)), // -15 minutes buffer + None => Timestamp::from(0), // Shouldn't happen, but fall back to full sync + }; + + // Update last_connected AFTER computing since + self.last_connected = Some(Timestamp::now()); + + let filter = Filter::new() + .kinds([ + Kind::Custom(30617), + Kind::GitPatch, + Kind::Custom(1618), + Kind::Custom(1619), + Kind::GitIssue, + ]) + .since(since); + + client.subscribe(filter, None).await?; + Ok(()) + } +} +``` + +### Batching Logic + +```rust +impl SelfSubscriber { + async fn event_loop(&self, client: &Client) { + let mut pending_events: Vec<Event> = Vec::new(); + let mut batch_timer: Option<Instant> = None; + let batch_window = Duration::from_secs(5); + + loop { + let timeout = batch_timer + .map(|t| batch_window.saturating_sub(t.elapsed())) + .unwrap_or(Duration::from_secs(60)); + + tokio::select! 
{ + notification = client.notifications().recv() => { + if let Ok(RelayPoolNotification::Event { event, .. }) = notification { + pending_events.push(*event); + + // Start timer on first event (does NOT reset) + if batch_timer.is_none() { + batch_timer = Some(Instant::now()); + } + } + } + _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { + // Batch window elapsed + self.process_batch(pending_events.drain(..).collect()).await; + batch_timer = None; + } + } + } + } + + async fn process_batch(&self, events: Vec<Event>) { + // 1. Update RepoSyncIndex + for event in events { + match event.kind.as_u16() { + 30617 => self.handle_announcement(&event).await, + 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, + _ => {} + } + } + + // 2. Derive targets and compute actions + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; + + let actions = compute_actions(&targets, &pending, &confirmed); + + drop(repo_index); + drop(pending); + drop(confirmed); + + // 3. 
Send actions to SyncManager + for action in actions { + let _ = self.action_tx.send(action).await; + } + } +} +``` + +--- + +## Bootstrap Relay + +```rust +impl SyncManager { + async fn initialize_bootstrap(&mut self) { + if let Some(url) = &self.config.bootstrap_relay_url { + // Pre-mark as bootstrap (never removed) + self.relay_sync_index.write().await.insert( + url.clone(), + RelaySyncNeeds { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: true, + } + ); + + // Send Layer 1 filter + let filters = vec![ + Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) + ]; + + self.handle_add_filters(AddFilters { + relay_url: url.clone(), + repos: HashSet::new(), // Layer 1 doesn't track specific repos + root_events: HashSet::new(), + filters, + }).await; + } + } +} +``` + +--- + +## Disconnect Handling + +Direct in SyncManager (not via action): + +```rust +impl SyncManager { + async fn check_disconnects(&mut self) { + let confirmed = self.relay_sync_index.read().await; + + for (relay_url, state) in confirmed.iter() { + if state.is_bootstrap { + continue; // Never disconnect bootstrap + } + + if state.repos.is_empty() && state.root_events.is_empty() { + // No repos - disconnect + self.disconnect_relay(relay_url).await; + } + } + } + + async fn disconnect_relay(&mut self, relay_url: &str) { + self.relay_sync_index.write().await.remove(relay_url); + self.pending_sync_index.write().await.remove(relay_url); + + if let Some(conn) = self.connections.remove(relay_url) { + conn.disconnect().await; + } + } +} +``` + +--- + +## Relay Connection Lifecycle + +### State Machine for External Relays + +```mermaid +stateDiagram-v2 + [*] --> Connecting: spawn_connection + Connecting --> Connected: success + Connecting --> Backoff: failure + Connected --> Disconnected: connection lost + Connected --> [*]: intentional disconnect + Disconnected --> Backoff: record_failure + Backoff --> Connecting: backoff elapsed + Backoff --> Dead: 24h continuous failures + 
Dead --> Connecting: daily retry +``` + +### Health Integration + +Uses `RelayHealthTracker` from [`src/sync/health.rs`](../../src/sync/health.rs): + +```rust +impl SyncManager { + /// Spawn a connection with health tracking + async fn spawn_connection(&mut self, relay_url: &str) { + // Check if we should attempt connection + if !self.health_tracker.should_attempt_connection(relay_url) { + let remaining = self.health_tracker.get_remaining_backoff(relay_url); + tracing::debug!( + "Skipping connection to {} - backoff {:?}", + relay_url, + remaining + ); + return; + } + + match self.try_connect(relay_url).await { + Ok(conn) => { + self.health_tracker.record_success(relay_url); + self.connections.insert(relay_url.to_string(), conn); + } + Err(e) => { + self.health_tracker.record_failure(relay_url); + tracing::warn!("Connection to {} failed: {}", relay_url, e); + } + } + } +} +``` + +### Reconnection Loop + +Each relay connection runs its own reconnection loop: + +```rust +impl RelayConnection { + async fn run_with_reconnection( + mut self, + health_tracker: Arc, + event_tx: mpsc::Sender, + ) { + loop { + // Check backoff before attempting + if !health_tracker.should_attempt_connection(&self.url) { + if let Some(remaining) = health_tracker.get_remaining_backoff(&self.url) { + tokio::time::sleep(remaining).await; + continue; + } + } + + // Attempt connection + match self.connect_and_subscribe().await { + Ok(()) => { + health_tracker.record_success(&self.url); + + // Track when we connected for since filter on reconnect + let connected_at = Timestamp::now(); + + // Run event loop until disconnection + self.run_event_loop(&event_tx).await; + + // Connection lost - will reconnect with since filter + health_tracker.record_failure(&self.url); + + // On reconnect, use since = connected_at - 15 minutes + self.set_reconnect_since(connected_at); + } + Err(e) => { + health_tracker.record_failure(&self.url); + tracing::warn!("Connection to {} failed: {}", self.url, e); + } + } + + 
// Get backoff duration and wait + let state = health_tracker.get_state(&self.url); + if state == HealthState::Dead { + // Dead relays retry once per 24 hours + tokio::time::sleep(Duration::from_secs(24 * 3600)).await; + } + // Otherwise, loop will check should_attempt_connection + } + } +} +``` + +### Backoff Configuration + +From existing [`RelayHealthTracker`](../../src/sync/health.rs:91): + +| Parameter | Value | Notes | +|-----------|-------|-------| +| Base backoff | 5 seconds | First failure | +| Backoff multiplier | 2x | Exponential increase | +| Max backoff | 1 hour (configurable) | `sync_max_backoff_secs` | +| Dead threshold | 24 hours | Continuous failures | +| Dead retry interval | 24 hours | Once per day | + +--- + +## Consolidation + +### Threshold-Based (70 filters) + +```rust +impl SyncManager { + async fn maybe_consolidate(&mut self, relay_url: &str) { + let filter_count = self.get_filter_count(relay_url).await; + + if filter_count > 70 { + self.consolidate(relay_url).await; + } + } + + async fn consolidate(&mut self, relay_url: &str) { + // 1. Wait for all pending batches to complete + self.wait_pending_complete(relay_url).await; + + // 2. Close all subscriptions + self.close_all_subs(relay_url).await; + + // 3. Rebuild filters from confirmed state + let confirmed = self.relay_sync_index.read().await; + let state = confirmed.get(relay_url)?; + let filters = build_filters(&state.repos, &state.root_events); + + // 4. 
Resubscribe with since = now - 15 minutes + let since = Timestamp::now() - 900; + for filter in filters { + self.subscribe(relay_url, filter.since(since)).await; + } + } +} +``` + +### Daily Timer (23-25h Random) + +```rust +impl SyncManager { + async fn run_daily_consolidation(&self) { + loop { + let hours = 23 + rand::random::<f64>() * 2.0; + tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; + + for relay_url in self.connections.keys() { + self.consolidate(relay_url).await; + } + } + } +} +``` + +--- + +## Key Design Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | +| Since filter | Only on reconnection | Initial subscribe gets full history | +| Pending tracking | Per-batch with batch ID | Independent confirmation, no blocking | +| EOSE requirement | All subs in batch must complete | Single repo may need multiple filter subs | +| Action type | Struct not enum | Only one action type needed | +| Relay spawning | Auto-spawn on AddFilters | Simplifies action logic | +| Disconnect | Direct in SyncManager | Not worth an action type | +| Consolidation | 70 filters + daily timer | Threshold for growth, timer for staleness | +| Timestamps | In-memory only | Not critical for correctness | +| Health tracking | Reuse existing RelayHealthTracker | Already implements exponential backoff, dead relay detection | +| Reconnection backoff | Exponential to 1h max | Prevents hammering failed relays | +| Dead relay policy | 24h threshold, daily retry | Balance between giving up and resource waste | +| last_connected tracking | Per-connection in-memory | Enables 15-minute buffer on reconnect | +| Connection ownership | Inside RelayState | Ties connection lifecycle to sync state, simpler than separate maps | +| State retention rule | Clear if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | +| Skip 
disconnected | compute_actions skips disconnected | Prevents queuing AddFilters for offline relays | +| Reconnect triggers | handle_notifications returns or Shutdown | nostr-sdk signals disconnect via event loop exit | +| On-reconnect flow | Regenerate AddFilters from RepoSyncIndex | Fresh subscriptions for what we actually need | + +--- + +## Module Structure + +``` +src/sync/ +├── mod.rs # SyncManager, main loop +├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types +├── actions.rs # AddFilters struct, compute_actions +├── self_subscriber.rs # SelfSubscriber, batching logic +├── relay_connection.rs # Per-relay WebSocket connection +├── consolidation.rs # Consolidation logic, daily timer +├── health.rs # Health tracking (reuse from v2) +└── metrics.rs # Prometheus metrics (reuse from v2) \ No newline at end of file diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md new file mode 100644 index 0000000..aba88a5 --- /dev/null +++ b/docs/explanation/grasp-02-proactive-sync-v4.md @@ -0,0 +1,1229 @@ +# GRASP-02: Proactive Sync v4 - Health & Reconnection Design + +## Overview + +This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles: + +1. **Self-subscription as the only mechanism** - No database initialization at startup +2. **compute_actions as single decision point** - Determines what NEW subscriptions to create +3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) +4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch +5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary + +--- + +## Data Model + +### RepoSyncIndex (Source of Truth) + +```rust +/// What we WANT to sync - derived from events received via self-subscription. +/// Updated immediately when self-subscriber batch fires. 
+/// Key: repo addressable ref - 30617:pubkey:identifier +pub type RepoSyncIndex = Arc>>; + +#[derive(Debug, Clone, Default)] +pub struct RepoSyncNeeds { + /// Relay URLs listed in this repo's 30617 announcement + pub relays: HashSet, + /// Root event IDs - 1617/1618/1619/1621 - that reference this repo + pub root_events: HashSet, +} +``` + +### RelaySyncIndex (Confirmed State + Connection) + +```rust +/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. +/// Key: relay URL +pub type RelaySyncIndex = Arc>>; + +/// Connection status for a relay +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionStatus { + /// Not currently connected + Disconnected, + /// Connection attempt in progress + Connecting, + /// Successfully connected and subscribed + Connected, +} + +/// Complete state for a single relay - combines sync needs with connection lifecycle +#[derive(Debug)] +pub struct RelayState { + /// Repos we have confirmed syncing from this relay + pub repos: HashSet, + /// Root events we have confirmed tracking + pub root_events: HashSet, + /// If true, never disconnect this relay + pub is_bootstrap: bool, + /// Current connection status + pub connection_status: ConnectionStatus, + /// When we last successfully connected - used for since filter on reconnect + pub last_connected: Option, + /// When we disconnected - for 15-minute state retention rule + pub disconnected_at: Option, + /// The active connection - None if disconnected + pub connection: Option, +} + +impl RelayState { + /// Check if state should be cleared based on 15-minute rule + pub fn should_clear_state(&self) -> bool { + match self.disconnected_at { + Some(disconnected) => { + let now = Timestamp::now(); + now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes + } + None => false, // Still connected or never connected + } + } + + /// Clear repos and root_events - called when reconnect takes > 15 minutes + pub fn clear_sync_state(&mut self) 
{ + self.repos.clear(); + self.root_events.clear(); + } +} +``` + +### PendingSyncIndex (In-Flight Batches) + +```rust +/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. +/// Each batch has its own ID and can confirm independently. +/// Key: relay URL +pub type PendingSyncIndex = Arc>>>; + +#[derive(Debug, Clone)] +pub struct PendingBatch { + /// Unique ID for this batch - for debugging/logging + pub batch_id: u64, + /// The items this batch is syncing + pub items: PendingItems, + /// Subscription IDs that must ALL receive EOSE before confirming + pub outstanding_subs: HashSet, +} + +#[derive(Debug, Clone, Default)] +pub struct PendingItems { + pub repos: HashSet, + pub root_events: HashSet, +} +``` + +--- + +## Connection Lifecycle State Machine + +```mermaid +stateDiagram-v2 + [*] --> Disconnected: discover relay via RepoSyncIndex + Disconnected --> Connecting: AddFilters triggers spawn_connection + Connecting --> Connected: success + Connecting --> Disconnected: failure + record in health tracker + Connected --> Disconnected: connection lost + Connected --> [*]: intentional disconnect via check_disconnects + + note right of Disconnected: disconnected_at set for 15min rule + note right of Connected: last_connected tracked for since filter +``` + +--- + +## Flow Scenarios + +### Scenario 1: Initial Connect via handle_connect_or_reconnect + +```mermaid +flowchart TB + START[Startup] --> SS[Self-subscribe to own relay] + SS --> |no since filter| EVENTS[Receive historical events] + EVENTS --> RSI[Update RepoSyncIndex] + RSI --> DT[derive_relay_targets] + DT --> CA[compute_actions with targets and empty confirmed] + CA --> AF[AddFilters for each relay] + AF --> SPAWN{Relay connected?} + SPAWN --> |no| CONN[spawn_connection] + CONN --> HC[handle_connect_or_reconnect] + SPAWN --> |yes| SUB + + subgraph handle_connect_or_reconnect - Fresh Sync + HC --> CHECK_FRESH{is_fresh_sync?} + CHECK_FRESH --> |yes - no last_connected| 
L1[build_announcement_filter - no since] + L1 --> RCA[recompute_actions_for_relay] + end + + RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> CONFIRM[Move items to confirmed repos/root_events] +``` + +**Key points:** +- No `since` filter on initial connect - get full history +- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` +- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since +- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking + +### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes + +```mermaid +flowchart TB + DISC[Connection lost] --> MARK[Set disconnected_at = now] + MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] + CLEAR_PEND --> WAIT[Wait for reconnection] + WAIT --> RECONN[Connection restored] + RECONN --> HC[handle_connect_or_reconnect] + + subgraph handle_connect_or_reconnect - Quick Reconnect + HC --> CHECK{is_fresh_sync?} + CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] + SINCE --> L1[build_announcement_filter - with since] + L1 --> L23[rebuild_layer2_and_layer3 - with since] + L23 --> RCA[recompute_actions_for_relay] + end + + RCA --> AF[AddFilters for new items only] + AF --> SUB[Subscribe] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> EXTEND[Extend confirmed state] +``` + +**Key points:** +- PendingSyncIndex cleared on disconnect (not reconnect) +- `handle_connect_or_reconnect`: + 1. `build_announcement_filter(Some(since))` - Layer 1 with since + 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since + 3. 
`recompute_actions_for_relay` - check for new items +- since = last_connected - 15min ensures we catch events during disconnection + +### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes + +```mermaid +flowchart TB + RECONN[Connection restored] --> HC[handle_connect_or_reconnect] + + subgraph handle_connect_or_reconnect - Stale Reconnect + HC --> CHECK{is_fresh_sync?} + CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] + CLEAR --> L1[build_announcement_filter - no since] + L1 --> RCA[recompute_actions_for_relay] + end + + RCA --> CA[compute_actions with empty confirmed] + CA --> AF[AddFilters for everything] + AF --> SUB[Subscribe - no since filter] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> CONFIRM[Populate confirmed state fresh] +``` + +**Key points:** +- `should_clear_state()` returns true → triggers fresh sync +- Same path as initial connect after clearing state +- Layer 1: `build_announcement_filter(None)` - full history +- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything + +### Scenario 4: Consolidation - Triggered on Filter Add + +```mermaid +flowchart TB + AF[handle_add_filters called] --> COUNT{current + new > 70?} + COUNT --> |yes| CONSOLIDATE[consolidate] + CONSOLIDATE --> WAIT_PEND[wait_pending_complete] + WAIT_PEND --> CLOSE[unsubscribe_all] + CLOSE --> SINCE[since = now - 15min] + SINCE --> L1[build_announcement_filter - with since] + L1 --> L23[rebuild_layer2_and_layer3 - with since] + COUNT --> |no| SUB[Subscribe new filters] + SUB --> PB[Create PendingBatch] +``` + +**Key points:** +- Consolidation checked in `handle_add_filters` BEFORE adding new filters +- After closing all subscriptions, re-subscribe: + 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since + 2. 
`rebuild_layer2_and_layer3(since)` - Layer 2+3 with since +- `since = now - 15min` prevents re-fetching old events +- Keeps confirmed state, just reduces filter count + +### Scenario 5: Daily Timer - 23 to 25h Random + +```mermaid +flowchart TB + DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] + CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] + CLEAR_PEND --> CLEAR_STATE[clear_sync_state] + CLEAR_STATE --> L1[build_announcement_filter - no since] + L1 --> RCA[recompute_actions_for_relay] + RCA --> CA[compute_actions with empty confirmed] + CA --> AF[AddFilters for everything] + AF --> SUB[Subscribe - no since filter] + SUB --> PB[Create PendingBatch] + PB --> EOSE[Wait for EOSE] + EOSE --> CONFIRM[Repopulate confirmed state] +``` + +**Key points:** +- Daily timer is a full fresh sync, NOT consolidation +- Clears both PendingSyncIndex and confirmed state +- Layer 1: `build_announcement_filter(None)` - full history +- Layer 2+3: via compute_actions with empty confirmed - full history +- Detects any state drift accumulated over 24 hours + +--- + +## Core Algorithms + +### 1. derive_relay_targets + +Transform RepoSyncIndex into per-relay sync targets: + +```rust +/// Inverts RepoSyncIndex to get per-relay view +fn derive_relay_targets( + repo_index: &HashMap +) -> HashMap { + let mut targets: HashMap = HashMap::new(); + + for (repo_ref, needs) in repo_index { + for relay_url in &needs.relays { + let target = targets.entry(relay_url.clone()).or_default(); + target.repos.insert(repo_ref.clone()); + target.root_events.extend(needs.root_events.iter().cloned()); + } + } + + targets +} +``` + +### 2. 
compute_actions (Three-Way Diff) + +**This is the ONLY decision point for what NEW subscriptions to create.** + +```rust +/// Computes AddFilters for items that are: +/// - In targets (what we want) +/// - NOT in pending (already in-flight) +/// - NOT in confirmed (already confirmed) +fn compute_actions( + targets: &HashMap, + pending: &HashMap>, + confirmed: &HashMap, +) -> Vec { + let mut actions = Vec::new(); + + for (relay_url, target) in targets { + // Skip disconnected relays - they will get AddFilters on reconnect + if let Some(state) = confirmed.get(relay_url) { + if state.connection_status != ConnectionStatus::Connected { + continue; + } + } + + // Collect all pending items for this relay + let pending_repos: HashSet<_> = pending.get(relay_url) + .map(|batches| batches.iter() + .flat_map(|b| b.items.repos.iter().cloned()) + .collect()) + .unwrap_or_default(); + let pending_events: HashSet<_> = pending.get(relay_url) + .map(|batches| batches.iter() + .flat_map(|b| b.items.root_events.iter().cloned()) + .collect()) + .unwrap_or_default(); + + // Collect confirmed items for this relay + let confirmed_repos = confirmed.get(relay_url) + .map(|c| &c.repos) + .unwrap_or(&HashSet::new()); + let confirmed_events = confirmed.get(relay_url) + .map(|c| &c.root_events) + .unwrap_or(&HashSet::new()); + + // New = target - pending - confirmed + let new_repos: HashSet<_> = target.repos.iter() + .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) + .cloned() + .collect(); + let new_events: HashSet<_> = target.root_events.iter() + .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) + .cloned() + .collect(); + + if !new_repos.is_empty() || !new_events.is_empty() { + let filters = build_filters(&new_repos, &new_events); + actions.push(AddFilters { + relay_url: relay_url.clone(), + repos: new_repos, + root_events: new_events, + filters, + }); + } + } + + actions +} +``` + +### 3. 
Filter Building Functions (Three-Layer Strategy) + +The filter strategy uses three layers: +- **Layer 1**: Announcements (30617/30618) - subscribed on connect; re-subscribed with `since` when consolidation closes all subscriptions +- **Layer 2**: Events tagging our repos +- **Layer 3**: Events tagging our root events + +**Key insight**: Layer 1 is connection-level (one subscription per connection, never tracked per item), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). + +```rust +/// Layer 1: Announcements filter (kinds 30617 + 30618) +/// Subscribed on connect - NOT part of the Layer 2+3 rebuild; consolidation re-subscribes it separately with since. +/// Note: 30618 is ONLY synced from remote relays, not self-subscribed. +fn build_announcement_filter(since: Option<Timestamp>) -> Filter { + let filter = Filter::new().kinds([ + Kind::Custom(30617), // Repository announcements + Kind::Custom(30618), // Repository state announcements + ]); + + match since { + Some(ts) => filter.since(ts), + None => filter, + } +} + +/// Layer 2: Events tagging one of our repos +/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. +/// Batched per 100 repo refs. 
+fn tagged_one_of_our_repo_event_filters( + repos: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + let repo_refs: Vec<_> = repos.iter().collect(); + + for chunk in repo_refs.chunks(100) { + let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); + + // Lowercase 'a' tag - standard addressable reference + let mut f1 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); + // Uppercase 'A' tag - some clients use this + let mut f2 = Filter::new() + .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone()); + // Quote 'q' tag - NIP-10 quote references to addressable events + let mut f3 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); + + if let Some(ts) = since { + f1 = f1.since(ts); + f2 = f2.since(ts); + f3 = f3.since(ts); + } + + filters.push(f1); + filters.push(f2); + filters.push(f3); + } + + filters +} + +/// Layer 3: Events tagging one of our root events +/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. +/// Batched per 100 event IDs. 
+fn tagged_one_of_our_root_event_filters( + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + let event_ids: Vec = root_events.iter().map(|id| id.to_hex()).collect(); + + for chunk in event_ids.chunks(100) { + let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); + + // Lowercase 'e' tag - standard event reference + let mut f1 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); + // Uppercase 'E' tag - some clients use this + let mut f2 = Filter::new() + .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone()); + // Quote 'q' tag - NIP-10 quote references to events + let mut f3 = Filter::new() + .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); + + if let Some(ts) = since { + f1 = f1.since(ts); + f2 = f2.since(ts); + f3 = f3.since(ts); + } + + filters.push(f1); + filters.push(f2); + filters.push(f3); + } + + filters +} + +/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) +/// Used by: +/// - compute_actions for incremental subscriptions +/// - consolidation rebuilds (Layer 1 remains active) +fn build_layer2_and_layer3_filters( + repos: &HashSet, + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); + filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); + filters +} +``` + +**Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently. + +### 4. 
handle_add_filters (SyncManager) + +```rust +impl SyncManager { + async fn handle_add_filters(&mut self, action: AddFilters) { + let AddFilters { relay_url, repos, root_events, filters } = action; + + // Auto-spawn connection if needed + let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); + match state { + None => { + // New relay discovered - create entry and spawn connection + self.relay_sync_index.write().await.insert( + relay_url.clone(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connecting, + last_connected: None, + disconnected_at: None, + connection: None, + } + ); + self.spawn_connection(&relay_url).await; + return; // Subscriptions will happen on connection success + } + Some(state) if state.connection_status != ConnectionStatus::Connected => { + // Not connected - subscriptions will happen on connection success + return; + } + Some(_) => { + // Already connected - proceed with subscription + } + } + + // Subscribe and collect subscription IDs + let conn = self.connections.get(&relay_url).unwrap(); + let mut sub_ids = HashSet::new(); + + for filter in filters { + match conn.client.subscribe(filter, None).await { + Ok(output) => { + for sub_id in output.val { + sub_ids.insert(sub_id); + } + } + Err(e) => { + tracing::warn!("Failed to subscribe: {}", e); + } + } + } + + // Create pending batch + let batch = PendingBatch { + batch_id: self.next_batch_id(), + items: PendingItems { repos, root_events }, + outstanding_subs: sub_ids, + }; + + // Add to pending index + self.pending_sync_index.write().await + .entry(relay_url) + .or_default() + .push(batch); + } +} +``` + +### 5. 
handle_disconnect + +```rust +impl SyncManager { + /// Called when connection to a relay is lost + async fn handle_disconnect(&mut self, relay_url: &str) { + let mut index = self.relay_sync_index.write().await; + + if let Some(state) = index.get_mut(relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + state.disconnected_at = Some(Timestamp::now()); + state.connection = None; + } + + // Clear pending batches - these items were not confirmed + self.pending_sync_index.write().await.remove(relay_url); + + // Remove from active connections map + self.connections.remove(relay_url); + + // Health tracker records failure for backoff + self.health_tracker.record_failure(relay_url); + } +} +``` + +### 6. handle_connect_or_reconnect (Unified) + +This method handles BOTH initial connection AND reconnection with unified logic: + +```rust +impl SyncManager { + /// Called when connection to a relay succeeds - handles both initial connect and reconnect. + /// + /// Decision tree: + /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history + /// - Quick reconnect (<15min): since = last_connected - 15min + async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { + let mut index = self.relay_sync_index.write().await; + let state = match index.get_mut(relay_url) { + Some(s) => s, + None => return, // Relay was removed while disconnected + }; + + // Determine if this is a fresh sync or quick reconnect + let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); + let last_connected = state.last_connected; + + if is_fresh_sync && state.last_connected.is_some() { + // Stale reconnect (>15min) - clear state + tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); + state.clear_sync_state(); + } + + // Update connection state + state.connection_status = ConnectionStatus::Connected; + state.last_connected = Some(Timestamp::now()); + state.disconnected_at = None; + + // 
Record success in health tracker + self.health_tracker.record_success(relay_url); + + drop(index); // Release lock + + let conn = match self.connections.get(relay_url) { + Some(c) => c, + None => return, + }; + + if is_fresh_sync { + // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions + + // Step 1: Subscribe Layer 1 (announcements) without since + let layer1 = build_announcement_filter(None); + let _ = conn.client.subscribe(layer1, None).await; + + // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) + self.recompute_actions_for_relay(relay_url).await; + } else { + // Quick reconnect: Layer 1 with since, Layer 2+3 with since + let since = last_connected + .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) + .unwrap_or(Timestamp::from(0)); + + // Step 1: Subscribe Layer 1 (announcements) with since + let layer1 = build_announcement_filter(Some(since)); + let _ = conn.client.subscribe(layer1, None).await; + + // Step 2: Rebuild Layer 2+3 for confirmed items with since + self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; + + // Step 3: Check for NEW items via compute_actions + self.recompute_actions_for_relay(relay_url).await; + } + } + + /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). 
+ /// Used by: + /// - Quick reconnect: rebuild confirmed items with since filter + /// - Consolidation: close and rebuild with since filter + async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option) { + let confirmed = self.relay_sync_index.read().await; + let state = match confirmed.get(relay_url) { + Some(s) => s, + None => return, + }; + + // Build Layer 2+3 filters WITH since + let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); + drop(confirmed); + + // Subscribe directly - no PendingBatch for catch-up (items already confirmed) + let conn = match self.connections.get(relay_url) { + Some(c) => c, + None => return, + }; + + for filter in filters { + let _ = conn.client.subscribe(filter, None).await; + } + } + + /// Rerun compute_actions for a specific relay and process resulting AddFilters. + /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. + async fn recompute_actions_for_relay(&mut self, relay_url: &str) { + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + drop(repo_index); + + // Filter to just this relay + let target = match targets.get(relay_url) { + Some(t) => t.clone(), + None => return, // No repos reference this relay anymore + }; + + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; + + let mut single_relay_targets = HashMap::new(); + single_relay_targets.insert(relay_url.to_string(), target); + + let actions = compute_actions(&single_relay_targets, &pending, &confirmed); + + drop(pending); + drop(confirmed); + + // Process AddFilters + for action in actions { + self.handle_add_filters(action).await; + } + } +} +``` + +### 7. 
Daily Timer + +```rust +impl SyncManager { + async fn run_daily_timer(&self) { + loop { + // Random 23-25 hours + let hours = 23.0 + rand::random::<f64>() * 2.0; + tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; + + let relay_urls: Vec<_> = self.relay_sync_index.read().await + .keys() + .cloned() + .collect(); + + for relay_url in relay_urls { + self.daily_sync(&relay_url).await; + } + } + } + + /// Perform daily fresh sync for a relay + async fn daily_sync(&mut self, relay_url: &str) { + tracing::info!("Daily sync triggered for {}", relay_url); + + // Close all subscriptions + if let Some(conn) = self.connections.get(relay_url) { + conn.client.unsubscribe_all().await; + } + + // Clear PendingSyncIndex + self.pending_sync_index.write().await.remove(relay_url); + + // Clear confirmed state - triggers fresh sync + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + state.clear_sync_state(); + } + } + + // Recompute actions - will generate Layer 2+3 AddFilters for everything (NOTE: unsubscribe_all also closed Layer 1; per Scenario 5 it must be re-subscribed via build_announcement_filter(None)) + self.recompute_actions_for_relay(relay_url).await; + } +} +``` + +### 8. Consolidation (Threshold-Based, Triggered on Add) + +Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation keeps confirmed state and the Layer 1 announcement stream - it closes all subscriptions, then re-subscribes Layer 1 with `since` and rebuilds Layer 2+3. + +```rust +impl SyncManager { + /// Check filter count and consolidate if needed. + /// Called from handle_add_filters BEFORE adding new filters. + async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { + let current_count = self.get_filter_count(relay_url).await; + + if current_count + new_filter_count > 70 { + self.consolidate(relay_url).await; + } + } + + /// Consolidate filters - rebuilds Layer 2+3 and re-subscribes Layer 1 with since. + /// Does NOT clear state - just reduces filter count. 
+ async fn consolidate(&mut self, relay_url: &str) { + tracing::info!("Consolidating filters for {} (count > 70)", relay_url); + + // Wait for all pending batches to complete first + self.wait_pending_complete(relay_url).await; + + // Close Layer 2+3 subscriptions only - Layer 1 remains active + // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately + // For simplicity, we close all and re-add Layer 1 + if let Some(conn) = self.connections.get(relay_url) { + conn.client.unsubscribe_all().await; + } + + // Re-subscribe Layer 1 with since (maintains announcements stream) + let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); + let conn = self.connections.get(relay_url).unwrap(); + let layer1 = build_announcement_filter(Some(since)); + let _ = conn.client.subscribe(layer1, None).await; + + // Rebuild Layer 2+3 only + self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; + } +} +``` + +**Updated handle_add_filters to check consolidation:** + +```rust +impl SyncManager { + async fn handle_add_filters(&mut self, action: AddFilters) { + let AddFilters { relay_url, repos, root_events, filters } = action; + + // Auto-spawn connection if needed (unchanged) + let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); + match state { + None => { + // New relay discovered - create entry and spawn connection + self.relay_sync_index.write().await.insert( + relay_url.clone(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connecting, + last_connected: None, + disconnected_at: None, + connection: None, + } + ); + self.spawn_connection(&relay_url).await; + return; // Subscriptions will happen on connection success + } + Some(state) if state.connection_status != ConnectionStatus::Connected => { + return; // Not connected - subscriptions will happen on connection success + } + Some(_) => { + // Already connected - proceed + } + 
} + + // CHECK CONSOLIDATION BEFORE ADDING + self.maybe_consolidate(&relay_url, filters.len()).await; + + // Subscribe and collect subscription IDs + let conn = self.connections.get(&relay_url).unwrap(); + let mut sub_ids = HashSet::new(); + + for filter in filters { + match conn.client.subscribe(filter, None).await { + Ok(output) => { + for sub_id in output.val { + sub_ids.insert(sub_id); + } + } + Err(e) => { + tracing::warn!("Failed to subscribe: {}", e); + } + } + } + + // Create pending batch (unchanged) + let batch = PendingBatch { + batch_id: self.next_batch_id(), + items: PendingItems { repos, root_events }, + outstanding_subs: sub_ids, + }; + + self.pending_sync_index.write().await + .entry(relay_url) + .or_default() + .push(batch); + } +} +``` + +--- + +## Disconnect (Relay Removal) Handling + +```rust +impl SyncManager { + /// Periodically check for relays that should be disconnected + async fn check_disconnects(&mut self) { + let confirmed = self.relay_sync_index.read().await; + let relays_to_disconnect: Vec<_> = confirmed.iter() + .filter(|(_, state)| { + !state.is_bootstrap && + state.repos.is_empty() && + state.root_events.is_empty() + }) + .map(|(url, _)| url.clone()) + .collect(); + drop(confirmed); + + for relay_url in relays_to_disconnect { + self.disconnect_relay(&relay_url).await; + } + } + + async fn disconnect_relay(&mut self, relay_url: &str) { + tracing::info!("Disconnecting relay {} (no repos)", relay_url); + + self.relay_sync_index.write().await.remove(relay_url); + self.pending_sync_index.write().await.remove(relay_url); + + if let Some(conn) = self.connections.remove(relay_url) { + let _ = conn.client.disconnect().await; + } + } +} +``` + +--- + +## State Flow Summary + +```mermaid +flowchart TB + subgraph Input + SS[SelfSubscriber] + OWN[Own Relay] + end + + subgraph RepoSyncIndex - What We Want + RSI[HashMap: Repo to Relays+Events] + end + + subgraph Derived Target + DT[derive_relay_targets fn] + TGT[Per-relay: repos + events we 
should sync] + end + + subgraph compute_actions - Decision Point + CA[Three-way diff: target - pending - confirmed] + end + + subgraph PendingSyncIndex - In Flight + PSI[Vec PendingBatch per relay] + end + + subgraph RelaySyncIndex - Confirmed State + RLI[RelayState per relay] + CONN[connection_status] + REPOS[repos + root_events] + TIMES[last_connected + disconnected_at] + end + + SS -->|subscribe| OWN + OWN -->|events| SS + SS -->|batch fires| RSI + RSI --> DT + DT --> TGT + TGT --> CA + PSI --> CA + RLI --> CA + CA -->|Layer 2+3 new items| AF[AddFilters] + AF -->|check filter count| CONSOL{count + new > 70?} + CONSOL -->|yes| CONSOLIDATE[consolidate] + CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] + L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] + CONSOL -->|no| SUB[subscribe] + AF -->|spawn if needed| CONN + SUB --> PSI + PSI -->|EOSE| REPOS + + CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] + DISC -->|any reconnect| HC[handle_connect_or_reconnect] + + subgraph handle_connect_or_reconnect + HC --> FRESH_CHECK{is_fresh_sync?} + FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] + FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] + L1_FRESH --> RCA1[recompute_actions_for_relay] + L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] + L23_QUICK --> RCA2[recompute_actions_for_relay] + end +``` + +--- + +## Key Design Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | +| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | +| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | +| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed 
by compute_actions, consolidated when filter count > 70 | +| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | +| Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | +| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | +| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | +| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | +| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | +| Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | +| Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | +| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | +| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | +| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | +| Connection spawning | Via AddFilters handler | Single path for new relay discovery | +| Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | + +--- + +## Module Structure + +``` +src/sync/ +├── mod.rs # SyncManager, main loop +├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types +├── actions.rs # AddFilters struct, compute_actions, build_filters +├── self_subscriber.rs # SelfSubscriber, batching logic +├── relay_connection.rs # Per-relay WebSocket connection +├── consolidation.rs # Consolidation logic, daily timer +├── health.rs # Health tracking (reuse from v2) +└── metrics.rs # Prometheus metrics (reuse from v2) +``` + +--- + +## Comparison: v3 vs v4 + +| Aspect | v3 | 
v4 | +|--------|----|----| +| Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | +| Layer 1 handling | Mixed with other layers | Separate build_announcement_filter, subscribed on connect | +| Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | +| Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_layer2_and_layer3 method | +| Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | +| Since filter application | Applied in handle_reconnect | Filter builders take an optional since parameter | +| PSI clearing | On disconnect | On disconnect (confirmed) | +| Daily timer | Consolidation-style | Fresh sync (different from consolidation) | + +--- + +## Self-Subscriber Flow + +The SelfSubscriber connects to our own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions. + +### State Tracking + +```rust +pub struct SelfSubscriber { + own_relay_url: String, + relay_domain: String, + repo_sync_index: RepoSyncIndex, + pending_sync_index: PendingSyncIndex, + relay_sync_index: RelaySyncIndex, + action_tx: mpsc::Sender<AddFilters>, + /// Timestamp of last successful connection - used for since filter on reconnection + last_connected: Option<Timestamp>, + /// The active client connection + client: Option<Client>, +} +``` + +### On Startup / Reconnect (Unified) + +Both initial startup and reconnection use the same `connect_and_subscribe` method: + +```rust +impl SelfSubscriber { + async fn run(mut self) { + loop { + // Connect or reconnect + if let Err(e) = self.connect_and_subscribe().await { + tracing::warn!("Connection failed: {}, will retry", e); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + + // Run event loop until disconnection + self.event_loop().await; + + // Loop will retry connection + } + } + + async fn connect_and_subscribe(&mut self) -> Result<(), Error> { + let client = Client::new(Keys::generate()); + 
client.add_relay(&self.own_relay_url).await?; + client.connect().await; + + // Build filter - add since only on reconnect + let filter = Filter::new().kinds([ + Kind::Custom(30617), // Repository announcements + Kind::GitPatch, // 1617 + Kind::Custom(1618), // PRs + Kind::Custom(1619), // PR updates + Kind::GitIssue, // 1621 + ]); + + let filter = if let Some(ts) = self.last_connected { + // Reconnection: use since filter + let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer + filter.since(since) + } else { + // Initial connect: no since filter - get full history + filter + }; + + // Update last_connected AFTER computing since + self.last_connected = Some(Timestamp::now()); + + client.subscribe(filter, None).await?; + self.client = Some(client); + Ok(()) + } +} +``` + +### Event Loop with Batching + +```rust +impl SelfSubscriber { + async fn event_loop(&mut self) { + let client = self.client.as_ref().unwrap(); + let mut pending_events: Vec = Vec::new(); + let mut batch_timer: Option = None; + let batch_window = Duration::from_secs(5); + + loop { + let timeout = batch_timer + .map(|t| batch_window.saturating_sub(t.elapsed())) + .unwrap_or(Duration::from_secs(60)); + + tokio::select! { + notification = client.notifications().recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + pending_events.push(*event); + + // Start timer on first event - does NOT reset + if batch_timer.is_none() { + batch_timer = Some(Instant::now()); + } + } + Ok(RelayPoolNotification::Shutdown) => { + // Connection lost + break; + } + _ => {} + } + } + _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { + // Batch window elapsed + self.process_batch(pending_events.drain(..).collect()).await; + batch_timer = None; + } + } + } + } + + async fn process_batch(&self, events: Vec) { + // 1. 
Update RepoSyncIndex + for event in events { + match event.kind.as_u16() { + 30617 => self.handle_announcement(&event).await, + 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, + _ => {} + } + } + + // 2. Derive targets and compute actions + let repo_index = self.repo_sync_index.read().await; + let targets = derive_relay_targets(&repo_index); + + let pending = self.pending_sync_index.read().await; + let confirmed = self.relay_sync_index.read().await; + + let actions = compute_actions(&targets, &pending, &confirmed); + + drop(repo_index); + drop(pending); + drop(confirmed); + + // 3. Send actions to SyncManager + for action in actions { + let _ = self.action_tx.send(action).await; + } + } + + async fn handle_announcement(&self, event: &Event) { + // Extract repo_ref from event - 30617:pubkey:identifier + let d_tag = event.tags.iter() + .find_map(|tag| { + if tag.kind() == TagKind::D { + tag.content().map(|s| s.to_string()) + } else { + None + } + }) + .unwrap_or_default(); + + let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); + + // Extract relay URLs from 'r' tags + let relays: HashSet = event.tags.iter() + .filter_map(|tag| { + if tag.kind() == TagKind::Relay { + tag.content().map(|s| s.to_string()) + } else { + None + } + }) + .collect(); + + // Update RepoSyncIndex + let mut index = self.repo_sync_index.write().await; + let needs = index.entry(repo_ref).or_default(); + needs.relays = relays; + } + + async fn handle_root_event(&self, event: &Event) { + // Extract repo_ref from 'a' tag + let repo_ref = event.tags.iter() + .find_map(|tag| { + if tag.kind() == TagKind::A { + tag.content().map(|s| s.to_string()) + } else { + None + } + }); + + if let Some(repo_ref) = repo_ref { + let mut index = self.repo_sync_index.write().await; + let needs = index.entry(repo_ref).or_default(); + needs.root_events.insert(event.id); + } + } +} +``` \ No newline at end of file diff --git a/docs/explanation/state-structure-redesign-proposal.md 
b/docs/explanation/state-structure-redesign-proposal.md new file mode 100644 index 0000000..0a27cf4 --- /dev/null +++ b/docs/explanation/state-structure-redesign-proposal.md @@ -0,0 +1,373 @@ +# State Structure Redesign Proposal v2 + +## The Core Problem + +We need to transform: +- **Repo Announcements** (30617) that list relays +- **Root Events** (1617/1618/1619/1621) that tag repos + +Into: +- **Per-relay subscriptions**: which repos and root events to sync from each relay + +And generate **RelayActions** when this mapping changes. + +--- + +## Proposed Data Model + +### 1. RepoIndex (Primary Source of Truth) + +```rust +/// Everything we know about repos we're tracking +/// Key: repo addressable ref ("30617:pubkey:identifier") +pub type RepoIndex = Arc>>; + +#[derive(Debug, Clone, Default)] +pub struct RepoInfo { + /// Relay URLs listed in the repo's announcement + pub relays: HashSet, + /// Root event IDs that reference this repo + pub root_events: HashSet, +} +``` + +**Updated by:** Database init, batch processing of new announcements/root events + +### 2. 
RelayIndex (Applied State) + +```rust +/// What we've told each relay to sync +/// Key: relay URL +pub type RelayIndex = Arc>>; + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct SyncTarget { + /// Repos we're syncing for this relay + pub repos: HashSet, + /// Root events we're tracking + pub root_events: HashSet, +} +``` + +**Updated by:** `process_batch`, as each RelayAction is emitted (just before it is sent to SyncManager) + +--- + +## The Transformation + +```mermaid +flowchart LR + subgraph Input + RA[Repo Announcements] + RE[Root Events] + end + + subgraph RepoIndex + R1[repo_a: relays=X,Y events=1,2] + R2[repo_b: relays=Y,Z events=3] + end + + subgraph Derived Target + T1[relay_X: repos=a events=1,2] + T2[relay_Y: repos=a,b events=1,2,3] + T3[relay_Z: repos=b events=3] + end + + subgraph RelayIndex Applied + A1[relay_X: repos=a events=1,2] + A2[relay_Y: repos=a events=1,2] + end + + RA --> R1 + RA --> R2 + RE --> R1 + RE --> R2 + + R1 --> T1 + R1 --> T2 + R2 --> T2 + R2 --> T3 +``` + +The **diff** between Derived Target and RelayIndex produces RelayActions: +- relay_Y needs AddFilters for repo_b and event 3 +- relay_Z needs SpawnRelay + +--- + +## Algorithm: derive_relay_targets + +```rust +/// Derive what we SHOULD be syncing from the repo data +fn derive_relay_targets(repo_index: &HashMap) -> HashMap { + let mut targets: HashMap = HashMap::new(); + + for (repo_ref, info) in repo_index { + // For each relay that lists this repo + for relay_url in &info.relays { + let target = targets.entry(relay_url.clone()).or_default(); + target.repos.insert(repo_ref.clone()); + target.root_events.extend(info.root_events.iter().cloned()); + } + } + + targets +} +``` + +--- + +## Algorithm: process_batch + +```rust +async fn process_batch(&self, pending: &mut PendingUpdates) { + // ============================================ + // STEP 1: Update RepoIndex from batch + // ============================================ + + let mut repo_index = self.repo_index.write().await; + + // 1a. 
Process root events - add to repo's root_events set + for event in pending.root_events.drain(..) { + for repo_ref in extract_repo_refs(&event) { + repo_index.entry(repo_ref) + .or_default() + .root_events + .insert(event.id); + } + } + + // 1b. Process announcements - update repo's relay set + for event in pending.announcements.drain(..) { + if !lists_our_service(&event) { + continue; + } + let repo_ref = build_repo_ref(&event); + let relay_urls: HashSet<String> = extract_relay_urls(&event) + .into_iter() + .filter(|url| !is_own_relay(url)) + .collect(); + + // Replace relay set (handles updates that change relays) + repo_index.entry(repo_ref) + .or_default() + .relays = relay_urls; + } + + // ============================================ + // STEP 2: Derive target state from RepoIndex + // ============================================ + + let target = derive_relay_targets(&repo_index); + drop(repo_index); // Release write lock + + // ============================================ + // STEP 3: Diff target vs applied (RelayIndex) + // ============================================ + + let applied = self.relay_index.read().await; + let actions = compute_relay_actions(&target, &applied); + drop(applied); // Release read lock + + // ============================================ + // STEP 4: Send actions & update RelayIndex + // ============================================ + + for action in actions { + match &action { + RelayAction::SpawnRelay { relay_url, repos_and_root_events } => { + // Update RelayIndex with new relay + let mut applied = self.relay_index.write().await; + applied.insert(relay_url.clone(), SyncTarget { + repos: repos_and_root_events.keys().cloned().collect(), + root_events: repos_and_root_events.values() + .flat_map(|e| e.iter().cloned()) + .collect(), + }); + } + RelayAction::AddFilters { relay_url, repos_and_new_root_event } => { + // Update RelayIndex with additions + let mut applied = self.relay_index.write().await; + if let Some(target) = 
applied.get_mut(relay_url) { + for (repo, events) in repos_and_new_root_event { + target.repos.insert(repo.clone()); + target.root_events.extend(events.iter().cloned()); + } + } + } + } + + // Send action to SyncManager + let _ = self.action_tx.send(action).await; + } +} +``` + +--- + +## Algorithm: compute_relay_actions + +```rust +fn compute_relay_actions( + target: &HashMap<String, SyncTarget>, + applied: &HashMap<String, SyncTarget>, +) -> Vec<RelayAction> { + let mut actions = Vec::new(); + + for (relay_url, target_state) in target { + match applied.get(relay_url) { + None => { + // New relay - spawn it + let mut repos_and_events = HashMap::new(); + for repo in &target_state.repos { + // Get events for this specific repo + let events = target_state.root_events.clone(); // simplified + repos_and_events.insert(repo.clone(), events); + } + actions.push(RelayAction::SpawnRelay { + relay_url: relay_url.clone(), + repos_and_root_events: repos_and_events, + }); + } + Some(applied_state) => { + // Existing relay - check for new repos/events + let new_repos: HashSet<_> = target_state.repos + .difference(&applied_state.repos) + .cloned() + .collect(); + let new_events: HashSet<_> = target_state.root_events + .difference(&applied_state.root_events) + .cloned() + .collect(); + + if !new_repos.is_empty() || !new_events.is_empty() { + let mut repos_and_events = HashMap::new(); + for repo in &new_repos { + repos_and_events.insert(repo.clone(), new_events.clone()); + } + // Also handle new events for existing repos + if !new_events.is_empty() && new_repos.is_empty() { + for repo in &applied_state.repos { + repos_and_events.insert(repo.clone(), new_events.clone()); + } + } + + actions.push(RelayAction::AddFilters { + relay_url: relay_url.clone(), + repos_and_new_root_event: repos_and_events, + }); + } + } + } + } + + // Future: detect relay removal (in applied but not in target) + + actions +} +``` + +--- + +## Handling Announcement Updates + +When an announcement is **updated** and changes its relay list: + +```mermaid 
+flowchart TD + A[repo_a announcement updated] --> B[Old: relays X,Y] + B --> C[New: relays Y,Z] + C --> D[RepoIndex updated: repo_a.relays = Y,Z] + D --> E[derive_relay_targets] + E --> F[Target: X=empty, Y=repo_a, Z=repo_a] + F --> G[Diff with Applied: X=repo_a, Y=repo_a] + G --> H1[X: repo_a removed - future RemoveFilters] + G --> H2[Z: new relay - SpawnRelay] +``` + +The current RelayAction types only support growth (SpawnRelay, AddFilters). Removal would need a new `RemoveFilters` action type - this is a future enhancement. + +--- + +## Name Mappings + +| Current | Proposed | Semantics | +|---------|----------|-----------| +| `FollowingRepoRootEvents` | `RepoIndex` | Per-repo: relays + root events | +| `SyncRelays` | `RelayIndex` | Per-relay: what we're syncing (applied state) | +| - | `SyncTarget` | Struct for repos + events | +| - | `RepoInfo` | Struct for relay set + event set | + +--- + +## Data Flow Summary + +```mermaid +flowchart TB + subgraph Batch Input + RA[30617 Announcements] + RE[Root Events 1617-1621] + end + + subgraph Step 1: Update Source + RI[RepoIndex] + end + + subgraph Step 2: Derive Target + DT[derive_relay_targets] + TGT[Target HashMap] + end + + subgraph Step 3: Diff + RLI[RelayIndex - Applied] + DIFF[compute_relay_actions] + end + + subgraph Step 4: Apply + ACT[RelayActions] + SM[SyncManager] + end + + RA --> RI + RE --> RI + RI --> DT + DT --> TGT + TGT --> DIFF + RLI --> DIFF + DIFF --> ACT + ACT --> SM + ACT --> |update| RLI +``` + +--- + +## Files to Modify + +| File | Changes | +|------|---------| +| [`src/sync/mod.rs`](src/sync/mod.rs) | Replace type aliases with RepoIndex/RelayIndex + structs | +| [`src/sync/self_subscriber.rs`](src/sync/self_subscriber.rs) | Rewrite process_batch with new algorithm | + +--- + +## Questions for Approval + +1. **Naming**: Are `RepoIndex`/`RelayIndex` and `RepoInfo`/`SyncTarget` clear enough? + +2. 
**When to update RelayIndex**: Should we: + - (a) Update immediately when generating action (optimistic) ← proposed above + - (b) Update only after SyncManager confirms action succeeded + +3. **Bootstrap relay**: Keep special-casing it in RelayIndex (always present)? + +4. **Future work**: Add `RemoveFilters` action for relay removal, or defer? + +--- + +## Benefits + +1. **Logical flow**: Source → Derived → Diff → Actions +2. **Single source of truth**: RepoIndex is the authoritative data +3. **Clear transformation**: `derive_relay_targets()` is a pure function +4. **Handles updates**: Replacing `repo.relays` naturally handles announcement changes +5. **Testable**: Each step can be unit tested independently \ No newline at end of file -- cgit v1.2.3