From 8cfe8546e5ed1118adae6bfa041611e94d15c6dd Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 12:51:01 +0000 Subject: sync: implement graceful shutdown for all tasks and connections --- src/sync/mod.rs | 138 ++++++++++++++++++++++++++++++++---------- src/sync/self_subscriber.rs | 144 +++++++++++++++++++++++++++++++------------- 2 files changed, 208 insertions(+), 74 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 108ebe9..9a8857c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -36,7 +36,7 @@ use std::time::Duration; use nostr_sdk::prelude::*; use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{broadcast, Mutex, RwLock}; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; @@ -369,7 +369,10 @@ const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; /// - Re-discovers all repos and events from scratch /// /// This detects state drift over time that might occur from missed events. -async fn run_daily_timer(sync_manager: Arc>) { +async fn run_daily_timer( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { use rand::Rng; loop { @@ -383,26 +386,33 @@ async fn run_daily_timer(sync_manager: Arc>) { hours ); - tokio::time::sleep(Duration::from_secs(seconds)).await; - - // Get list of relays - let relay_urls: Vec = { - let manager = sync_manager.lock().await; - let index = manager.relay_sync_index.read().await; - let urls: Vec = index.keys().cloned().collect(); - drop(index); - urls - }; + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(seconds)) => { + // Timer fired - do daily sync + // Get list of relays + let relay_urls: Vec = { + let manager = sync_manager.lock().await; + let index = manager.relay_sync_index.read().await; + let urls: Vec = index.keys().cloned().collect(); + drop(index); + urls + }; - tracing::info!( - relay_count = relay_urls.len(), - "Daily timer fired, starting daily sync for all relays" - ); + tracing::info!( + relay_count = relay_urls.len(), + "Daily timer fired, starting daily sync for all relays" + ); - // Trigger daily sync for each relay - for relay_url in relay_urls { - let mut manager = sync_manager.lock().await; - manager.daily_sync(&relay_url).await; + // Trigger daily sync for each relay + for relay_url in relay_urls { + let mut manager = sync_manager.lock().await; + manager.daily_sync(&relay_url).await; + } + } + _ = shutdown_rx.recv() => { + tracing::info!("Daily timer received shutdown signal"); + break; + } } } } @@ -421,15 +431,23 @@ const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60; /// that are empty will be disconnected to free up resources. /// /// Bootstrap relays are never disconnected, even if empty. -async fn run_disconnect_checker(sync_manager: Arc>) { +async fn run_disconnect_checker( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { loop { - // Check every 60 seconds - tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)).await; - - tracing::debug!("Disconnect checker running"); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { + tracing::debug!("Disconnect checker running"); - let mut manager = sync_manager.lock().await; - manager.check_disconnects().await; + let mut manager = sync_manager.lock().await; + manager.check_disconnects().await; + } + _ = shutdown_rx.recv() => { + tracing::info!("Disconnect checker received shutdown signal"); + break; + } + } } } @@ -468,6 +486,8 @@ pub struct SyncManager { eose_tx: Option>, /// Channel for connect notifications (set during run) connect_tx: Option>, + /// Channel for broadcasting shutdown signal to all background tasks + shutdown_tx: Option>, } impl SyncManager { @@ -501,6 +521,7 @@ impl SyncManager { disconnect_tx: None, eose_tx: None, connect_tx: None, + shutdown_tx: None, } } @@ -691,19 +712,24 @@ impl SyncManager { // 4. Create connect channel for spawned tasks -> manager communication let (connect_tx, mut connect_rx) = mpsc::channel::(100); - // 5. Spawn self-subscriber + // 4b. Create shutdown broadcast channel for graceful shutdown + let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); + + // 5. Spawn self-subscriber with shutdown receiver let self_subscriber = SelfSubscriber::new( format!("ws://{}", self.service_domain), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), action_tx, ); - tokio::spawn(async move { self_subscriber.run().await }); + let subscriber_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); // 5b. Store channel senders for use by handlers self.disconnect_tx = Some(disconnect_tx.clone()); self.eose_tx = Some(eose_tx.clone()); self.connect_tx = Some(connect_tx.clone()); + self.shutdown_tx = Some(shutdown_tx.clone()); // 6. Connect to bootstrap relay if configured if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { @@ -713,16 +739,18 @@ impl SyncManager { // 7. Wrap self in Arc for sharing with timer task let sync_manager = Arc::new(Mutex::new(self)); - // 8. Spawn daily timer task + // 8. Spawn daily timer task with shutdown receiver let timer_manager = Arc::clone(&sync_manager); + let timer_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_daily_timer(timer_manager).await; + run_daily_timer(timer_manager, timer_shutdown).await; }); - // 9. Spawn disconnect checker task + // 9. Spawn disconnect checker task with shutdown receiver let checker_manager = Arc::clone(&sync_manager); + let checker_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_disconnect_checker(checker_manager).await; + run_disconnect_checker(checker_manager, checker_shutdown).await; }); // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -1689,4 +1717,48 @@ impl SyncManager { tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); } + + /// Gracefully shutdown the SyncManager + /// + /// This method: + /// - Sends shutdown signal to all background tasks (daily timer, disconnect checker) + /// - Disconnects all relay connections + /// - Clears all indices (relay_sync_index, pending_sync_index) + /// + /// After calling this method, the SyncManager is no longer usable. + pub async fn shutdown(&mut self) { + tracing::info!("Starting SyncManager shutdown"); + + // 1. Send shutdown signal to all background tasks + if let Some(tx) = &self.shutdown_tx { + let _ = tx.send(()); + tracing::debug!("Sent shutdown signal to background tasks"); + } + + // 2. Disconnect all relay connections + let relay_urls: Vec = self.connections.keys().cloned().collect(); + for relay_url in relay_urls { + if let Some(connection) = self.connections.remove(&relay_url) { + tracing::debug!(relay = %relay_url, "Disconnecting relay"); + connection.disconnect().await; + } + } + + // 3. Clear all indices + { + let mut index = self.relay_sync_index.write().await; + let count = index.len(); + index.clear(); + tracing::debug!(count = count, "Cleared relay_sync_index"); + } + + { + let mut pending = self.pending_sync_index.write().await; + let count = pending.len(); + pending.clear(); + tracing::debug!(count = count, "Cleared pending_sync_index"); + } + + tracing::info!("SyncManager shutdown complete"); + } } \ No newline at end of file diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 229d2e1..73cea2f 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -13,7 +13,7 @@ use std::time::Duration; use nostr_sdk::prelude::*; use nostr_sdk::Timestamp; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use super::{RepoSyncIndex, RepoSyncNeeds}; @@ -209,7 +209,10 @@ impl SelfSubscriber { /// /// Connects to own relay, subscribes to relevant event kinds, /// and batches updates before processing them. - pub async fn run(mut self) { + /// + /// The optional shutdown receiver allows graceful termination when + /// received via the broadcast channel. + pub async fn run(mut self, mut shutdown_rx: Option>) { let client = Client::default(); // Add own relay @@ -281,53 +284,112 @@ impl SelfSubscriber { timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - tokio::select! { - notification = notifications.recv() => { - match notification { - Ok(RelayPoolNotification::Event { event, .. }) => { - // Only process 30617 events that list our relay - if event.kind == Kind::Custom(30617) { - if !self.lists_our_relay(&event) { - continue; - } - - // Extract repo ID and relays - if let Some(repo_id) = Self::extract_repo_id(&event) { - let relays = Self::extract_relay_urls(&event); - let mut root_events = HashSet::new(); - root_events.insert(event.id); - - pending.add_repo(repo_id, relays, root_events); - tracing::debug!( + // Build the select based on whether we have a shutdown receiver + if let Some(ref mut rx) = shutdown_rx { + tokio::select! { + notification = notifications.recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + // Only process 30617 events that list our relay + if event.kind == Kind::Custom(30617) { + if !self.lists_our_relay(&event) { + continue; + } + + // Extract repo ID and relays + if let Some(repo_id) = Self::extract_repo_id(&event) { + let relays = Self::extract_relay_urls(&event); + let mut root_events = HashSet::new(); + root_events.insert(event.id); + + pending.add_repo(repo_id, relays, root_events); + tracing::debug!( + event_id = %event.id, + "Queued 30617 announcement for batch processing" + ); + } + } else { + // For root event kinds (1617, 1618, 1619, 1621), + // process them to update the RepoSyncIndex + tracing::trace!( + kind = %event.kind, event_id = %event.id, - "Queued 30617 announcement for batch processing" + "Received root event" ); + self.handle_root_event(&event).await; } - } else { - // For root event kinds (1617, 1618, 1619, 1621), - // process them to update the RepoSyncIndex - tracing::trace!( - kind = %event.kind, - event_id = %event.id, - "Received root event" - ); - self.handle_root_event(&event).await; } + Ok(RelayPoolNotification::Shutdown) => { + tracing::info!("SelfSubscriber received shutdown notification"); + break; + } + Err(e) => { + tracing::error!(error = %e, "Error receiving notification"); + break; + } + _ => {} } - Ok(RelayPoolNotification::Shutdown) => { - tracing::info!("SelfSubscriber received shutdown notification"); - break; - } - Err(e) => { - tracing::error!(error = %e, "Error receiving notification"); - break; + } + _ = timer.tick() => { + if !pending.is_empty() { + self.process_batch(&mut pending).await; } - _ => {} + } + _ = rx.recv() => { + tracing::info!("SelfSubscriber received shutdown signal"); + break; } } - _ = timer.tick() => { - if !pending.is_empty() { - self.process_batch(&mut pending).await; + } else { + // No shutdown receiver - original behavior + tokio::select! { + notification = notifications.recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + // Only process 30617 events that list our relay + if event.kind == Kind::Custom(30617) { + if !self.lists_our_relay(&event) { + continue; + } + + // Extract repo ID and relays + if let Some(repo_id) = Self::extract_repo_id(&event) { + let relays = Self::extract_relay_urls(&event); + let mut root_events = HashSet::new(); + root_events.insert(event.id); + + pending.add_repo(repo_id, relays, root_events); + tracing::debug!( + event_id = %event.id, + "Queued 30617 announcement for batch processing" + ); + } + } else { + // For root event kinds (1617, 1618, 1619, 1621), + // process them to update the RepoSyncIndex + tracing::trace!( + kind = %event.kind, + event_id = %event.id, + "Received root event" + ); + self.handle_root_event(&event).await; + } + } + Ok(RelayPoolNotification::Shutdown) => { + tracing::info!("SelfSubscriber received shutdown notification"); + break; + } + Err(e) => { + tracing::error!(error = %e, "Error receiving notification"); + break; + } + _ => {} + } + } + _ = timer.tick() => { + if !pending.is_empty() { + self.process_batch(&mut pending).await; + } } } } -- cgit v1.2.3