upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:49:29 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:49:29 +0000
commit4d21292f7fae6cf50608af0e15051ba2369472c7 (patch)
treea27be4fc33cc5a5801abbad570cd16d17871e396 /src/sync/mod.rs
parent3b65f541b4a3891824c61148d159c1b311e086e8 (diff)
sync: implement unified connect/reconnect with since filters
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs265
1 files changed, 262 insertions, 3 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 449e4ec..8e0b8e1 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -336,6 +336,16 @@ pub struct EoseNotification {
336 pub sub_id: SubscriptionId, 336 pub sub_id: SubscriptionId,
337} 337}
338 338
339/// Notification from spawned tasks about successful connection
340#[derive(Debug)]
341pub struct ConnectNotification {
342 /// The relay URL that connected
343 pub relay_url: String,
344}
345
346/// Quick reconnect window in seconds (15 minutes)
347const QUICK_RECONNECT_WINDOW_SECS: u64 = 15 * 60;
348
339/// Manages proactive synchronization with external relays 349/// Manages proactive synchronization with external relays
340/// 350///
341/// The SyncManager runs as a background task, subscribing to repository 351/// The SyncManager runs as a background task, subscribing to repository
@@ -526,7 +536,10 @@ impl SyncManager {
526 // 3. Create EOSE channel for spawned tasks -> manager communication 536 // 3. Create EOSE channel for spawned tasks -> manager communication
527 let (eose_tx, mut eose_rx) = mpsc::channel::<EoseNotification>(100); 537 let (eose_tx, mut eose_rx) = mpsc::channel::<EoseNotification>(100);
528 538
529 // 4. Spawn self-subscriber 539 // 4. Create connect channel for spawned tasks -> manager communication
540 let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100);
541
542 // 5. Spawn self-subscriber
530 let self_subscriber = SelfSubscriber::new( 543 let self_subscriber = SelfSubscriber::new(
531 format!("ws://{}", self.service_domain), 544 format!("ws://{}", self.service_domain),
532 self.service_domain.clone(), 545 self.service_domain.clone(),
@@ -535,17 +548,18 @@ impl SyncManager {
535 ); 548 );
536 tokio::spawn(async move { self_subscriber.run().await }); 549 tokio::spawn(async move { self_subscriber.run().await });
537 550
538 // 5. Connect to bootstrap relay if configured 551 // 6. Connect to bootstrap relay if configured
539 if let Some(ref bootstrap_url) = self.bootstrap_relay_url { 552 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
540 self.spawn_relay_connection( 553 self.spawn_relay_connection(
541 bootstrap_url.clone(), 554 bootstrap_url.clone(),
542 disconnect_tx.clone(), 555 disconnect_tx.clone(),
543 eose_tx.clone(), 556 eose_tx.clone(),
557 connect_tx.clone(),
544 ) 558 )
545 .await; 559 .await;
546 } 560 }
547 561
548 // 6. Main loop - handle actions from self-subscriber, disconnect, and EOSE notifications 562 // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
549 loop { 563 loop {
550 tokio::select! { 564 tokio::select! {
551 action = action_rx.recv() => { 565 action = action_rx.recv() => {
@@ -563,6 +577,7 @@ impl SyncManager {
563 repos, 577 repos,
564 disconnect_tx.clone(), 578 disconnect_tx.clone(),
565 eose_tx.clone(), 579 eose_tx.clone(),
580 connect_tx.clone(),
566 ).await; 581 ).await;
567 } else { 582 } else {
568 tracing::debug!( 583 tracing::debug!(
@@ -607,7 +622,235 @@ impl SyncManager {
607 } 622 }
608 } 623 }
609 } 624 }
625 connect = connect_rx.recv() => {
626 match connect {
627 Some(notification) => {
628 self.handle_connect_or_reconnect(&notification.relay_url).await;
629 }
630 None => {
631 // All connect senders dropped - unlikely but handle gracefully
632 tracing::debug!("Connect channel closed");
633 }
634 }
635 }
636 }
637 }
638 }
639
640 /// Handle a connection success (called when a relay connects or reconnects)
641 ///
642 /// This method implements smart reconnection logic:
643 /// - Fresh sync if never connected or >15 min since last connection
644 /// - Quick reconnect with since filter if <15 min since last connection
645 ///
646 /// For fresh sync:
647 /// - Clears any stale state
648 /// - Subscribes to Layer 1 without since filter
649 /// - Recomputes actions for new items
650 ///
651 /// For quick reconnect:
652 /// - Preserves existing state
653 /// - Subscribes to Layer 1 with since filter
654 /// - Rebuilds Layer 2 and Layer 3 with since filter
655 /// - Recomputes actions for new items
656 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
657 let now = Timestamp::now();
658
659 // Get the relay state to determine reconnect type
660 let (is_fresh_sync, last_connected, is_bootstrap) = {
661 let index = self.relay_sync_index.read().await;
662 if let Some(state) = index.get(relay_url) {
663 let last_conn = state.last_connected;
664 let is_fresh = match last_conn {
665 None => true, // Never connected before
666 Some(last) => {
667 let elapsed = now.as_secs().saturating_sub(last.as_secs());
668 elapsed > QUICK_RECONNECT_WINDOW_SECS // Stale if > 15 min
669 }
670 };
671 (is_fresh, last_conn, state.is_bootstrap)
672 } else {
673 (true, None, false) // No state found, treat as fresh
674 }
675 };
676
677 // If stale reconnect, clear state
678 if is_fresh_sync && last_connected.is_some() {
679 let mut index = self.relay_sync_index.write().await;
680 if let Some(state) = index.get_mut(relay_url) {
681 state.clear_sync_state();
682 tracing::info!(
683 relay = %relay_url,
684 "Cleared stale sync state (was disconnected > 15 min)"
685 );
686 }
687 }
688
689 // Update connection state
690 {
691 let mut index = self.relay_sync_index.write().await;
692 let state = index.entry(relay_url.to_string()).or_default();
693 state.connection_status = ConnectionStatus::Connected;
694 state.last_connected = Some(now);
695 state.disconnected_at = None;
696 }
697
698 // Record success in health tracker
699 self.health_tracker.record_success(relay_url);
700
701 // Subscribe based on reconnect type
702 if is_fresh_sync {
703 tracing::info!(
704 relay = %relay_url,
705 is_bootstrap = is_bootstrap,
706 "Fresh sync - subscribing to Layer 1 without since filter"
707 );
708 // Fresh sync: Layer 1 without since
709 // Layer 1 subscription is handled by the connection establishment
710 // Just recompute actions for new items
711 self.recompute_actions_for_relay(relay_url).await;
712 } else {
713 // Quick reconnect: use since filter
714 let since_ts = Timestamp::from(
715 last_connected
716 .unwrap()
717 .as_secs()
718 .saturating_sub(QUICK_RECONNECT_WINDOW_SECS),
719 );
720
721 tracing::info!(
722 relay = %relay_url,
723 since = %since_ts,
724 "Quick reconnect - using since filter for incremental sync"
725 );
726
727 // Rebuild Layer 2 and Layer 3 with since filter
728 self.rebuild_layer2_and_layer3(relay_url, Some(since_ts))
729 .await;
730
731 // Recompute actions for any new items discovered while disconnected
732 self.recompute_actions_for_relay(relay_url).await;
733 }
734 }
735
736 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay
737 ///
738 /// Uses the confirmed repos and root_events from RelayState to build filters.
739 /// If since is provided, applies it to all filters for incremental sync.
740 async fn rebuild_layer2_and_layer3(&self, relay_url: &str, since: Option<Timestamp>) {
741 use crate::sync::filters::build_layer2_and_layer3_filters;
742
743 // Get confirmed state from relay_sync_index
744 let (repos, root_events) = {
745 let index = self.relay_sync_index.read().await;
746 match index.get(relay_url) {
747 Some(state) => (state.repos.clone(), state.root_events.clone()),
748 None => {
749 tracing::warn!(
750 relay = %relay_url,
751 "No RelayState found for rebuild_layer2_and_layer3"
752 );
753 return;
754 }
755 }
756 };
757
758 // Nothing to rebuild if no confirmed items
759 if repos.is_empty() && root_events.is_empty() {
760 tracing::debug!(
761 relay = %relay_url,
762 "No confirmed items to rebuild Layer 2/3 for"
763 );
764 return;
765 }
766
767 // Build Layer 2 and Layer 3 filters
768 let filters = build_layer2_and_layer3_filters(&repos, &root_events, since);
769
770 tracing::debug!(
771 relay = %relay_url,
772 filter_count = filters.len(),
773 repos_count = repos.len(),
774 root_events_count = root_events.len(),
775 since = ?since,
776 "Rebuilding Layer 2/3 filters"
777 );
778
779 // Subscribe to filters on the relay connection
780 if let Some(connection) = self.connections.get(relay_url) {
781 for filter in filters {
782 if let Err(e) = connection.subscribe_filter(filter).await {
783 tracing::error!(
784 relay = %relay_url,
785 error = %e,
786 "Failed to subscribe to Layer 2/3 filter during rebuild"
787 );
788 }
610 } 789 }
790 } else {
791 tracing::warn!(
792 relay = %relay_url,
793 "No active connection found for Layer 2/3 rebuild"
794 );
795 }
796 }
797
798 /// Recompute sync actions for a specific relay
799 ///
800 /// Uses derive_relay_targets and compute_actions to find new items
801 /// that need to be synced. For Phase 4, this just logs the actions;
802 /// full handling will be implemented in Phase 5.
803 async fn recompute_actions_for_relay(&self, relay_url: &str) {
804 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
805
806 // Get current state from indexes
807 let repo_index = self.repo_sync_index.read().await;
808 let pending_index = self.pending_sync_index.read().await;
809 let relay_index = self.relay_sync_index.read().await;
810
811 // Derive per-relay targets from repo index
812 let all_targets = derive_relay_targets(&repo_index);
813
814 // Filter to only targets for this specific relay
815 let relay_target = all_targets.get(relay_url);
816
817 if relay_target.is_none() {
818 tracing::debug!(
819 relay = %relay_url,
820 "No sync targets found for relay"
821 );
822 return;
823 }
824
825 // Build single-relay targets map for compute_actions
826 let mut single_relay_targets = std::collections::HashMap::new();
827 if let Some(target) = relay_target {
828 single_relay_targets.insert(relay_url.to_string(), target.clone());
829 }
830
831 // Compute actions for new items
832 let actions = compute_actions(
833 &single_relay_targets,
834 &pending_index,
835 &relay_index,
836 );
837
838 // Log the actions (Phase 5 will process them)
839 for action in &actions {
840 tracing::info!(
841 relay = %action.relay_url,
842 new_repos = action.repos.len(),
843 new_root_events = action.root_events.len(),
844 filters = action.filters.len(),
845 "Discovered new items to sync (Phase 5 will process)"
846 );
847 }
848
849 if actions.is_empty() {
850 tracing::debug!(
851 relay = %relay_url,
852 "No new items to sync for relay"
853 );
611 } 854 }
612 } 855 }
613 856
@@ -680,6 +923,7 @@ impl SyncManager {
680 repos: HashMap<String, HashSet<EventId>>, 923 repos: HashMap<String, HashSet<EventId>>,
681 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, 924 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
682 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, 925 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>,
926 connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>,
683 ) { 927 ) {
684 use crate::sync::filters::build_layer2_and_layer3_filters; 928 use crate::sync::filters::build_layer2_and_layer3_filters;
685 use tokio::sync::mpsc; 929 use tokio::sync::mpsc;
@@ -713,6 +957,13 @@ impl SyncManager {
713 ); 957 );
714 } 958 }
715 959
960 // Notify SyncManager of successful connection
961 let _ = connect_tx
962 .send(ConnectNotification {
963 relay_url: relay_url.clone(),
964 })
965 .await;
966
716 // Subscribe to Layer 2+3 filters for the repos 967 // Subscribe to Layer 2+3 filters for the repos
717 let repo_ids: HashSet<String> = repos.keys().cloned().collect(); 968 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
718 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); 969 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
@@ -801,6 +1052,7 @@ impl SyncManager {
801 relay_url: String, 1052 relay_url: String,
802 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, 1053 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
803 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, 1054 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>,
1055 connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>,
804 ) { 1056 ) {
805 use tokio::sync::mpsc; 1057 use tokio::sync::mpsc;
806 1058
@@ -833,6 +1085,13 @@ impl SyncManager {
833 ); 1085 );
834 } 1086 }
835 1087
1088 // Notify SyncManager of successful connection
1089 let _ = connect_tx
1090 .send(ConnectNotification {
1091 relay_url: relay_url.clone(),
1092 })
1093 .await;
1094
836 // Create event channel 1095 // Create event channel
837 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); 1096 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
838 1097