From 8cfe8546e5ed1118adae6bfa041611e94d15c6dd Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 12:51:01 +0000 Subject: sync: implement graceful shutdown for all tasks and connections --- src/sync/self_subscriber.rs | 144 +++++++++++++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 41 deletions(-) (limited to 'src/sync/self_subscriber.rs') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 229d2e1..73cea2f 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -13,7 +13,7 @@ use std::time::Duration; use nostr_sdk::prelude::*; use nostr_sdk::Timestamp; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use super::{RepoSyncIndex, RepoSyncNeeds}; @@ -209,7 +209,10 @@ impl SelfSubscriber { /// /// Connects to own relay, subscribes to relevant event kinds, /// and batches updates before processing them. - pub async fn run(mut self) { + /// + /// The optional shutdown receiver allows graceful termination when + /// received via the broadcast channel. + pub async fn run(mut self, mut shutdown_rx: Option>) { let client = Client::default(); // Add own relay @@ -281,53 +284,112 @@ impl SelfSubscriber { timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - tokio::select! { - notification = notifications.recv() => { - match notification { - Ok(RelayPoolNotification::Event { event, .. }) => { - // Only process 30617 events that list our relay - if event.kind == Kind::Custom(30617) { - if !self.lists_our_relay(&event) { - continue; - } - - // Extract repo ID and relays - if let Some(repo_id) = Self::extract_repo_id(&event) { - let relays = Self::extract_relay_urls(&event); - let mut root_events = HashSet::new(); - root_events.insert(event.id); - - pending.add_repo(repo_id, relays, root_events); - tracing::debug!( + // Build the select based on whether we have a shutdown receiver + if let Some(ref mut rx) = shutdown_rx { + tokio::select! { + notification = notifications.recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + // Only process 30617 events that list our relay + if event.kind == Kind::Custom(30617) { + if !self.lists_our_relay(&event) { + continue; + } + + // Extract repo ID and relays + if let Some(repo_id) = Self::extract_repo_id(&event) { + let relays = Self::extract_relay_urls(&event); + let mut root_events = HashSet::new(); + root_events.insert(event.id); + + pending.add_repo(repo_id, relays, root_events); + tracing::debug!( + event_id = %event.id, + "Queued 30617 announcement for batch processing" + ); + } + } else { + // For root event kinds (1617, 1618, 1619, 1621), + // process them to update the RepoSyncIndex + tracing::trace!( + kind = %event.kind, event_id = %event.id, - "Queued 30617 announcement for batch processing" + "Received root event" ); + self.handle_root_event(&event).await; } - } else { - // For root event kinds (1617, 1618, 1619, 1621), - // process them to update the RepoSyncIndex - tracing::trace!( - kind = %event.kind, - event_id = %event.id, - "Received root event" - ); - self.handle_root_event(&event).await; } + Ok(RelayPoolNotification::Shutdown) => { + tracing::info!("SelfSubscriber received shutdown notification"); + break; + } + Err(e) => { + tracing::error!(error = %e, "Error receiving notification"); + break; + } + _ => {} } - Ok(RelayPoolNotification::Shutdown) => { - tracing::info!("SelfSubscriber received shutdown notification"); - break; - } - Err(e) => { - tracing::error!(error = %e, "Error receiving notification"); - break; + } + _ = timer.tick() => { + if !pending.is_empty() { + self.process_batch(&mut pending).await; } - _ => {} + } + _ = rx.recv() => { + tracing::info!("SelfSubscriber received shutdown signal"); + break; } } - _ = timer.tick() => { - if !pending.is_empty() { - self.process_batch(&mut pending).await; + } else { + // No shutdown receiver - original behavior + tokio::select! { + notification = notifications.recv() => { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + // Only process 30617 events that list our relay + if event.kind == Kind::Custom(30617) { + if !self.lists_our_relay(&event) { + continue; + } + + // Extract repo ID and relays + if let Some(repo_id) = Self::extract_repo_id(&event) { + let relays = Self::extract_relay_urls(&event); + let mut root_events = HashSet::new(); + root_events.insert(event.id); + + pending.add_repo(repo_id, relays, root_events); + tracing::debug!( + event_id = %event.id, + "Queued 30617 announcement for batch processing" + ); + } + } else { + // For root event kinds (1617, 1618, 1619, 1621), + // process them to update the RepoSyncIndex + tracing::trace!( + kind = %event.kind, + event_id = %event.id, + "Received root event" + ); + self.handle_root_event(&event).await; + } + } + Ok(RelayPoolNotification::Shutdown) => { + tracing::info!("SelfSubscriber received shutdown notification"); + break; + } + Err(e) => { + tracing::error!(error = %e, "Error receiving notification"); + break; + } + _ => {} + } + } + _ = timer.tick() => { + if !pending.is_empty() { + self.process_batch(&mut pending).await; + } } } } -- cgit v1.2.3