upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:40:52 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:40:52 +0000
commit4f171fe53f24b54718a717a77b447175177e29a5 (patch)
treed3607ff2927d03501fbb5eae154ecfb8b5451474 /src/sync
parentea561062c0f08d608f48b6ccd6f8a4b8743b6e3b (diff)
sync: implement disconnect handler with state cleanup
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs153
1 files changed, 140 insertions, 13 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 9ac62ed..6e50eba 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -320,6 +320,13 @@ pub mod event_source {
320// SyncManager - Main Entry Point 320// SyncManager - Main Entry Point
321// ============================================================================= 321// =============================================================================
322 322
323/// Notification from spawned tasks about relay disconnections
324#[derive(Debug)]
325pub struct DisconnectNotification {
326 /// The relay URL that disconnected
327 pub relay_url: String,
328}
329
323/// Manages proactive synchronization with external relays 330/// Manages proactive synchronization with external relays
324/// 331///
325/// The SyncManager runs as a background task, subscribing to repository 332/// The SyncManager runs as a background task, subscribing to repository
@@ -388,7 +395,8 @@ impl SyncManager {
388 /// 1. Spawns self-subscriber to monitor own relay for announcements 395 /// 1. Spawns self-subscriber to monitor own relay for announcements
389 /// 2. Connects to bootstrap relay if configured 396 /// 2. Connects to bootstrap relay if configured
390 /// 3. Handles relay actions from self-subscriber 397 /// 3. Handles relay actions from self-subscriber
391 pub async fn run(self) { 398 /// 4. Handles disconnect notifications from spawned relay tasks
399 pub async fn run(mut self) {
392 use tokio::sync::mpsc; 400 use tokio::sync::mpsc;
393 401
394 tracing::info!( 402 tracing::info!(
@@ -400,7 +408,10 @@ impl SyncManager {
400 // 1. Create action channel for self-subscriber -> manager communication 408 // 1. Create action channel for self-subscriber -> manager communication
401 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); 409 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
402 410
403 // 2. Spawn self-subscriber 411 // 2. Create disconnect channel for spawned tasks -> manager communication
412 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100);
413
414 // 3. Spawn self-subscriber
404 let self_subscriber = SelfSubscriber::new( 415 let self_subscriber = SelfSubscriber::new(
405 format!("ws://{}", self.service_domain), 416 format!("ws://{}", self.service_domain),
406 self.service_domain.clone(), 417 self.service_domain.clone(),
@@ -409,12 +420,13 @@ impl SyncManager {
409 ); 420 );
410 tokio::spawn(async move { self_subscriber.run().await }); 421 tokio::spawn(async move { self_subscriber.run().await });
411 422
412 // 3. Connect to bootstrap relay if configured 423 // 4. Connect to bootstrap relay if configured
413 if let Some(ref bootstrap_url) = self.bootstrap_relay_url { 424 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
414 self.spawn_relay_connection(bootstrap_url.clone()).await; 425 self.spawn_relay_connection(bootstrap_url.clone(), disconnect_tx.clone())
426 .await;
415 } 427 }
416 428
417 // 4. Main loop - handle actions from self-subscriber 429 // 5. Main loop - handle actions from self-subscriber and disconnect notifications
418 loop { 430 loop {
419 tokio::select! { 431 tokio::select! {
420 action = action_rx.recv() => { 432 action = action_rx.recv() => {
@@ -427,7 +439,7 @@ impl SyncManager {
427 439
428 if !exists { 440 if !exists {
429 tracing::info!(relay = %relay_url, "Spawning new relay connection"); 441 tracing::info!(relay = %relay_url, "Spawning new relay connection");
430 self.spawn_relay_with_layer2(relay_url, repos).await; 442 self.spawn_relay_with_layer2(relay_url, repos, disconnect_tx.clone()).await;
431 } else { 443 } else {
432 tracing::debug!( 444 tracing::debug!(
433 relay = %relay_url, 445 relay = %relay_url,
@@ -449,8 +461,77 @@ impl SyncManager {
449 None => break, 461 None => break,
450 } 462 }
451 } 463 }
464 disconnect = disconnect_rx.recv() => {
465 match disconnect {
466 Some(notification) => {
467 self.handle_disconnect(&notification.relay_url).await;
468 }
469 None => {
470 // All disconnect senders dropped - unlikely but handle gracefully
471 tracing::debug!("Disconnect channel closed");
472 }
473 }
474 }
475 }
476 }
477 }
478
479 /// Handle a relay disconnection
480 ///
481 /// This method:
482 /// - Updates the RelayState in relay_sync_index to Disconnected status
483 /// - Sets disconnected_at timestamp
484 /// - Clears pending sync batches for this relay
485 /// - Removes the relay from active connections
486 /// - Records the failure in health tracker
487 async fn handle_disconnect(&mut self, relay_url: &str) {
488 tracing::warn!(relay = %relay_url, "Handling relay disconnect");
489
490 // 1. Update RelayState in relay_sync_index
491 {
492 let mut index = self.relay_sync_index.write().await;
493 if let Some(state) = index.get_mut(relay_url) {
494 state.connection_status = ConnectionStatus::Disconnected;
495 state.disconnected_at = Some(Timestamp::now());
496 tracing::info!(
497 relay = %relay_url,
498 repos_tracked = state.repos.len(),
499 "Relay state updated to disconnected"
500 );
501 } else {
502 tracing::debug!(
503 relay = %relay_url,
504 "No RelayState found for disconnected relay"
505 );
506 }
507 }
508
509 // 2. Clear pending sync batches for this relay
510 {
511 let mut pending = self.pending_sync_index.write().await;
512 if pending.remove(relay_url).is_some() {
513 tracing::debug!(
514 relay = %relay_url,
515 "Cleared pending sync batches for disconnected relay"
516 );
452 } 517 }
453 } 518 }
519
520 // 3. Remove from active connections
521 if self.connections.remove(relay_url).is_some() {
522 tracing::debug!(
523 relay = %relay_url,
524 "Removed relay from active connections"
525 );
526 }
527
528 // 4. Record failure in health tracker
529 self.health_tracker.record_failure(relay_url);
530 tracing::info!(
531 relay = %relay_url,
532 health_state = %self.health_tracker.get_state(relay_url),
533 "Relay disconnect handling complete"
534 );
454 } 535 }
455 536
456 /// Spawn relay connection with Layer 2 filters for specific repos 537 /// Spawn relay connection with Layer 2 filters for specific repos
@@ -462,6 +543,7 @@ impl SyncManager {
462 &self, 543 &self,
463 relay_url: String, 544 relay_url: String,
464 repos: HashMap<String, HashSet<EventId>>, 545 repos: HashMap<String, HashSet<EventId>>,
546 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
465 ) { 547 ) {
466 use crate::sync::filters::build_layer2_and_layer3_filters; 548 use crate::sync::filters::build_layer2_and_layer3_filters;
467 use tokio::sync::mpsc; 549 use tokio::sync::mpsc;
@@ -541,8 +623,24 @@ impl SyncManager {
541 RelayEvent::EndOfStoredEvents(_) => { 623 RelayEvent::EndOfStoredEvents(_) => {
542 tracing::debug!(relay = %relay_url_clone, "EOSE received"); 624 tracing::debug!(relay = %relay_url_clone, "EOSE received");
543 } 625 }
544 RelayEvent::Closed(_) | RelayEvent::Shutdown => { 626 RelayEvent::Closed(reason) => {
545 tracing::info!(relay = %relay_url_clone, "Relay disconnected"); 627 tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed");
628 // Notify SyncManager of disconnect
629 let _ = disconnect_tx
630 .send(DisconnectNotification {
631 relay_url: relay_url_clone.clone(),
632 })
633 .await;
634 break;
635 }
636 RelayEvent::Shutdown => {
637 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected");
638 // Notify SyncManager of disconnect
639 let _ = disconnect_tx
640 .send(DisconnectNotification {
641 relay_url: relay_url_clone.clone(),
642 })
643 .await;
546 break; 644 break;
547 } 645 }
548 } 646 }
@@ -551,7 +649,11 @@ impl SyncManager {
551 } 649 }
552 650
553 /// Spawn a relay connection and start its event loop 651 /// Spawn a relay connection and start its event loop
554 async fn spawn_relay_connection(&self, relay_url: String) { 652 async fn spawn_relay_connection(
653 &self,
654 relay_url: String,
655 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
656 ) {
555 use tokio::sync::mpsc; 657 use tokio::sync::mpsc;
556 658
557 let database = Arc::clone(&self.database); 659 let database = Arc::clone(&self.database);
@@ -597,14 +699,39 @@ impl SyncManager {
597 while let Some(relay_event) = event_rx.recv().await { 699 while let Some(relay_event) = event_rx.recv().await {
598 match relay_event { 700 match relay_event {
599 RelayEvent::Event(event) => { 701 RelayEvent::Event(event) => {
600 Self::process_event_static(&event, &relay_url_clone, &database, &write_policy) 702 Self::process_event_static(
601 .await; 703 &event,
704 &relay_url_clone,
705 &database,
706 &write_policy,
707 )
708 .await;
602 } 709 }
603 RelayEvent::EndOfStoredEvents(_) => { 710 RelayEvent::EndOfStoredEvents(_) => {
604 tracing::debug!("EOSE from {}", relay_url_clone); 711 tracing::debug!("EOSE from {}", relay_url_clone);
605 } 712 }
606 RelayEvent::Closed(_) | RelayEvent::Shutdown => { 713 RelayEvent::Closed(reason) => {
607 tracing::info!("Relay {} disconnected", relay_url_clone); 714 tracing::info!(
715 relay = %relay_url_clone,
716 reason = %reason,
717 "Relay connection closed"
718 );
719 // Notify SyncManager of disconnect
720 let _ = disconnect_tx
721 .send(DisconnectNotification {
722 relay_url: relay_url_clone.clone(),
723 })
724 .await;
725 break;
726 }
727 RelayEvent::Shutdown => {
728 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected");
729 // Notify SyncManager of disconnect
730 let _ = disconnect_tx
731 .send(DisconnectNotification {
732 relay_url: relay_url_clone.clone(),
733 })
734 .await;
608 break; 735 break;
609 } 736 }
610 } 737 }