diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:51:01 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:51:01 +0000 |
| commit | 8cfe8546e5ed1118adae6bfa041611e94d15c6dd (patch) | |
| tree | cf247943474cd95001c13b6e1eba0215a810f6dd /src/sync/self_subscriber.rs | |
| parent | c1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 (diff) | |
sync: implement graceful shutdown for all tasks and connections
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 144 |
1 files changed, 103 insertions, 41 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 229d2e1..73cea2f 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -13,7 +13,7 @@ use std::time::Duration; | |||
| 13 | 13 | ||
| 14 | use nostr_sdk::prelude::*; | 14 | use nostr_sdk::prelude::*; |
| 15 | use nostr_sdk::Timestamp; | 15 | use nostr_sdk::Timestamp; |
| 16 | use tokio::sync::mpsc; | 16 | use tokio::sync::{broadcast, mpsc}; |
| 17 | 17 | ||
| 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; | 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; |
| 19 | 19 | ||
| @@ -209,7 +209,10 @@ impl SelfSubscriber { | |||
| 209 | /// | 209 | /// |
| 210 | /// Connects to own relay, subscribes to relevant event kinds, | 210 | /// Connects to own relay, subscribes to relevant event kinds, |
| 211 | /// and batches updates before processing them. | 211 | /// and batches updates before processing them. |
| 212 | pub async fn run(mut self) { | 212 | /// |
| 213 | /// The optional shutdown receiver allows graceful termination when | ||
| 214 | /// received via the broadcast channel. | ||
| 215 | pub async fn run(mut self, mut shutdown_rx: Option<broadcast::Receiver<()>>) { | ||
| 213 | let client = Client::default(); | 216 | let client = Client::default(); |
| 214 | 217 | ||
| 215 | // Add own relay | 218 | // Add own relay |
| @@ -281,53 +284,112 @@ impl SelfSubscriber { | |||
| 281 | timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | 284 | timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); |
| 282 | 285 | ||
| 283 | loop { | 286 | loop { |
| 284 | tokio::select! { | 287 | // Build the select based on whether we have a shutdown receiver |
| 285 | notification = notifications.recv() => { | 288 | if let Some(ref mut rx) = shutdown_rx { |
| 286 | match notification { | 289 | tokio::select! { |
| 287 | Ok(RelayPoolNotification::Event { event, .. }) => { | 290 | notification = notifications.recv() => { |
| 288 | // Only process 30617 events that list our relay | 291 | match notification { |
| 289 | if event.kind == Kind::Custom(30617) { | 292 | Ok(RelayPoolNotification::Event { event, .. }) => { |
| 290 | if !self.lists_our_relay(&event) { | 293 | // Only process 30617 events that list our relay |
| 291 | continue; | 294 | if event.kind == Kind::Custom(30617) { |
| 292 | } | 295 | if !self.lists_our_relay(&event) { |
| 293 | 296 | continue; | |
| 294 | // Extract repo ID and relays | 297 | } |
| 295 | if let Some(repo_id) = Self::extract_repo_id(&event) { | 298 | |
| 296 | let relays = Self::extract_relay_urls(&event); | 299 | // Extract repo ID and relays |
| 297 | let mut root_events = HashSet::new(); | 300 | if let Some(repo_id) = Self::extract_repo_id(&event) { |
| 298 | root_events.insert(event.id); | 301 | let relays = Self::extract_relay_urls(&event); |
| 299 | 302 | let mut root_events = HashSet::new(); | |
| 300 | pending.add_repo(repo_id, relays, root_events); | 303 | root_events.insert(event.id); |
| 301 | tracing::debug!( | 304 | |
| 305 | pending.add_repo(repo_id, relays, root_events); | ||
| 306 | tracing::debug!( | ||
| 307 | event_id = %event.id, | ||
| 308 | "Queued 30617 announcement for batch processing" | ||
| 309 | ); | ||
| 310 | } | ||
| 311 | } else { | ||
| 312 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 313 | // process them to update the RepoSyncIndex | ||
| 314 | tracing::trace!( | ||
| 315 | kind = %event.kind, | ||
| 302 | event_id = %event.id, | 316 | event_id = %event.id, |
| 303 | "Queued 30617 announcement for batch processing" | 317 | "Received root event" |
| 304 | ); | 318 | ); |
| 319 | self.handle_root_event(&event).await; | ||
| 305 | } | 320 | } |
| 306 | } else { | ||
| 307 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 308 | // process them to update the RepoSyncIndex | ||
| 309 | tracing::trace!( | ||
| 310 | kind = %event.kind, | ||
| 311 | event_id = %event.id, | ||
| 312 | "Received root event" | ||
| 313 | ); | ||
| 314 | self.handle_root_event(&event).await; | ||
| 315 | } | 321 | } |
| 322 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 323 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 324 | break; | ||
| 325 | } | ||
| 326 | Err(e) => { | ||
| 327 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 328 | break; | ||
| 329 | } | ||
| 330 | _ => {} | ||
| 316 | } | 331 | } |
| 317 | Ok(RelayPoolNotification::Shutdown) => { | 332 | } |
| 318 | tracing::info!("SelfSubscriber received shutdown notification"); | 333 | _ = timer.tick() => { |
| 319 | break; | 334 | if !pending.is_empty() { |
| 320 | } | 335 | self.process_batch(&mut pending).await; |
| 321 | Err(e) => { | ||
| 322 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 323 | break; | ||
| 324 | } | 336 | } |
| 325 | _ => {} | 337 | } |
| 338 | _ = rx.recv() => { | ||
| 339 | tracing::info!("SelfSubscriber received shutdown signal"); | ||
| 340 | break; | ||
| 326 | } | 341 | } |
| 327 | } | 342 | } |
| 328 | _ = timer.tick() => { | 343 | } else { |
| 329 | if !pending.is_empty() { | 344 | // No shutdown receiver - original behavior |
| 330 | self.process_batch(&mut pending).await; | 345 | tokio::select! { |
| 346 | notification = notifications.recv() => { | ||
| 347 | match notification { | ||
| 348 | Ok(RelayPoolNotification::Event { event, .. }) => { | ||
| 349 | // Only process 30617 events that list our relay | ||
| 350 | if event.kind == Kind::Custom(30617) { | ||
| 351 | if !self.lists_our_relay(&event) { | ||
| 352 | continue; | ||
| 353 | } | ||
| 354 | |||
| 355 | // Extract repo ID and relays | ||
| 356 | if let Some(repo_id) = Self::extract_repo_id(&event) { | ||
| 357 | let relays = Self::extract_relay_urls(&event); | ||
| 358 | let mut root_events = HashSet::new(); | ||
| 359 | root_events.insert(event.id); | ||
| 360 | |||
| 361 | pending.add_repo(repo_id, relays, root_events); | ||
| 362 | tracing::debug!( | ||
| 363 | event_id = %event.id, | ||
| 364 | "Queued 30617 announcement for batch processing" | ||
| 365 | ); | ||
| 366 | } | ||
| 367 | } else { | ||
| 368 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 369 | // process them to update the RepoSyncIndex | ||
| 370 | tracing::trace!( | ||
| 371 | kind = %event.kind, | ||
| 372 | event_id = %event.id, | ||
| 373 | "Received root event" | ||
| 374 | ); | ||
| 375 | self.handle_root_event(&event).await; | ||
| 376 | } | ||
| 377 | } | ||
| 378 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 379 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 380 | break; | ||
| 381 | } | ||
| 382 | Err(e) => { | ||
| 383 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 384 | break; | ||
| 385 | } | ||
| 386 | _ => {} | ||
| 387 | } | ||
| 388 | } | ||
| 389 | _ = timer.tick() => { | ||
| 390 | if !pending.is_empty() { | ||
| 391 | self.process_batch(&mut pending).await; | ||
| 392 | } | ||
| 331 | } | 393 | } |
| 332 | } | 394 | } |
| 333 | } | 395 | } |