upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:51:01 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:51:01 +0000
commit8cfe8546e5ed1118adae6bfa041611e94d15c6dd (patch)
treecf247943474cd95001c13b6e1eba0215a810f6dd /src
parentc1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 (diff)
sync: implement graceful shutdown for all tasks and connections
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs138
-rw-r--r--src/sync/self_subscriber.rs144
2 files changed, 208 insertions, 74 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 108ebe9..9a8857c 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -36,7 +36,7 @@ use std::time::Duration;
36 36
37use nostr_sdk::prelude::*; 37use nostr_sdk::prelude::*;
38use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; 38use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
39use tokio::sync::{Mutex, RwLock}; 39use tokio::sync::{broadcast, Mutex, RwLock};
40 40
41use crate::config::Config; 41use crate::config::Config;
42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
@@ -369,7 +369,10 @@ const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30;
369/// - Re-discovers all repos and events from scratch 369/// - Re-discovers all repos and events from scratch
370/// 370///
371/// This detects state drift over time that might occur from missed events. 371/// This detects state drift over time that might occur from missed events.
372async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) { 372async fn run_daily_timer(
373 sync_manager: Arc<Mutex<SyncManager>>,
374 mut shutdown_rx: broadcast::Receiver<()>,
375) {
373 use rand::Rng; 376 use rand::Rng;
374 377
375 loop { 378 loop {
@@ -383,26 +386,33 @@ async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) {
383 hours 386 hours
384 ); 387 );
385 388
386 tokio::time::sleep(Duration::from_secs(seconds)).await; 389 tokio::select! {
387 390 _ = tokio::time::sleep(Duration::from_secs(seconds)) => {
388 // Get list of relays 391 // Timer fired - do daily sync
389 let relay_urls: Vec<String> = { 392 // Get list of relays
390 let manager = sync_manager.lock().await; 393 let relay_urls: Vec<String> = {
391 let index = manager.relay_sync_index.read().await; 394 let manager = sync_manager.lock().await;
392 let urls: Vec<String> = index.keys().cloned().collect(); 395 let index = manager.relay_sync_index.read().await;
393 drop(index); 396 let urls: Vec<String> = index.keys().cloned().collect();
394 urls 397 drop(index);
395 }; 398 urls
399 };
396 400
397 tracing::info!( 401 tracing::info!(
398 relay_count = relay_urls.len(), 402 relay_count = relay_urls.len(),
399 "Daily timer fired, starting daily sync for all relays" 403 "Daily timer fired, starting daily sync for all relays"
400 ); 404 );
401 405
402 // Trigger daily sync for each relay 406 // Trigger daily sync for each relay
403 for relay_url in relay_urls { 407 for relay_url in relay_urls {
404 let mut manager = sync_manager.lock().await; 408 let mut manager = sync_manager.lock().await;
405 manager.daily_sync(&relay_url).await; 409 manager.daily_sync(&relay_url).await;
410 }
411 }
412 _ = shutdown_rx.recv() => {
413 tracing::info!("Daily timer received shutdown signal");
414 break;
415 }
406 } 416 }
407 } 417 }
408} 418}
@@ -421,15 +431,23 @@ const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60;
421/// that are empty will be disconnected to free up resources. 431/// that are empty will be disconnected to free up resources.
422/// 432///
423/// Bootstrap relays are never disconnected, even if empty. 433/// Bootstrap relays are never disconnected, even if empty.
424async fn run_disconnect_checker(sync_manager: Arc<Mutex<SyncManager>>) { 434async fn run_disconnect_checker(
435 sync_manager: Arc<Mutex<SyncManager>>,
436 mut shutdown_rx: broadcast::Receiver<()>,
437) {
425 loop { 438 loop {
426 // Check every 60 seconds 439 tokio::select! {
427 tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)).await; 440 _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => {
428 441 tracing::debug!("Disconnect checker running");
429 tracing::debug!("Disconnect checker running");
430 442
431 let mut manager = sync_manager.lock().await; 443 let mut manager = sync_manager.lock().await;
432 manager.check_disconnects().await; 444 manager.check_disconnects().await;
445 }
446 _ = shutdown_rx.recv() => {
447 tracing::info!("Disconnect checker received shutdown signal");
448 break;
449 }
450 }
433 } 451 }
434} 452}
435 453
@@ -468,6 +486,8 @@ pub struct SyncManager {
468 eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>, 486 eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>,
469 /// Channel for connect notifications (set during run) 487 /// Channel for connect notifications (set during run)
470 connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>, 488 connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>,
489 /// Channel for broadcasting shutdown signal to all background tasks
490 shutdown_tx: Option<broadcast::Sender<()>>,
471} 491}
472 492
473impl SyncManager { 493impl SyncManager {
@@ -501,6 +521,7 @@ impl SyncManager {
501 disconnect_tx: None, 521 disconnect_tx: None,
502 eose_tx: None, 522 eose_tx: None,
503 connect_tx: None, 523 connect_tx: None,
524 shutdown_tx: None,
504 } 525 }
505 } 526 }
506 527
@@ -691,19 +712,24 @@ impl SyncManager {
691 // 4. Create connect channel for spawned tasks -> manager communication 712 // 4. Create connect channel for spawned tasks -> manager communication
692 let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100); 713 let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100);
693 714
694 // 5. Spawn self-subscriber 715 // 4b. Create shutdown broadcast channel for graceful shutdown
716 let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
717
718 // 5. Spawn self-subscriber with shutdown receiver
695 let self_subscriber = SelfSubscriber::new( 719 let self_subscriber = SelfSubscriber::new(
696 format!("ws://{}", self.service_domain), 720 format!("ws://{}", self.service_domain),
697 self.service_domain.clone(), 721 self.service_domain.clone(),
698 Arc::clone(&self.repo_sync_index), 722 Arc::clone(&self.repo_sync_index),
699 action_tx, 723 action_tx,
700 ); 724 );
701 tokio::spawn(async move { self_subscriber.run().await }); 725 let subscriber_shutdown = shutdown_tx.subscribe();
726 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });
702 727
703 // 5b. Store channel senders for use by handlers 728 // 5b. Store channel senders for use by handlers
704 self.disconnect_tx = Some(disconnect_tx.clone()); 729 self.disconnect_tx = Some(disconnect_tx.clone());
705 self.eose_tx = Some(eose_tx.clone()); 730 self.eose_tx = Some(eose_tx.clone());
706 self.connect_tx = Some(connect_tx.clone()); 731 self.connect_tx = Some(connect_tx.clone());
732 self.shutdown_tx = Some(shutdown_tx.clone());
707 733
708 // 6. Connect to bootstrap relay if configured 734 // 6. Connect to bootstrap relay if configured
709 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { 735 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() {
@@ -713,16 +739,18 @@ impl SyncManager {
713 // 7. Wrap self in Arc<Mutex> for sharing with timer task 739 // 7. Wrap self in Arc<Mutex> for sharing with timer task
714 let sync_manager = Arc::new(Mutex::new(self)); 740 let sync_manager = Arc::new(Mutex::new(self));
715 741
716 // 8. Spawn daily timer task 742 // 8. Spawn daily timer task with shutdown receiver
717 let timer_manager = Arc::clone(&sync_manager); 743 let timer_manager = Arc::clone(&sync_manager);
744 let timer_shutdown = shutdown_tx.subscribe();
718 tokio::spawn(async move { 745 tokio::spawn(async move {
719 run_daily_timer(timer_manager).await; 746 run_daily_timer(timer_manager, timer_shutdown).await;
720 }); 747 });
721 748
722 // 9. Spawn disconnect checker task 749 // 9. Spawn disconnect checker task with shutdown receiver
723 let checker_manager = Arc::clone(&sync_manager); 750 let checker_manager = Arc::clone(&sync_manager);
751 let checker_shutdown = shutdown_tx.subscribe();
724 tokio::spawn(async move { 752 tokio::spawn(async move {
725 run_disconnect_checker(checker_manager).await; 753 run_disconnect_checker(checker_manager, checker_shutdown).await;
726 }); 754 });
727 755
728 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 756 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -1689,4 +1717,48 @@ impl SyncManager {
1689 1717
1690 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); 1718 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up");
1691 } 1719 }
1720
1721 /// Gracefully shutdown the SyncManager
1722 ///
1723 /// This method:
1724 /// - Sends shutdown signal to all background tasks (daily timer, disconnect checker)
1725 /// - Disconnects all relay connections
1726 /// - Clears all indices (relay_sync_index, pending_sync_index)
1727 ///
1728 /// After calling this method, the SyncManager is no longer usable.
1729 pub async fn shutdown(&mut self) {
1730 tracing::info!("Starting SyncManager shutdown");
1731
1732 // 1. Send shutdown signal to all background tasks
1733 if let Some(tx) = &self.shutdown_tx {
1734 let _ = tx.send(());
1735 tracing::debug!("Sent shutdown signal to background tasks");
1736 }
1737
1738 // 2. Disconnect all relay connections
1739 let relay_urls: Vec<String> = self.connections.keys().cloned().collect();
1740 for relay_url in relay_urls {
1741 if let Some(connection) = self.connections.remove(&relay_url) {
1742 tracing::debug!(relay = %relay_url, "Disconnecting relay");
1743 connection.disconnect().await;
1744 }
1745 }
1746
1747 // 3. Clear all indices
1748 {
1749 let mut index = self.relay_sync_index.write().await;
1750 let count = index.len();
1751 index.clear();
1752 tracing::debug!(count = count, "Cleared relay_sync_index");
1753 }
1754
1755 {
1756 let mut pending = self.pending_sync_index.write().await;
1757 let count = pending.len();
1758 pending.clear();
1759 tracing::debug!(count = count, "Cleared pending_sync_index");
1760 }
1761
1762 tracing::info!("SyncManager shutdown complete");
1763 }
1692} \ No newline at end of file 1764} \ No newline at end of file
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 229d2e1..73cea2f 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -13,7 +13,7 @@ use std::time::Duration;
13 13
14use nostr_sdk::prelude::*; 14use nostr_sdk::prelude::*;
15use nostr_sdk::Timestamp; 15use nostr_sdk::Timestamp;
16use tokio::sync::mpsc; 16use tokio::sync::{broadcast, mpsc};
17 17
18use super::{RepoSyncIndex, RepoSyncNeeds}; 18use super::{RepoSyncIndex, RepoSyncNeeds};
19 19
@@ -209,7 +209,10 @@ impl SelfSubscriber {
209 /// 209 ///
210 /// Connects to own relay, subscribes to relevant event kinds, 210 /// Connects to own relay, subscribes to relevant event kinds,
211 /// and batches updates before processing them. 211 /// and batches updates before processing them.
212 pub async fn run(mut self) { 212 ///
213 /// The optional shutdown receiver allows graceful termination when
214 /// received via the broadcast channel.
215 pub async fn run(mut self, mut shutdown_rx: Option<broadcast::Receiver<()>>) {
213 let client = Client::default(); 216 let client = Client::default();
214 217
215 // Add own relay 218 // Add own relay
@@ -281,53 +284,112 @@ impl SelfSubscriber {
281 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 284 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
282 285
283 loop { 286 loop {
284 tokio::select! { 287 // Build the select based on whether we have a shutdown receiver
285 notification = notifications.recv() => { 288 if let Some(ref mut rx) = shutdown_rx {
286 match notification { 289 tokio::select! {
287 Ok(RelayPoolNotification::Event { event, .. }) => { 290 notification = notifications.recv() => {
288 // Only process 30617 events that list our relay 291 match notification {
289 if event.kind == Kind::Custom(30617) { 292 Ok(RelayPoolNotification::Event { event, .. }) => {
290 if !self.lists_our_relay(&event) { 293 // Only process 30617 events that list our relay
291 continue; 294 if event.kind == Kind::Custom(30617) {
292 } 295 if !self.lists_our_relay(&event) {
293 296 continue;
294 // Extract repo ID and relays 297 }
295 if let Some(repo_id) = Self::extract_repo_id(&event) { 298
296 let relays = Self::extract_relay_urls(&event); 299 // Extract repo ID and relays
297 let mut root_events = HashSet::new(); 300 if let Some(repo_id) = Self::extract_repo_id(&event) {
298 root_events.insert(event.id); 301 let relays = Self::extract_relay_urls(&event);
299 302 let mut root_events = HashSet::new();
300 pending.add_repo(repo_id, relays, root_events); 303 root_events.insert(event.id);
301 tracing::debug!( 304
305 pending.add_repo(repo_id, relays, root_events);
306 tracing::debug!(
307 event_id = %event.id,
308 "Queued 30617 announcement for batch processing"
309 );
310 }
311 } else {
312 // For root event kinds (1617, 1618, 1619, 1621),
313 // process them to update the RepoSyncIndex
314 tracing::trace!(
315 kind = %event.kind,
302 event_id = %event.id, 316 event_id = %event.id,
303 "Queued 30617 announcement for batch processing" 317 "Received root event"
304 ); 318 );
319 self.handle_root_event(&event).await;
305 } 320 }
306 } else {
307 // For root event kinds (1617, 1618, 1619, 1621),
308 // process them to update the RepoSyncIndex
309 tracing::trace!(
310 kind = %event.kind,
311 event_id = %event.id,
312 "Received root event"
313 );
314 self.handle_root_event(&event).await;
315 } 321 }
322 Ok(RelayPoolNotification::Shutdown) => {
323 tracing::info!("SelfSubscriber received shutdown notification");
324 break;
325 }
326 Err(e) => {
327 tracing::error!(error = %e, "Error receiving notification");
328 break;
329 }
330 _ => {}
316 } 331 }
317 Ok(RelayPoolNotification::Shutdown) => { 332 }
318 tracing::info!("SelfSubscriber received shutdown notification"); 333 _ = timer.tick() => {
319 break; 334 if !pending.is_empty() {
320 } 335 self.process_batch(&mut pending).await;
321 Err(e) => {
322 tracing::error!(error = %e, "Error receiving notification");
323 break;
324 } 336 }
325 _ => {} 337 }
338 _ = rx.recv() => {
339 tracing::info!("SelfSubscriber received shutdown signal");
340 break;
326 } 341 }
327 } 342 }
328 _ = timer.tick() => { 343 } else {
329 if !pending.is_empty() { 344 // No shutdown receiver - original behavior
330 self.process_batch(&mut pending).await; 345 tokio::select! {
346 notification = notifications.recv() => {
347 match notification {
348 Ok(RelayPoolNotification::Event { event, .. }) => {
349 // Only process 30617 events that list our relay
350 if event.kind == Kind::Custom(30617) {
351 if !self.lists_our_relay(&event) {
352 continue;
353 }
354
355 // Extract repo ID and relays
356 if let Some(repo_id) = Self::extract_repo_id(&event) {
357 let relays = Self::extract_relay_urls(&event);
358 let mut root_events = HashSet::new();
359 root_events.insert(event.id);
360
361 pending.add_repo(repo_id, relays, root_events);
362 tracing::debug!(
363 event_id = %event.id,
364 "Queued 30617 announcement for batch processing"
365 );
366 }
367 } else {
368 // For root event kinds (1617, 1618, 1619, 1621),
369 // process them to update the RepoSyncIndex
370 tracing::trace!(
371 kind = %event.kind,
372 event_id = %event.id,
373 "Received root event"
374 );
375 self.handle_root_event(&event).await;
376 }
377 }
378 Ok(RelayPoolNotification::Shutdown) => {
379 tracing::info!("SelfSubscriber received shutdown notification");
380 break;
381 }
382 Err(e) => {
383 tracing::error!(error = %e, "Error receiving notification");
384 break;
385 }
386 _ => {}
387 }
388 }
389 _ = timer.tick() => {
390 if !pending.is_empty() {
391 self.process_batch(&mut pending).await;
392 }
331 } 393 }
332 } 394 }
333 } 395 }