From 4f171fe53f24b54718a717a77b447175177e29a5 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 11:40:52 +0000 Subject: sync: implement disconnect handler with state cleanup --- src/sync/mod.rs | 153 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 140 insertions(+), 13 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 9ac62ed..6e50eba 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -320,6 +320,13 @@ pub mod event_source { // SyncManager - Main Entry Point // ============================================================================= +/// Notification from spawned tasks about relay disconnections +#[derive(Debug)] +pub struct DisconnectNotification { + /// The relay URL that disconnected + pub relay_url: String, +} + /// Manages proactive synchronization with external relays /// /// The SyncManager runs as a background task, subscribing to repository @@ -388,7 +395,8 @@ impl SyncManager { /// 1. Spawns self-subscriber to monitor own relay for announcements /// 2. Connects to bootstrap relay if configured /// 3. Handles relay actions from self-subscriber - pub async fn run(self) { + /// 4. Handles disconnect notifications from spawned relay tasks + pub async fn run(mut self) { use tokio::sync::mpsc; tracing::info!( @@ -400,7 +408,10 @@ impl SyncManager { // 1. Create action channel for self-subscriber -> manager communication let (action_tx, mut action_rx) = mpsc::channel::(100); - // 2. Spawn self-subscriber + // 2. Create disconnect channel for spawned tasks -> manager communication + let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); + + // 3. Spawn self-subscriber let self_subscriber = SelfSubscriber::new( format!("ws://{}", self.service_domain), self.service_domain.clone(), @@ -409,12 +420,13 @@ impl SyncManager { ); tokio::spawn(async move { self_subscriber.run().await }); - // 3. Connect to bootstrap relay if configured + // 4. Connect to bootstrap relay if configured if let Some(ref bootstrap_url) = self.bootstrap_relay_url { - self.spawn_relay_connection(bootstrap_url.clone()).await; + self.spawn_relay_connection(bootstrap_url.clone(), disconnect_tx.clone()) + .await; } - // 4. Main loop - handle actions from self-subscriber + // 5. Main loop - handle actions from self-subscriber and disconnect notifications loop { tokio::select! { action = action_rx.recv() => { @@ -427,7 +439,7 @@ impl SyncManager { if !exists { tracing::info!(relay = %relay_url, "Spawning new relay connection"); - self.spawn_relay_with_layer2(relay_url, repos).await; + self.spawn_relay_with_layer2(relay_url, repos, disconnect_tx.clone()).await; } else { tracing::debug!( relay = %relay_url, @@ -449,8 +461,77 @@ impl SyncManager { None => break, } } + disconnect = disconnect_rx.recv() => { + match disconnect { + Some(notification) => { + self.handle_disconnect(¬ification.relay_url).await; + } + None => { + // All disconnect senders dropped - unlikely but handle gracefully + tracing::debug!("Disconnect channel closed"); + } + } + } + } + } + } + + /// Handle a relay disconnection + /// + /// This method: + /// - Updates the RelayState in relay_sync_index to Disconnected status + /// - Sets disconnected_at timestamp + /// - Clears pending sync batches for this relay + /// - Removes the relay from active connections + /// - Records the failure in health tracker + async fn handle_disconnect(&mut self, relay_url: &str) { + tracing::warn!(relay = %relay_url, "Handling relay disconnect"); + + // 1. Update RelayState in relay_sync_index + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + state.disconnected_at = Some(Timestamp::now()); + tracing::info!( + relay = %relay_url, + repos_tracked = state.repos.len(), + "Relay state updated to disconnected" + ); + } else { + tracing::debug!( + relay = %relay_url, + "No RelayState found for disconnected relay" + ); + } + } + + // 2. Clear pending sync batches for this relay + { + let mut pending = self.pending_sync_index.write().await; + if pending.remove(relay_url).is_some() { + tracing::debug!( + relay = %relay_url, + "Cleared pending sync batches for disconnected relay" + ); } } + + // 3. Remove from active connections + if self.connections.remove(relay_url).is_some() { + tracing::debug!( + relay = %relay_url, + "Removed relay from active connections" + ); + } + + // 4. Record failure in health tracker + self.health_tracker.record_failure(relay_url); + tracing::info!( + relay = %relay_url, + health_state = %self.health_tracker.get_state(relay_url), + "Relay disconnect handling complete" + ); } /// Spawn relay connection with Layer 2 filters for specific repos @@ -462,6 +543,7 @@ impl SyncManager { &self, relay_url: String, repos: HashMap>, + disconnect_tx: tokio::sync::mpsc::Sender, ) { use crate::sync::filters::build_layer2_and_layer3_filters; use tokio::sync::mpsc; @@ -541,8 +623,24 @@ impl SyncManager { RelayEvent::EndOfStoredEvents(_) => { tracing::debug!(relay = %relay_url_clone, "EOSE received"); } - RelayEvent::Closed(_) | RelayEvent::Shutdown => { - tracing::info!(relay = %relay_url_clone, "Relay disconnected"); + RelayEvent::Closed(reason) => { + tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed"); + // Notify SyncManager of disconnect + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; + break; + } + RelayEvent::Shutdown => { + tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); + // Notify SyncManager of disconnect + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; break; } } @@ -551,7 +649,11 @@ impl SyncManager { } /// Spawn a relay connection and start its event loop - async fn spawn_relay_connection(&self, relay_url: String) { + async fn spawn_relay_connection( + &self, + relay_url: String, + disconnect_tx: tokio::sync::mpsc::Sender, + ) { use tokio::sync::mpsc; let database = Arc::clone(&self.database); @@ -597,14 +699,39 @@ impl SyncManager { while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { - Self::process_event_static(&event, &relay_url_clone, &database, &write_policy) - .await; + Self::process_event_static( + &event, + &relay_url_clone, + &database, + &write_policy, + ) + .await; } RelayEvent::EndOfStoredEvents(_) => { tracing::debug!("EOSE from {}", relay_url_clone); } - RelayEvent::Closed(_) | RelayEvent::Shutdown => { - tracing::info!("Relay {} disconnected", relay_url_clone); + RelayEvent::Closed(reason) => { + tracing::info!( + relay = %relay_url_clone, + reason = %reason, + "Relay connection closed" + ); + // Notify SyncManager of disconnect + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; + break; + } + RelayEvent::Shutdown => { + tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); + // Notify SyncManager of disconnect + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; break; } } -- cgit v1.2.3