upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:51:01 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:51:01 +0000
commit8cfe8546e5ed1118adae6bfa041611e94d15c6dd (patch)
treecf247943474cd95001c13b6e1eba0215a810f6dd /src/sync/mod.rs
parentc1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 (diff)
sync: implement graceful shutdown for all tasks and connections
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs138
1 files changed, 105 insertions, 33 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 108ebe9..9a8857c 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -36,7 +36,7 @@ use std::time::Duration;
36 36
37use nostr_sdk::prelude::*; 37use nostr_sdk::prelude::*;
38use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; 38use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
39use tokio::sync::{Mutex, RwLock}; 39use tokio::sync::{broadcast, Mutex, RwLock};
40 40
41use crate::config::Config; 41use crate::config::Config;
42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
@@ -369,7 +369,10 @@ const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30;
369/// - Re-discovers all repos and events from scratch 369/// - Re-discovers all repos and events from scratch
370/// 370///
371/// This detects state drift over time that might occur from missed events. 371/// This detects state drift over time that might occur from missed events.
372async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) { 372async fn run_daily_timer(
373 sync_manager: Arc<Mutex<SyncManager>>,
374 mut shutdown_rx: broadcast::Receiver<()>,
375) {
373 use rand::Rng; 376 use rand::Rng;
374 377
375 loop { 378 loop {
@@ -383,26 +386,33 @@ async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) {
383 hours 386 hours
384 ); 387 );
385 388
386 tokio::time::sleep(Duration::from_secs(seconds)).await; 389 tokio::select! {
387 390 _ = tokio::time::sleep(Duration::from_secs(seconds)) => {
388 // Get list of relays 391 // Timer fired - do daily sync
389 let relay_urls: Vec<String> = { 392 // Get list of relays
390 let manager = sync_manager.lock().await; 393 let relay_urls: Vec<String> = {
391 let index = manager.relay_sync_index.read().await; 394 let manager = sync_manager.lock().await;
392 let urls: Vec<String> = index.keys().cloned().collect(); 395 let index = manager.relay_sync_index.read().await;
393 drop(index); 396 let urls: Vec<String> = index.keys().cloned().collect();
394 urls 397 drop(index);
395 }; 398 urls
399 };
396 400
397 tracing::info!( 401 tracing::info!(
398 relay_count = relay_urls.len(), 402 relay_count = relay_urls.len(),
399 "Daily timer fired, starting daily sync for all relays" 403 "Daily timer fired, starting daily sync for all relays"
400 ); 404 );
401 405
402 // Trigger daily sync for each relay 406 // Trigger daily sync for each relay
403 for relay_url in relay_urls { 407 for relay_url in relay_urls {
404 let mut manager = sync_manager.lock().await; 408 let mut manager = sync_manager.lock().await;
405 manager.daily_sync(&relay_url).await; 409 manager.daily_sync(&relay_url).await;
410 }
411 }
412 _ = shutdown_rx.recv() => {
413 tracing::info!("Daily timer received shutdown signal");
414 break;
415 }
406 } 416 }
407 } 417 }
408} 418}
@@ -421,15 +431,23 @@ const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60;
421/// that are empty will be disconnected to free up resources. 431/// that are empty will be disconnected to free up resources.
422/// 432///
423/// Bootstrap relays are never disconnected, even if empty. 433/// Bootstrap relays are never disconnected, even if empty.
424async fn run_disconnect_checker(sync_manager: Arc<Mutex<SyncManager>>) { 434async fn run_disconnect_checker(
435 sync_manager: Arc<Mutex<SyncManager>>,
436 mut shutdown_rx: broadcast::Receiver<()>,
437) {
425 loop { 438 loop {
426 // Check every 60 seconds 439 tokio::select! {
427 tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)).await; 440 _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => {
428 441 tracing::debug!("Disconnect checker running");
429 tracing::debug!("Disconnect checker running");
430 442
431 let mut manager = sync_manager.lock().await; 443 let mut manager = sync_manager.lock().await;
432 manager.check_disconnects().await; 444 manager.check_disconnects().await;
445 }
446 _ = shutdown_rx.recv() => {
447 tracing::info!("Disconnect checker received shutdown signal");
448 break;
449 }
450 }
433 } 451 }
434} 452}
435 453
@@ -468,6 +486,8 @@ pub struct SyncManager {
468 eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>, 486 eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>,
469 /// Channel for connect notifications (set during run) 487 /// Channel for connect notifications (set during run)
470 connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>, 488 connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>,
489 /// Channel for broadcasting shutdown signal to all background tasks
490 shutdown_tx: Option<broadcast::Sender<()>>,
471} 491}
472 492
473impl SyncManager { 493impl SyncManager {
@@ -501,6 +521,7 @@ impl SyncManager {
501 disconnect_tx: None, 521 disconnect_tx: None,
502 eose_tx: None, 522 eose_tx: None,
503 connect_tx: None, 523 connect_tx: None,
524 shutdown_tx: None,
504 } 525 }
505 } 526 }
506 527
@@ -691,19 +712,24 @@ impl SyncManager {
691 // 4. Create connect channel for spawned tasks -> manager communication 712 // 4. Create connect channel for spawned tasks -> manager communication
692 let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100); 713 let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100);
693 714
694 // 5. Spawn self-subscriber 715 // 4b. Create shutdown broadcast channel for graceful shutdown
716 let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
717
718 // 5. Spawn self-subscriber with shutdown receiver
695 let self_subscriber = SelfSubscriber::new( 719 let self_subscriber = SelfSubscriber::new(
696 format!("ws://{}", self.service_domain), 720 format!("ws://{}", self.service_domain),
697 self.service_domain.clone(), 721 self.service_domain.clone(),
698 Arc::clone(&self.repo_sync_index), 722 Arc::clone(&self.repo_sync_index),
699 action_tx, 723 action_tx,
700 ); 724 );
701 tokio::spawn(async move { self_subscriber.run().await }); 725 let subscriber_shutdown = shutdown_tx.subscribe();
726 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });
702 727
703 // 5b. Store channel senders for use by handlers 728 // 5b. Store channel senders for use by handlers
704 self.disconnect_tx = Some(disconnect_tx.clone()); 729 self.disconnect_tx = Some(disconnect_tx.clone());
705 self.eose_tx = Some(eose_tx.clone()); 730 self.eose_tx = Some(eose_tx.clone());
706 self.connect_tx = Some(connect_tx.clone()); 731 self.connect_tx = Some(connect_tx.clone());
732 self.shutdown_tx = Some(shutdown_tx.clone());
707 733
708 // 6. Connect to bootstrap relay if configured 734 // 6. Connect to bootstrap relay if configured
709 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { 735 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() {
@@ -713,16 +739,18 @@ impl SyncManager {
713 // 7. Wrap self in Arc<Mutex> for sharing with timer task 739 // 7. Wrap self in Arc<Mutex> for sharing with timer task
714 let sync_manager = Arc::new(Mutex::new(self)); 740 let sync_manager = Arc::new(Mutex::new(self));
715 741
716 // 8. Spawn daily timer task 742 // 8. Spawn daily timer task with shutdown receiver
717 let timer_manager = Arc::clone(&sync_manager); 743 let timer_manager = Arc::clone(&sync_manager);
744 let timer_shutdown = shutdown_tx.subscribe();
718 tokio::spawn(async move { 745 tokio::spawn(async move {
719 run_daily_timer(timer_manager).await; 746 run_daily_timer(timer_manager, timer_shutdown).await;
720 }); 747 });
721 748
722 // 9. Spawn disconnect checker task 749 // 9. Spawn disconnect checker task with shutdown receiver
723 let checker_manager = Arc::clone(&sync_manager); 750 let checker_manager = Arc::clone(&sync_manager);
751 let checker_shutdown = shutdown_tx.subscribe();
724 tokio::spawn(async move { 752 tokio::spawn(async move {
725 run_disconnect_checker(checker_manager).await; 753 run_disconnect_checker(checker_manager, checker_shutdown).await;
726 }); 754 });
727 755
728 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 756 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -1689,4 +1717,48 @@ impl SyncManager {
1689 1717
1690 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); 1718 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up");
1691 } 1719 }
1720
1721 /// Gracefully shutdown the SyncManager
1722 ///
1723 /// This method:
1724 /// - Sends shutdown signal to all background tasks (daily timer, disconnect checker)
1725 /// - Disconnects all relay connections
1726 /// - Clears all indices (relay_sync_index, pending_sync_index)
1727 ///
1728 /// After calling this method, the SyncManager is no longer usable.
1729 pub async fn shutdown(&mut self) {
1730 tracing::info!("Starting SyncManager shutdown");
1731
1732 // 1. Send shutdown signal to all background tasks
1733 if let Some(tx) = &self.shutdown_tx {
1734 let _ = tx.send(());
1735 tracing::debug!("Sent shutdown signal to background tasks");
1736 }
1737
1738 // 2. Disconnect all relay connections
1739 let relay_urls: Vec<String> = self.connections.keys().cloned().collect();
1740 for relay_url in relay_urls {
1741 if let Some(connection) = self.connections.remove(&relay_url) {
1742 tracing::debug!(relay = %relay_url, "Disconnecting relay");
1743 connection.disconnect().await;
1744 }
1745 }
1746
1747 // 3. Clear all indices
1748 {
1749 let mut index = self.relay_sync_index.write().await;
1750 let count = index.len();
1751 index.clear();
1752 tracing::debug!(count = count, "Cleared relay_sync_index");
1753 }
1754
1755 {
1756 let mut pending = self.pending_sync_index.write().await;
1757 let count = pending.len();
1758 pending.clear();
1759 tracing::debug!(count = count, "Cleared pending_sync_index");
1760 }
1761
1762 tracing::info!("SyncManager shutdown complete");
1763 }
1692} \ No newline at end of file 1764} \ No newline at end of file