diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:51:01 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:51:01 +0000 |
| commit | 8cfe8546e5ed1118adae6bfa041611e94d15c6dd (patch) | |
| tree | cf247943474cd95001c13b6e1eba0215a810f6dd /src | |
| parent | c1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 (diff) | |
sync: implement graceful shutdown for all tasks and connections
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 138 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 144 |
2 files changed, 208 insertions, 74 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 108ebe9..9a8857c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -36,7 +36,7 @@ use std::time::Duration; | |||
| 36 | 36 | ||
| 37 | use nostr_sdk::prelude::*; | 37 | use nostr_sdk::prelude::*; |
| 38 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; | 38 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; |
| 39 | use tokio::sync::{Mutex, RwLock}; | 39 | use tokio::sync::{broadcast, Mutex, RwLock}; |
| 40 | 40 | ||
| 41 | use crate::config::Config; | 41 | use crate::config::Config; |
| 42 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 42 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| @@ -369,7 +369,10 @@ const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; | |||
| 369 | /// - Re-discovers all repos and events from scratch | 369 | /// - Re-discovers all repos and events from scratch |
| 370 | /// | 370 | /// |
| 371 | /// This detects state drift over time that might occur from missed events. | 371 | /// This detects state drift over time that might occur from missed events. |
| 372 | async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) { | 372 | async fn run_daily_timer( |
| 373 | sync_manager: Arc<Mutex<SyncManager>>, | ||
| 374 | mut shutdown_rx: broadcast::Receiver<()>, | ||
| 375 | ) { | ||
| 373 | use rand::Rng; | 376 | use rand::Rng; |
| 374 | 377 | ||
| 375 | loop { | 378 | loop { |
| @@ -383,26 +386,33 @@ async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) { | |||
| 383 | hours | 386 | hours |
| 384 | ); | 387 | ); |
| 385 | 388 | ||
| 386 | tokio::time::sleep(Duration::from_secs(seconds)).await; | 389 | tokio::select! { |
| 387 | 390 | _ = tokio::time::sleep(Duration::from_secs(seconds)) => { | |
| 388 | // Get list of relays | 391 | // Timer fired - do daily sync |
| 389 | let relay_urls: Vec<String> = { | 392 | // Get list of relays |
| 390 | let manager = sync_manager.lock().await; | 393 | let relay_urls: Vec<String> = { |
| 391 | let index = manager.relay_sync_index.read().await; | 394 | let manager = sync_manager.lock().await; |
| 392 | let urls: Vec<String> = index.keys().cloned().collect(); | 395 | let index = manager.relay_sync_index.read().await; |
| 393 | drop(index); | 396 | let urls: Vec<String> = index.keys().cloned().collect(); |
| 394 | urls | 397 | drop(index); |
| 395 | }; | 398 | urls |
| 399 | }; | ||
| 396 | 400 | ||
| 397 | tracing::info!( | 401 | tracing::info!( |
| 398 | relay_count = relay_urls.len(), | 402 | relay_count = relay_urls.len(), |
| 399 | "Daily timer fired, starting daily sync for all relays" | 403 | "Daily timer fired, starting daily sync for all relays" |
| 400 | ); | 404 | ); |
| 401 | 405 | ||
| 402 | // Trigger daily sync for each relay | 406 | // Trigger daily sync for each relay |
| 403 | for relay_url in relay_urls { | 407 | for relay_url in relay_urls { |
| 404 | let mut manager = sync_manager.lock().await; | 408 | let mut manager = sync_manager.lock().await; |
| 405 | manager.daily_sync(&relay_url).await; | 409 | manager.daily_sync(&relay_url).await; |
| 410 | } | ||
| 411 | } | ||
| 412 | _ = shutdown_rx.recv() => { | ||
| 413 | tracing::info!("Daily timer received shutdown signal"); | ||
| 414 | break; | ||
| 415 | } | ||
| 406 | } | 416 | } |
| 407 | } | 417 | } |
| 408 | } | 418 | } |
| @@ -421,15 +431,23 @@ const DISCONNECT_CHECK_INTERVAL_SECS: u64 = 60; | |||
| 421 | /// that are empty will be disconnected to free up resources. | 431 | /// that are empty will be disconnected to free up resources. |
| 422 | /// | 432 | /// |
| 423 | /// Bootstrap relays are never disconnected, even if empty. | 433 | /// Bootstrap relays are never disconnected, even if empty. |
| 424 | async fn run_disconnect_checker(sync_manager: Arc<Mutex<SyncManager>>) { | 434 | async fn run_disconnect_checker( |
| 435 | sync_manager: Arc<Mutex<SyncManager>>, | ||
| 436 | mut shutdown_rx: broadcast::Receiver<()>, | ||
| 437 | ) { | ||
| 425 | loop { | 438 | loop { |
| 426 | // Check every 60 seconds | 439 | tokio::select! { |
| 427 | tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)).await; | 440 | _ = tokio::time::sleep(Duration::from_secs(DISCONNECT_CHECK_INTERVAL_SECS)) => { |
| 428 | 441 | tracing::debug!("Disconnect checker running"); | |
| 429 | tracing::debug!("Disconnect checker running"); | ||
| 430 | 442 | ||
| 431 | let mut manager = sync_manager.lock().await; | 443 | let mut manager = sync_manager.lock().await; |
| 432 | manager.check_disconnects().await; | 444 | manager.check_disconnects().await; |
| 445 | } | ||
| 446 | _ = shutdown_rx.recv() => { | ||
| 447 | tracing::info!("Disconnect checker received shutdown signal"); | ||
| 448 | break; | ||
| 449 | } | ||
| 450 | } | ||
| 433 | } | 451 | } |
| 434 | } | 452 | } |
| 435 | 453 | ||
| @@ -468,6 +486,8 @@ pub struct SyncManager { | |||
| 468 | eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>, | 486 | eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>, |
| 469 | /// Channel for connect notifications (set during run) | 487 | /// Channel for connect notifications (set during run) |
| 470 | connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>, | 488 | connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>, |
| 489 | /// Channel for broadcasting shutdown signal to all background tasks | ||
| 490 | shutdown_tx: Option<broadcast::Sender<()>>, | ||
| 471 | } | 491 | } |
| 472 | 492 | ||
| 473 | impl SyncManager { | 493 | impl SyncManager { |
| @@ -501,6 +521,7 @@ impl SyncManager { | |||
| 501 | disconnect_tx: None, | 521 | disconnect_tx: None, |
| 502 | eose_tx: None, | 522 | eose_tx: None, |
| 503 | connect_tx: None, | 523 | connect_tx: None, |
| 524 | shutdown_tx: None, | ||
| 504 | } | 525 | } |
| 505 | } | 526 | } |
| 506 | 527 | ||
| @@ -691,19 +712,24 @@ impl SyncManager { | |||
| 691 | // 4. Create connect channel for spawned tasks -> manager communication | 712 | // 4. Create connect channel for spawned tasks -> manager communication |
| 692 | let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100); | 713 | let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100); |
| 693 | 714 | ||
| 694 | // 5. Spawn self-subscriber | 715 | // 4b. Create shutdown broadcast channel for graceful shutdown |
| 716 | let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); | ||
| 717 | |||
| 718 | // 5. Spawn self-subscriber with shutdown receiver | ||
| 695 | let self_subscriber = SelfSubscriber::new( | 719 | let self_subscriber = SelfSubscriber::new( |
| 696 | format!("ws://{}", self.service_domain), | 720 | format!("ws://{}", self.service_domain), |
| 697 | self.service_domain.clone(), | 721 | self.service_domain.clone(), |
| 698 | Arc::clone(&self.repo_sync_index), | 722 | Arc::clone(&self.repo_sync_index), |
| 699 | action_tx, | 723 | action_tx, |
| 700 | ); | 724 | ); |
| 701 | tokio::spawn(async move { self_subscriber.run().await }); | 725 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 726 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | ||
| 702 | 727 | ||
| 703 | // 5b. Store channel senders for use by handlers | 728 | // 5b. Store channel senders for use by handlers |
| 704 | self.disconnect_tx = Some(disconnect_tx.clone()); | 729 | self.disconnect_tx = Some(disconnect_tx.clone()); |
| 705 | self.eose_tx = Some(eose_tx.clone()); | 730 | self.eose_tx = Some(eose_tx.clone()); |
| 706 | self.connect_tx = Some(connect_tx.clone()); | 731 | self.connect_tx = Some(connect_tx.clone()); |
| 732 | self.shutdown_tx = Some(shutdown_tx.clone()); | ||
| 707 | 733 | ||
| 708 | // 6. Connect to bootstrap relay if configured | 734 | // 6. Connect to bootstrap relay if configured |
| 709 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { | 735 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { |
| @@ -713,16 +739,18 @@ impl SyncManager { | |||
| 713 | // 7. Wrap self in Arc<Mutex> for sharing with timer task | 739 | // 7. Wrap self in Arc<Mutex> for sharing with timer task |
| 714 | let sync_manager = Arc::new(Mutex::new(self)); | 740 | let sync_manager = Arc::new(Mutex::new(self)); |
| 715 | 741 | ||
| 716 | // 8. Spawn daily timer task | 742 | // 8. Spawn daily timer task with shutdown receiver |
| 717 | let timer_manager = Arc::clone(&sync_manager); | 743 | let timer_manager = Arc::clone(&sync_manager); |
| 744 | let timer_shutdown = shutdown_tx.subscribe(); | ||
| 718 | tokio::spawn(async move { | 745 | tokio::spawn(async move { |
| 719 | run_daily_timer(timer_manager).await; | 746 | run_daily_timer(timer_manager, timer_shutdown).await; |
| 720 | }); | 747 | }); |
| 721 | 748 | ||
| 722 | // 9. Spawn disconnect checker task | 749 | // 9. Spawn disconnect checker task with shutdown receiver |
| 723 | let checker_manager = Arc::clone(&sync_manager); | 750 | let checker_manager = Arc::clone(&sync_manager); |
| 751 | let checker_shutdown = shutdown_tx.subscribe(); | ||
| 724 | tokio::spawn(async move { | 752 | tokio::spawn(async move { |
| 725 | run_disconnect_checker(checker_manager).await; | 753 | run_disconnect_checker(checker_manager, checker_shutdown).await; |
| 726 | }); | 754 | }); |
| 727 | 755 | ||
| 728 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 756 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -1689,4 +1717,48 @@ impl SyncManager { | |||
| 1689 | 1717 | ||
| 1690 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); | 1718 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); |
| 1691 | } | 1719 | } |
| 1720 | |||
| 1721 | /// Gracefully shutdown the SyncManager | ||
| 1722 | /// | ||
| 1723 | /// This method: | ||
| 1724 | /// - Sends shutdown signal to all background tasks (daily timer, disconnect checker) | ||
| 1725 | /// - Disconnects all relay connections | ||
| 1726 | /// - Clears all indices (relay_sync_index, pending_sync_index) | ||
| 1727 | /// | ||
| 1728 | /// After calling this method, the SyncManager is no longer usable. | ||
| 1729 | pub async fn shutdown(&mut self) { | ||
| 1730 | tracing::info!("Starting SyncManager shutdown"); | ||
| 1731 | |||
| 1732 | // 1. Send shutdown signal to all background tasks | ||
| 1733 | if let Some(tx) = &self.shutdown_tx { | ||
| 1734 | let _ = tx.send(()); | ||
| 1735 | tracing::debug!("Sent shutdown signal to background tasks"); | ||
| 1736 | } | ||
| 1737 | |||
| 1738 | // 2. Disconnect all relay connections | ||
| 1739 | let relay_urls: Vec<String> = self.connections.keys().cloned().collect(); | ||
| 1740 | for relay_url in relay_urls { | ||
| 1741 | if let Some(connection) = self.connections.remove(&relay_url) { | ||
| 1742 | tracing::debug!(relay = %relay_url, "Disconnecting relay"); | ||
| 1743 | connection.disconnect().await; | ||
| 1744 | } | ||
| 1745 | } | ||
| 1746 | |||
| 1747 | // 3. Clear all indices | ||
| 1748 | { | ||
| 1749 | let mut index = self.relay_sync_index.write().await; | ||
| 1750 | let count = index.len(); | ||
| 1751 | index.clear(); | ||
| 1752 | tracing::debug!(count = count, "Cleared relay_sync_index"); | ||
| 1753 | } | ||
| 1754 | |||
| 1755 | { | ||
| 1756 | let mut pending = self.pending_sync_index.write().await; | ||
| 1757 | let count = pending.len(); | ||
| 1758 | pending.clear(); | ||
| 1759 | tracing::debug!(count = count, "Cleared pending_sync_index"); | ||
| 1760 | } | ||
| 1761 | |||
| 1762 | tracing::info!("SyncManager shutdown complete"); | ||
| 1763 | } | ||
| 1692 | } \ No newline at end of file | 1764 | } \ No newline at end of file |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 229d2e1..73cea2f 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -13,7 +13,7 @@ use std::time::Duration; | |||
| 13 | 13 | ||
| 14 | use nostr_sdk::prelude::*; | 14 | use nostr_sdk::prelude::*; |
| 15 | use nostr_sdk::Timestamp; | 15 | use nostr_sdk::Timestamp; |
| 16 | use tokio::sync::mpsc; | 16 | use tokio::sync::{broadcast, mpsc}; |
| 17 | 17 | ||
| 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; | 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; |
| 19 | 19 | ||
| @@ -209,7 +209,10 @@ impl SelfSubscriber { | |||
| 209 | /// | 209 | /// |
| 210 | /// Connects to own relay, subscribes to relevant event kinds, | 210 | /// Connects to own relay, subscribes to relevant event kinds, |
| 211 | /// and batches updates before processing them. | 211 | /// and batches updates before processing them. |
| 212 | pub async fn run(mut self) { | 212 | /// |
| 213 | /// The optional shutdown receiver allows graceful termination when | ||
| 214 | /// received via the broadcast channel. | ||
| 215 | pub async fn run(mut self, mut shutdown_rx: Option<broadcast::Receiver<()>>) { | ||
| 213 | let client = Client::default(); | 216 | let client = Client::default(); |
| 214 | 217 | ||
| 215 | // Add own relay | 218 | // Add own relay |
| @@ -281,53 +284,112 @@ impl SelfSubscriber { | |||
| 281 | timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | 284 | timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); |
| 282 | 285 | ||
| 283 | loop { | 286 | loop { |
| 284 | tokio::select! { | 287 | // Build the select based on whether we have a shutdown receiver |
| 285 | notification = notifications.recv() => { | 288 | if let Some(ref mut rx) = shutdown_rx { |
| 286 | match notification { | 289 | tokio::select! { |
| 287 | Ok(RelayPoolNotification::Event { event, .. }) => { | 290 | notification = notifications.recv() => { |
| 288 | // Only process 30617 events that list our relay | 291 | match notification { |
| 289 | if event.kind == Kind::Custom(30617) { | 292 | Ok(RelayPoolNotification::Event { event, .. }) => { |
| 290 | if !self.lists_our_relay(&event) { | 293 | // Only process 30617 events that list our relay |
| 291 | continue; | 294 | if event.kind == Kind::Custom(30617) { |
| 292 | } | 295 | if !self.lists_our_relay(&event) { |
| 293 | 296 | continue; | |
| 294 | // Extract repo ID and relays | 297 | } |
| 295 | if let Some(repo_id) = Self::extract_repo_id(&event) { | 298 | |
| 296 | let relays = Self::extract_relay_urls(&event); | 299 | // Extract repo ID and relays |
| 297 | let mut root_events = HashSet::new(); | 300 | if let Some(repo_id) = Self::extract_repo_id(&event) { |
| 298 | root_events.insert(event.id); | 301 | let relays = Self::extract_relay_urls(&event); |
| 299 | 302 | let mut root_events = HashSet::new(); | |
| 300 | pending.add_repo(repo_id, relays, root_events); | 303 | root_events.insert(event.id); |
| 301 | tracing::debug!( | 304 | |
| 305 | pending.add_repo(repo_id, relays, root_events); | ||
| 306 | tracing::debug!( | ||
| 307 | event_id = %event.id, | ||
| 308 | "Queued 30617 announcement for batch processing" | ||
| 309 | ); | ||
| 310 | } | ||
| 311 | } else { | ||
| 312 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 313 | // process them to update the RepoSyncIndex | ||
| 314 | tracing::trace!( | ||
| 315 | kind = %event.kind, | ||
| 302 | event_id = %event.id, | 316 | event_id = %event.id, |
| 303 | "Queued 30617 announcement for batch processing" | 317 | "Received root event" |
| 304 | ); | 318 | ); |
| 319 | self.handle_root_event(&event).await; | ||
| 305 | } | 320 | } |
| 306 | } else { | ||
| 307 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 308 | // process them to update the RepoSyncIndex | ||
| 309 | tracing::trace!( | ||
| 310 | kind = %event.kind, | ||
| 311 | event_id = %event.id, | ||
| 312 | "Received root event" | ||
| 313 | ); | ||
| 314 | self.handle_root_event(&event).await; | ||
| 315 | } | 321 | } |
| 322 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 323 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 324 | break; | ||
| 325 | } | ||
| 326 | Err(e) => { | ||
| 327 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 328 | break; | ||
| 329 | } | ||
| 330 | _ => {} | ||
| 316 | } | 331 | } |
| 317 | Ok(RelayPoolNotification::Shutdown) => { | 332 | } |
| 318 | tracing::info!("SelfSubscriber received shutdown notification"); | 333 | _ = timer.tick() => { |
| 319 | break; | 334 | if !pending.is_empty() { |
| 320 | } | 335 | self.process_batch(&mut pending).await; |
| 321 | Err(e) => { | ||
| 322 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 323 | break; | ||
| 324 | } | 336 | } |
| 325 | _ => {} | 337 | } |
| 338 | _ = rx.recv() => { | ||
| 339 | tracing::info!("SelfSubscriber received shutdown signal"); | ||
| 340 | break; | ||
| 326 | } | 341 | } |
| 327 | } | 342 | } |
| 328 | _ = timer.tick() => { | 343 | } else { |
| 329 | if !pending.is_empty() { | 344 | // No shutdown receiver - original behavior |
| 330 | self.process_batch(&mut pending).await; | 345 | tokio::select! { |
| 346 | notification = notifications.recv() => { | ||
| 347 | match notification { | ||
| 348 | Ok(RelayPoolNotification::Event { event, .. }) => { | ||
| 349 | // Only process 30617 events that list our relay | ||
| 350 | if event.kind == Kind::Custom(30617) { | ||
| 351 | if !self.lists_our_relay(&event) { | ||
| 352 | continue; | ||
| 353 | } | ||
| 354 | |||
| 355 | // Extract repo ID and relays | ||
| 356 | if let Some(repo_id) = Self::extract_repo_id(&event) { | ||
| 357 | let relays = Self::extract_relay_urls(&event); | ||
| 358 | let mut root_events = HashSet::new(); | ||
| 359 | root_events.insert(event.id); | ||
| 360 | |||
| 361 | pending.add_repo(repo_id, relays, root_events); | ||
| 362 | tracing::debug!( | ||
| 363 | event_id = %event.id, | ||
| 364 | "Queued 30617 announcement for batch processing" | ||
| 365 | ); | ||
| 366 | } | ||
| 367 | } else { | ||
| 368 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 369 | // process them to update the RepoSyncIndex | ||
| 370 | tracing::trace!( | ||
| 371 | kind = %event.kind, | ||
| 372 | event_id = %event.id, | ||
| 373 | "Received root event" | ||
| 374 | ); | ||
| 375 | self.handle_root_event(&event).await; | ||
| 376 | } | ||
| 377 | } | ||
| 378 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 379 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 380 | break; | ||
| 381 | } | ||
| 382 | Err(e) => { | ||
| 383 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 384 | break; | ||
| 385 | } | ||
| 386 | _ => {} | ||
| 387 | } | ||
| 388 | } | ||
| 389 | _ = timer.tick() => { | ||
| 390 | if !pending.is_empty() { | ||
| 391 | self.process_batch(&mut pending).await; | ||
| 392 | } | ||
| 331 | } | 393 | } |
| 332 | } | 394 | } |
| 333 | } | 395 | } |