From 8cfe8546e5ed1118adae6bfa041611e94d15c6dd Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 12:51:01 +0000 Subject: sync: implement graceful shutdown for all tasks and connections --- src/sync/mod.rs | 138 ++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 105 insertions(+), 33 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 108ebe9..9a8857c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -36,7 +36,7 @@ use std::time::Duration; use nostr_sdk::prelude::*; use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{broadcast, Mutex, RwLock}; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; @@ -369,7 +369,10 @@ const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; /// - Re-discovers all repos and events from scratch /// /// This detects state drift over time that might occur from missed events. -async fn run_daily_timer(sync_manager: Arc>) { +async fn run_daily_timer( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { use rand::Rng; loop { @@ -383,26 +386,33 @@ async fn run_daily_timer(sync_manager: Arc>) { hours ); - tokio::time::sleep(Duration::from_secs(seconds)).await; - - // Get list of relays - let relay_urls: Vec = { - let manager = sync_manager.lock().await; - let index = manager.relay_sync_index.read().await; - let urls: Vec = index.keys().cloned().collect(); - drop(index); - urls - }; + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(seconds)) => { + // Timer fired - do daily sync + // Get list of relays + let relay_urls: Vec = { + let manager = sync_manager.lock().await; + let index = manager.relay_sync_index.read().await; + let urls: Vec = index.keys().cloned().collect(); + drop(index); + urls + }; - tracing::info!( - relay_count = relay_urls.len(), - "Daily timer fired, starting daily sync for all relays" - ); + tracing::info!( + relay_count = relay_urls.len(), + "Daily timer fired, starting daily sync for all relays" + ); - // Trigger daily sync for each relay - for relay_url in relay_urls { - let mut manager = sync_manager.lock().await; - manager.daily_sync(&relay_url).await; + // Trigger daily sync for each relay + for relay_url in relay_urls { + let mut manager = sync_manager.lock().await; + manager.daily_sync(&relay_url).await; + } + } + _ = shutdown_rx.recv() => { + tracing::info!("Daily timer received shutdown signal"); + break; + } } } } @@ -421,15 +431,23 @@ const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60; /// that are empty will be disconnected to free up resources. /// /// Bootstrap relays are never disconnected, even if empty. -async fn run_disconnect_checker(sync_manager: Arc>) { +async fn run_disconnect_checker( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { loop { - // Check every 60 seconds - tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)).await; - - tracing::debug!("Disconnect checker running"); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { + tracing::debug!("Disconnect checker running"); - let mut manager = sync_manager.lock().await; - manager.check_disconnects().await; + let mut manager = sync_manager.lock().await; + manager.check_disconnects().await; + } + _ = shutdown_rx.recv() => { + tracing::info!("Disconnect checker received shutdown signal"); + break; + } + } } } @@ -468,6 +486,8 @@ pub struct SyncManager { eose_tx: Option>, /// Channel for connect notifications (set during run) connect_tx: Option>, + /// Channel for broadcasting shutdown signal to all background tasks + shutdown_tx: Option>, } impl SyncManager { @@ -501,6 +521,7 @@ impl SyncManager { disconnect_tx: None, eose_tx: None, connect_tx: None, + shutdown_tx: None, } } @@ -691,19 +712,24 @@ impl SyncManager { // 4. Create connect channel for spawned tasks -> manager communication let (connect_tx, mut connect_rx) = mpsc::channel::(100); - // 5. Spawn self-subscriber + // 4b. Create shutdown broadcast channel for graceful shutdown + let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); + + // 5. Spawn self-subscriber with shutdown receiver let self_subscriber = SelfSubscriber::new( format!("ws://{}", self.service_domain), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), action_tx, ); - tokio::spawn(async move { self_subscriber.run().await }); + let subscriber_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); // 5b. Store channel senders for use by handlers self.disconnect_tx = Some(disconnect_tx.clone()); self.eose_tx = Some(eose_tx.clone()); self.connect_tx = Some(connect_tx.clone()); + self.shutdown_tx = Some(shutdown_tx.clone()); // 6. Connect to bootstrap relay if configured if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { @@ -713,16 +739,18 @@ impl SyncManager { // 7. Wrap self in Arc for sharing with timer task let sync_manager = Arc::new(Mutex::new(self)); - // 8. Spawn daily timer task + // 8. Spawn daily timer task with shutdown receiver let timer_manager = Arc::clone(&sync_manager); + let timer_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_daily_timer(timer_manager).await; + run_daily_timer(timer_manager, timer_shutdown).await; }); - // 9. Spawn disconnect checker task + // 9. Spawn disconnect checker task with shutdown receiver let checker_manager = Arc::clone(&sync_manager); + let checker_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_disconnect_checker(checker_manager).await; + run_disconnect_checker(checker_manager, checker_shutdown).await; }); // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -1689,4 +1717,48 @@ impl SyncManager { tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); } + + /// Gracefully shutdown the SyncManager + /// + /// This method: + /// - Sends shutdown signal to all background tasks (daily timer, disconnect checker) + /// - Disconnects all relay connections + /// - Clears all indices (relay_sync_index, pending_sync_index) + /// + /// After calling this method, the SyncManager is no longer usable. + pub async fn shutdown(&mut self) { + tracing::info!("Starting SyncManager shutdown"); + + // 1. Send shutdown signal to all background tasks + if let Some(tx) = &self.shutdown_tx { + let _ = tx.send(()); + tracing::debug!("Sent shutdown signal to background tasks"); + } + + // 2. Disconnect all relay connections + let relay_urls: Vec = self.connections.keys().cloned().collect(); + for relay_url in relay_urls { + if let Some(connection) = self.connections.remove(&relay_url) { + tracing::debug!(relay = %relay_url, "Disconnecting relay"); + connection.disconnect().await; + } + } + + // 3. Clear all indices + { + let mut index = self.relay_sync_index.write().await; + let count = index.len(); + index.clear(); + tracing::debug!(count = count, "Cleared relay_sync_index"); + } + + { + let mut pending = self.pending_sync_index.write().await; + let count = pending.len(); + pending.clear(); + tracing::debug!(count = count, "Cleared pending_sync_index"); + } + + tracing::info!("SyncManager shutdown complete"); + } } \ No newline at end of file -- cgit v1.2.3