From 5f9d3ca0db4ffc9088be0b9ac9558efe1a4da810 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 12:30:19 +0000 Subject: sync: complete AddFilters handler with auto-spawning --- src/sync/mod.rs | 493 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 275 insertions(+), 218 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8e0b8e1..f704ca7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -375,6 +375,12 @@ pub struct SyncManager { health_tracker: Arc, /// Counter for generating unique batch IDs next_batch_id: u64, + /// Channel for disconnect notifications (set during run) + disconnect_tx: Option>, + /// Channel for EOSE notifications (set during run) + eose_tx: Option>, + /// Channel for connect notifications (set during run) + connect_tx: Option>, } impl SyncManager { @@ -405,6 +411,9 @@ impl SyncManager { connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, + disconnect_tx: None, + eose_tx: None, + connect_tx: None, } } @@ -548,15 +557,14 @@ impl SyncManager { ); tokio::spawn(async move { self_subscriber.run().await }); + // 5b. Store channel senders for use by handlers + self.disconnect_tx = Some(disconnect_tx.clone()); + self.eose_tx = Some(eose_tx.clone()); + self.connect_tx = Some(connect_tx.clone()); + // 6. Connect to bootstrap relay if configured if let Some(ref bootstrap_url) = self.bootstrap_relay_url { - self.spawn_relay_connection( - bootstrap_url.clone(), - disconnect_tx.clone(), - eose_tx.clone(), - connect_tx.clone(), - ) - .await; + self.spawn_relay_connection(bootstrap_url.clone()).await; } // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -565,37 +573,46 @@ impl SyncManager { action = action_rx.recv() => { match action { Some(RelayAction::SpawnRelay { relay_url, repos }) => { - // Check if relay already exists - let relay_index = self.relay_sync_index.read().await; - let exists = relay_index.contains_key(&relay_url); - drop(relay_index); - - if !exists { - tracing::info!(relay = %relay_url, "Spawning new relay connection"); - self.spawn_relay_with_layer2( - relay_url, - repos, - disconnect_tx.clone(), - eose_tx.clone(), - connect_tx.clone(), - ).await; - } else { - tracing::debug!( - relay = %relay_url, - "Relay already exists, considering AddFilters" - ); - // For MVP, we don't handle AddFilters - just log - // Full implementation would call subscribe_filters on existing connection - } + // Convert to AddFilters format and use unified handler + let root_events: HashSet = repos.values().flatten().cloned().collect(); + let repo_ids: HashSet = repos.keys().cloned().collect(); + + // Build filters for these repos + let filters = crate::sync::filters::build_layer2_and_layer3_filters( + &repo_ids, + &root_events, + None, + ); + + let action = AddFilters { + relay_url, + repos: repo_ids, + root_events, + filters, + }; + + self.handle_add_filters(action).await; } Some(RelayAction::AddFilters { relay_url, repos }) => { - tracing::debug!( - relay = %relay_url, - repo_count = repos.len(), - "AddFilters action (MVP: not implemented)" + // Convert to AddFilters format and use unified handler + let root_events: HashSet = repos.values().flatten().cloned().collect(); + let repo_ids: HashSet = repos.keys().cloned().collect(); + + // Build filters for these repos + let filters = crate::sync::filters::build_layer2_and_layer3_filters( + &repo_ids, + &root_events, + None, ); - // For MVP, not implemented - full version would add Layer 2 filters - // to existing relay connection + + let action = AddFilters { + relay_url, + repos: repo_ids, + root_events, + filters, + }; + + self.handle_add_filters(action).await; } None => break, } @@ -637,6 +654,130 @@ impl SyncManager { } } + /// Handle AddFilters action - subscribe to filters on a relay + /// + /// This method handles all filter additions: + /// - For new relays: creates entry with Connecting status, spawns connection + /// - For existing connected relays: subscribes to filters, creates PendingBatch + /// - For disconnected/connecting relays: returns (will be handled on connection) + async fn handle_add_filters(&mut self, action: AddFilters) { + // Step 1: Check if relay exists in relay_sync_index + let connection_status = { + let index = self.relay_sync_index.read().await; + index.get(&action.relay_url).map(|s| s.connection_status) + }; + + match connection_status { + None => { + // New relay - create entry with Connecting status + { + let mut index = self.relay_sync_index.write().await; + let new_state = RelayState { + connection_status: ConnectionStatus::Connecting, + is_bootstrap: false, // Only bootstrap relays set this to true + last_connected: None, + disconnected_at: None, + repos: HashSet::new(), + root_events: HashSet::new(), + }; + index.insert(action.relay_url.clone(), new_state); + } + + tracing::info!( + relay = %action.relay_url, + repos = action.repos.len(), + "Spawning connection for new relay" + ); + + // Spawn connection for new relay + self.spawn_relay_connection(action.relay_url.clone()).await; + // Connection will trigger handle_connect_or_reconnect which will process items + return; + } + Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => { + // Will be handled when connection succeeds + tracing::debug!( + relay = %action.relay_url, + status = ?connection_status, + "Relay not connected, action will be processed on connection" + ); + return; + } + Some(ConnectionStatus::Connected) => { + // Continue to subscribe + } + } + + // Step 2: Check if consolidation is needed (Phase 6 will implement maybe_consolidate) + // self.maybe_consolidate(&action.relay_url, action.filters.len()); + + // Step 3: Get connection and subscribe to all filters + let connection = match self.connections.get(&action.relay_url) { + Some(conn) => conn, + None => { + tracing::warn!( + relay = %action.relay_url, + "No connection for relay, cannot subscribe" + ); + return; + } + }; + + // Subscribe to each filter and collect subscription IDs + let mut subscription_ids = Vec::new(); + for filter in &action.filters { + match connection.subscribe_filter(filter.clone()).await { + Ok(sub_id) => { + subscription_ids.push(sub_id); + } + Err(e) => { + tracing::error!( + relay = %action.relay_url, + error = %e, + "Failed to subscribe to filter" + ); + } + } + } + + if subscription_ids.is_empty() && !action.filters.is_empty() { + tracing::warn!( + relay = %action.relay_url, + "All filter subscriptions failed, not creating batch" + ); + return; + } + + // Step 4: Create PendingBatch + let batch_id = self.next_batch_id(); + let batch = PendingBatch { + batch_id, + items: PendingItems { + repos: action.repos.clone(), + root_events: action.root_events.clone(), + }, + outstanding_subs: subscription_ids.into_iter().collect(), + }; + + // Step 5: Add to pending_sync_index + { + let mut pending = self.pending_sync_index.write().await; + pending + .entry(action.relay_url.clone()) + .or_insert_with(Vec::new) + .push(batch); + } + + tracing::debug!( + relay = %action.relay_url, + batch_id = batch_id, + repos = action.repos.len(), + root_events = action.root_events.len(), + filters = action.filters.len(), + "Created pending batch for filter subscriptions" + ); + } + /// Handle a connection success (called when a relay connects or reconnects) /// /// This method implements smart reconnection logic: @@ -798,59 +939,57 @@ impl SyncManager { /// Recompute sync actions for a specific relay /// /// Uses derive_relay_targets and compute_actions to find new items - /// that need to be synced. For Phase 4, this just logs the actions; - /// full handling will be implemented in Phase 5. - async fn recompute_actions_for_relay(&self, relay_url: &str) { + /// that need to be synced. Processes AddFilters actions for new items. + async fn recompute_actions_for_relay(&mut self, relay_url: &str) { use crate::sync::algorithms::{compute_actions, derive_relay_targets}; - // Get current state from indexes - let repo_index = self.repo_sync_index.read().await; - let pending_index = self.pending_sync_index.read().await; - let relay_index = self.relay_sync_index.read().await; - - // Derive per-relay targets from repo index - let all_targets = derive_relay_targets(&repo_index); + // Get current state from indexes (need to collect to avoid holding locks) + let all_targets = { + let repo_index = self.repo_sync_index.read().await; + derive_relay_targets(&repo_index) + }; // Filter to only targets for this specific relay - let relay_target = all_targets.get(relay_url); + let relay_target = match all_targets.get(relay_url) { + Some(target) => target.clone(), + None => { + tracing::debug!( + relay = %relay_url, + "No sync targets found for relay" + ); + return; + } + }; + + // Build single-relay targets map for compute_actions + let mut single_relay_targets = std::collections::HashMap::new(); + single_relay_targets.insert(relay_url.to_string(), relay_target); + + // Compute actions for new items + let actions = { + let pending_index = self.pending_sync_index.read().await; + let relay_index = self.relay_sync_index.read().await; + compute_actions(&single_relay_targets, &pending_index, &relay_index) + }; - if relay_target.is_none() { + if actions.is_empty() { tracing::debug!( relay = %relay_url, - "No sync targets found for relay" + "No new items to sync for relay" ); return; } - // Build single-relay targets map for compute_actions - let mut single_relay_targets = std::collections::HashMap::new(); - if let Some(target) = relay_target { - single_relay_targets.insert(relay_url.to_string(), target.clone()); - } - - // Compute actions for new items - let actions = compute_actions( - &single_relay_targets, - &pending_index, - &relay_index, - ); - - // Log the actions (Phase 5 will process them) - for action in &actions { + // Process each action + for action in actions { tracing::info!( relay = %action.relay_url, new_repos = action.repos.len(), new_root_events = action.root_events.len(), filters = action.filters.len(), - "Discovered new items to sync (Phase 5 will process)" - ); - } - - if actions.is_empty() { - tracing::debug!( - relay = %relay_url, - "No new items to sync for relay" + "Processing AddFilters for new items" ); + self.handle_add_filters(action).await; } } @@ -912,179 +1051,81 @@ impl SyncManager { ); } - /// Spawn relay connection with Layer 2 filters for specific repos + /// Spawn a relay connection and start its event loop /// - /// Used when discovering relays from announcements. Connects to the relay, - /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the - /// specific repos we want to sync. - async fn spawn_relay_with_layer2( - &self, - relay_url: String, - repos: HashMap>, - disconnect_tx: tokio::sync::mpsc::Sender, - eose_tx: tokio::sync::mpsc::Sender, - connect_tx: tokio::sync::mpsc::Sender, - ) { - use crate::sync::filters::build_layer2_and_layer3_filters; + /// Creates a new RelayConnection, connects to Layer 1, stores the connection, + /// and spawns event processing tasks. Uses stored channel senders for notifications. + async fn spawn_relay_connection(&mut self, relay_url: String) { use tokio::sync::mpsc; - let database = Arc::clone(&self.database); - let write_policy = self.write_policy.clone(); - let relay_sync_index = Arc::clone(&self.relay_sync_index); - - // Create relay connection - let connection = RelayConnection::new(relay_url.clone()); - - // Connect and subscribe to Layer 1 (announcements) - if let Err(e) = connection.connect_and_subscribe(None).await { - tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); - return; - } - - // Mark as connected in relay sync index - { - let mut index = relay_sync_index.write().await; - index.insert( - relay_url.clone(), - RelayState { - repos: repos.keys().cloned().collect(), - root_events: repos.values().flatten().cloned().collect(), - is_bootstrap: false, - connection_status: ConnectionStatus::Connected, - last_connected: Some(Timestamp::now()), - disconnected_at: None, - }, - ); - } - - // Notify SyncManager of successful connection - let _ = connect_tx - .send(ConnectNotification { - relay_url: relay_url.clone(), - }) - .await; - - // Subscribe to Layer 2+3 filters for the repos - let repo_ids: HashSet = repos.keys().cloned().collect(); - let root_events: HashSet = repos.values().flatten().cloned().collect(); - let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None); - - for filter in filters { - if let Err(e) = connection.subscribe_filter(filter).await { + // Get channel senders (must exist during run) + let disconnect_tx = match &self.disconnect_tx { + Some(tx) => tx.clone(), + None => { tracing::error!( relay = %relay_url, - error = %e, - "Failed to subscribe to Layer 2 filter" + "Cannot spawn connection - channels not initialized" ); + return; } - } - - tracing::info!( - relay = %relay_url, - repo_count = repos.len(), - "Connected to discovered relay with Layer 2+3 filters" - ); - - // Create event channel - let (event_tx, mut event_rx) = mpsc::channel::(1000); - - // Spawn event loop - tokio::spawn(async move { - connection.run_event_loop(event_tx).await; - }); - - // Spawn event processor - let relay_url_clone = relay_url.clone(); - tokio::spawn(async move { - while let Some(relay_event) = event_rx.recv().await { - match relay_event { - RelayEvent::Event(event) => { - Self::process_event_static( - &event, - &relay_url_clone, - &database, - &write_policy, - ) - .await; - } - RelayEvent::EndOfStoredEvents(sub_id) => { - tracing::debug!( - relay = %relay_url_clone, - sub_id = %sub_id, - "EOSE received, notifying SyncManager" - ); - // Notify SyncManager of EOSE - let _ = eose_tx - .send(EoseNotification { - relay_url: relay_url_clone.clone(), - sub_id, - }) - .await; - } - RelayEvent::Closed(reason) => { - tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed"); - // Notify SyncManager of disconnect - let _ = disconnect_tx - .send(DisconnectNotification { - relay_url: relay_url_clone.clone(), - }) - .await; - break; - } - RelayEvent::Shutdown => { - tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); - // Notify SyncManager of disconnect - let _ = disconnect_tx - .send(DisconnectNotification { - relay_url: relay_url_clone.clone(), - }) - .await; - break; - } - } + }; + let eose_tx = match &self.eose_tx { + Some(tx) => tx.clone(), + None => { + tracing::error!( + relay = %relay_url, + "Cannot spawn connection - channels not initialized" + ); + return; } - }); - } - - /// Spawn a relay connection and start its event loop - async fn spawn_relay_connection( - &self, - relay_url: String, - disconnect_tx: tokio::sync::mpsc::Sender, - eose_tx: tokio::sync::mpsc::Sender, - connect_tx: tokio::sync::mpsc::Sender, - ) { - use tokio::sync::mpsc; + }; + let connect_tx = match &self.connect_tx { + Some(tx) => tx.clone(), + None => { + tracing::error!( + relay = %relay_url, + "Cannot spawn connection - channels not initialized" + ); + return; + } + }; let database = Arc::clone(&self.database); let write_policy = self.write_policy.clone(); let relay_sync_index = Arc::clone(&self.relay_sync_index); + // Check if this is a bootstrap relay + let is_bootstrap = self.bootstrap_relay_url.as_ref() == Some(&relay_url); + // Create relay connection let connection = RelayConnection::new(relay_url.clone()); // Connect and subscribe to Layer 1 if let Err(e) = connection.connect_and_subscribe(None).await { - tracing::error!("Failed to connect to relay {}: {}", relay_url, e); + tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); + // Update state to disconnected on failure + { + let mut index = relay_sync_index.write().await; + if let Some(state) = index.get_mut(&relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + } + } return; } // Mark as connected in relay sync index { let mut index = relay_sync_index.write().await; - index.insert( - relay_url.clone(), - RelayState { - repos: HashSet::new(), - root_events: HashSet::new(), - is_bootstrap: true, - connection_status: ConnectionStatus::Connected, - last_connected: Some(Timestamp::now()), - disconnected_at: None, - }, - ); + let state = index.entry(relay_url.clone()).or_default(); + state.connection_status = ConnectionStatus::Connected; + state.is_bootstrap = is_bootstrap; + state.last_connected = Some(Timestamp::now()); + state.disconnected_at = None; } + // Store connection for later use (for subscribing to filters) + self.connections.insert(relay_url.clone(), connection); + // Notify SyncManager of successful connection let _ = connect_tx .send(ConnectNotification { @@ -1092,6 +1133,16 @@ impl SyncManager { }) .await; + // Get the connection back for the event loop + // We need to take it out because run_event_loop consumes self + let connection = match self.connections.remove(&relay_url) { + Some(conn) => conn, + None => { + tracing::error!(relay = %relay_url, "Connection disappeared after insert"); + return; + } + }; + // Create event channel let (event_tx, mut event_rx) = mpsc::channel::(1000); @@ -1155,6 +1206,12 @@ impl SyncManager { } } }); + + tracing::info!( + relay = %relay_url, + is_bootstrap = is_bootstrap, + "Spawned relay connection" + ); } /// Process a single event from a relay (static version for spawned tasks) -- cgit v1.2.3