upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:51:01 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:51:01 +0000
commit8cfe8546e5ed1118adae6bfa041611e94d15c6dd (patch)
treecf247943474cd95001c13b6e1eba0215a810f6dd /src/sync/self_subscriber.rs
parentc1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 (diff)
sync: implement graceful shutdown for all tasks and connections
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs144
1 files changed, 103 insertions, 41 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 229d2e1..73cea2f 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -13,7 +13,7 @@ use std::time::Duration;
13 13
14use nostr_sdk::prelude::*; 14use nostr_sdk::prelude::*;
15use nostr_sdk::Timestamp; 15use nostr_sdk::Timestamp;
16use tokio::sync::mpsc; 16use tokio::sync::{broadcast, mpsc};
17 17
18use super::{RepoSyncIndex, RepoSyncNeeds}; 18use super::{RepoSyncIndex, RepoSyncNeeds};
19 19
@@ -209,7 +209,10 @@ impl SelfSubscriber {
209 /// 209 ///
210 /// Connects to own relay, subscribes to relevant event kinds, 210 /// Connects to own relay, subscribes to relevant event kinds,
211 /// and batches updates before processing them. 211 /// and batches updates before processing them.
212 pub async fn run(mut self) { 212 ///
213 /// The optional shutdown receiver allows graceful termination when
214 /// received via the broadcast channel.
215 pub async fn run(mut self, mut shutdown_rx: Option<broadcast::Receiver<()>>) {
213 let client = Client::default(); 216 let client = Client::default();
214 217
215 // Add own relay 218 // Add own relay
@@ -281,53 +284,112 @@ impl SelfSubscriber {
281 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 284 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
282 285
283 loop { 286 loop {
284 tokio::select! { 287 // Build the select based on whether we have a shutdown receiver
285 notification = notifications.recv() => { 288 if let Some(ref mut rx) = shutdown_rx {
286 match notification { 289 tokio::select! {
287 Ok(RelayPoolNotification::Event { event, .. }) => { 290 notification = notifications.recv() => {
288 // Only process 30617 events that list our relay 291 match notification {
289 if event.kind == Kind::Custom(30617) { 292 Ok(RelayPoolNotification::Event { event, .. }) => {
290 if !self.lists_our_relay(&event) { 293 // Only process 30617 events that list our relay
291 continue; 294 if event.kind == Kind::Custom(30617) {
292 } 295 if !self.lists_our_relay(&event) {
293 296 continue;
294 // Extract repo ID and relays 297 }
295 if let Some(repo_id) = Self::extract_repo_id(&event) { 298
296 let relays = Self::extract_relay_urls(&event); 299 // Extract repo ID and relays
297 let mut root_events = HashSet::new(); 300 if let Some(repo_id) = Self::extract_repo_id(&event) {
298 root_events.insert(event.id); 301 let relays = Self::extract_relay_urls(&event);
299 302 let mut root_events = HashSet::new();
300 pending.add_repo(repo_id, relays, root_events); 303 root_events.insert(event.id);
301 tracing::debug!( 304
305 pending.add_repo(repo_id, relays, root_events);
306 tracing::debug!(
307 event_id = %event.id,
308 "Queued 30617 announcement for batch processing"
309 );
310 }
311 } else {
312 // For root event kinds (1617, 1618, 1619, 1621),
313 // process them to update the RepoSyncIndex
314 tracing::trace!(
315 kind = %event.kind,
302 event_id = %event.id, 316 event_id = %event.id,
303 "Queued 30617 announcement for batch processing" 317 "Received root event"
304 ); 318 );
319 self.handle_root_event(&event).await;
305 } 320 }
306 } else {
307 // For root event kinds (1617, 1618, 1619, 1621),
308 // process them to update the RepoSyncIndex
309 tracing::trace!(
310 kind = %event.kind,
311 event_id = %event.id,
312 "Received root event"
313 );
314 self.handle_root_event(&event).await;
315 } 321 }
322 Ok(RelayPoolNotification::Shutdown) => {
323 tracing::info!("SelfSubscriber received shutdown notification");
324 break;
325 }
326 Err(e) => {
327 tracing::error!(error = %e, "Error receiving notification");
328 break;
329 }
330 _ => {}
316 } 331 }
317 Ok(RelayPoolNotification::Shutdown) => { 332 }
318 tracing::info!("SelfSubscriber received shutdown notification"); 333 _ = timer.tick() => {
319 break; 334 if !pending.is_empty() {
320 } 335 self.process_batch(&mut pending).await;
321 Err(e) => {
322 tracing::error!(error = %e, "Error receiving notification");
323 break;
324 } 336 }
325 _ => {} 337 }
338 _ = rx.recv() => {
339 tracing::info!("SelfSubscriber received shutdown signal");
340 break;
326 } 341 }
327 } 342 }
328 _ = timer.tick() => { 343 } else {
329 if !pending.is_empty() { 344 // No shutdown receiver - original behavior
330 self.process_batch(&mut pending).await; 345 tokio::select! {
346 notification = notifications.recv() => {
347 match notification {
348 Ok(RelayPoolNotification::Event { event, .. }) => {
349 // Only process 30617 events that list our relay
350 if event.kind == Kind::Custom(30617) {
351 if !self.lists_our_relay(&event) {
352 continue;
353 }
354
355 // Extract repo ID and relays
356 if let Some(repo_id) = Self::extract_repo_id(&event) {
357 let relays = Self::extract_relay_urls(&event);
358 let mut root_events = HashSet::new();
359 root_events.insert(event.id);
360
361 pending.add_repo(repo_id, relays, root_events);
362 tracing::debug!(
363 event_id = %event.id,
364 "Queued 30617 announcement for batch processing"
365 );
366 }
367 } else {
368 // For root event kinds (1617, 1618, 1619, 1621),
369 // process them to update the RepoSyncIndex
370 tracing::trace!(
371 kind = %event.kind,
372 event_id = %event.id,
373 "Received root event"
374 );
375 self.handle_root_event(&event).await;
376 }
377 }
378 Ok(RelayPoolNotification::Shutdown) => {
379 tracing::info!("SelfSubscriber received shutdown notification");
380 break;
381 }
382 Err(e) => {
383 tracing::error!(error = %e, "Error receiving notification");
384 break;
385 }
386 _ => {}
387 }
388 }
389 _ = timer.tick() => {
390 if !pending.is_empty() {
391 self.process_batch(&mut pending).await;
392 }
331 } 393 }
332 } 394 }
333 } 395 }