upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:30:19 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:30:19 +0000
commit5f9d3ca0db4ffc9088be0b9ac9558efe1a4da810 (patch)
treeb4f0879668f560f10ad9f211481d3f45d11c12e1 /src/sync
parent4d21292f7fae6cf50608af0e15051ba2369472c7 (diff)
sync: complete AddFilters handler with auto-spawning
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs493
1 files changed, 275 insertions, 218 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 8e0b8e1..f704ca7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -375,6 +375,12 @@ pub struct SyncManager {
375 health_tracker: Arc<RelayHealthTracker>, 375 health_tracker: Arc<RelayHealthTracker>,
376 /// Counter for generating unique batch IDs 376 /// Counter for generating unique batch IDs
377 next_batch_id: u64, 377 next_batch_id: u64,
378 /// Channel for disconnect notifications (set during run)
379 disconnect_tx: Option<tokio::sync::mpsc::Sender<DisconnectNotification>>,
380 /// Channel for EOSE notifications (set during run)
381 eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>,
382 /// Channel for connect notifications (set during run)
383 connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>,
378} 384}
379 385
380impl SyncManager { 386impl SyncManager {
@@ -405,6 +411,9 @@ impl SyncManager {
405 connections: HashMap::new(), 411 connections: HashMap::new(),
406 health_tracker: Arc::new(RelayHealthTracker::new(config)), 412 health_tracker: Arc::new(RelayHealthTracker::new(config)),
407 next_batch_id: 0, 413 next_batch_id: 0,
414 disconnect_tx: None,
415 eose_tx: None,
416 connect_tx: None,
408 } 417 }
409 } 418 }
410 419
@@ -548,15 +557,14 @@ impl SyncManager {
548 ); 557 );
549 tokio::spawn(async move { self_subscriber.run().await }); 558 tokio::spawn(async move { self_subscriber.run().await });
550 559
560 // 5b. Store channel senders for use by handlers
561 self.disconnect_tx = Some(disconnect_tx.clone());
562 self.eose_tx = Some(eose_tx.clone());
563 self.connect_tx = Some(connect_tx.clone());
564
551 // 6. Connect to bootstrap relay if configured 565 // 6. Connect to bootstrap relay if configured
552 if let Some(ref bootstrap_url) = self.bootstrap_relay_url { 566 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
553 self.spawn_relay_connection( 567 self.spawn_relay_connection(bootstrap_url.clone()).await;
554 bootstrap_url.clone(),
555 disconnect_tx.clone(),
556 eose_tx.clone(),
557 connect_tx.clone(),
558 )
559 .await;
560 } 568 }
561 569
562 // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 570 // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -565,37 +573,46 @@ impl SyncManager {
565 action = action_rx.recv() => { 573 action = action_rx.recv() => {
566 match action { 574 match action {
567 Some(RelayAction::SpawnRelay { relay_url, repos }) => { 575 Some(RelayAction::SpawnRelay { relay_url, repos }) => {
568 // Check if relay already exists 576 // Convert to AddFilters format and use unified handler
569 let relay_index = self.relay_sync_index.read().await; 577 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
570 let exists = relay_index.contains_key(&relay_url); 578 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
571 drop(relay_index); 579
572 580 // Build filters for these repos
573 if !exists { 581 let filters = crate::sync::filters::build_layer2_and_layer3_filters(
574 tracing::info!(relay = %relay_url, "Spawning new relay connection"); 582 &repo_ids,
575 self.spawn_relay_with_layer2( 583 &root_events,
576 relay_url, 584 None,
577 repos, 585 );
578 disconnect_tx.clone(), 586
579 eose_tx.clone(), 587 let action = AddFilters {
580 connect_tx.clone(), 588 relay_url,
581 ).await; 589 repos: repo_ids,
582 } else { 590 root_events,
583 tracing::debug!( 591 filters,
584 relay = %relay_url, 592 };
585 "Relay already exists, considering AddFilters" 593
586 ); 594 self.handle_add_filters(action).await;
587 // For MVP, we don't handle AddFilters - just log
588 // Full implementation would call subscribe_filters on existing connection
589 }
590 } 595 }
591 Some(RelayAction::AddFilters { relay_url, repos }) => { 596 Some(RelayAction::AddFilters { relay_url, repos }) => {
592 tracing::debug!( 597 // Convert to AddFilters format and use unified handler
593 relay = %relay_url, 598 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
594 repo_count = repos.len(), 599 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
595 "AddFilters action (MVP: not implemented)" 600
601 // Build filters for these repos
602 let filters = crate::sync::filters::build_layer2_and_layer3_filters(
603 &repo_ids,
604 &root_events,
605 None,
596 ); 606 );
597 // For MVP, not implemented - full version would add Layer 2 filters 607
598 // to existing relay connection 608 let action = AddFilters {
609 relay_url,
610 repos: repo_ids,
611 root_events,
612 filters,
613 };
614
615 self.handle_add_filters(action).await;
599 } 616 }
600 None => break, 617 None => break,
601 } 618 }
@@ -637,6 +654,130 @@ impl SyncManager {
637 } 654 }
638 } 655 }
639 656
657 /// Handle AddFilters action - subscribe to filters on a relay
658 ///
659 /// This method handles all filter additions:
660 /// - For new relays: creates entry with Connecting status, spawns connection
661 /// - For existing connected relays: subscribes to filters, creates PendingBatch
662 /// - For disconnected/connecting relays: returns (will be handled on connection)
663 async fn handle_add_filters(&mut self, action: AddFilters) {
664 // Step 1: Check if relay exists in relay_sync_index
665 let connection_status = {
666 let index = self.relay_sync_index.read().await;
667 index.get(&action.relay_url).map(|s| s.connection_status)
668 };
669
670 match connection_status {
671 None => {
672 // New relay - create entry with Connecting status
673 {
674 let mut index = self.relay_sync_index.write().await;
675 let new_state = RelayState {
676 connection_status: ConnectionStatus::Connecting,
677 is_bootstrap: false, // Only bootstrap relays set this to true
678 last_connected: None,
679 disconnected_at: None,
680 repos: HashSet::new(),
681 root_events: HashSet::new(),
682 };
683 index.insert(action.relay_url.clone(), new_state);
684 }
685
686 tracing::info!(
687 relay = %action.relay_url,
688 repos = action.repos.len(),
689 "Spawning connection for new relay"
690 );
691
692 // Spawn connection for new relay
693 self.spawn_relay_connection(action.relay_url.clone()).await;
694 // Connection will trigger handle_connect_or_reconnect which will process items
695 return;
696 }
697 Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => {
698 // Will be handled when connection succeeds
699 tracing::debug!(
700 relay = %action.relay_url,
701 status = ?connection_status,
702 "Relay not connected, action will be processed on connection"
703 );
704 return;
705 }
706 Some(ConnectionStatus::Connected) => {
707 // Continue to subscribe
708 }
709 }
710
711 // Step 2: Check if consolidation is needed (Phase 6 will implement maybe_consolidate)
712 // self.maybe_consolidate(&action.relay_url, action.filters.len());
713
714 // Step 3: Get connection and subscribe to all filters
715 let connection = match self.connections.get(&action.relay_url) {
716 Some(conn) => conn,
717 None => {
718 tracing::warn!(
719 relay = %action.relay_url,
720 "No connection for relay, cannot subscribe"
721 );
722 return;
723 }
724 };
725
726 // Subscribe to each filter and collect subscription IDs
727 let mut subscription_ids = Vec::new();
728 for filter in &action.filters {
729 match connection.subscribe_filter(filter.clone()).await {
730 Ok(sub_id) => {
731 subscription_ids.push(sub_id);
732 }
733 Err(e) => {
734 tracing::error!(
735 relay = %action.relay_url,
736 error = %e,
737 "Failed to subscribe to filter"
738 );
739 }
740 }
741 }
742
743 if subscription_ids.is_empty() && !action.filters.is_empty() {
744 tracing::warn!(
745 relay = %action.relay_url,
746 "All filter subscriptions failed, not creating batch"
747 );
748 return;
749 }
750
751 // Step 4: Create PendingBatch
752 let batch_id = self.next_batch_id();
753 let batch = PendingBatch {
754 batch_id,
755 items: PendingItems {
756 repos: action.repos.clone(),
757 root_events: action.root_events.clone(),
758 },
759 outstanding_subs: subscription_ids.into_iter().collect(),
760 };
761
762 // Step 5: Add to pending_sync_index
763 {
764 let mut pending = self.pending_sync_index.write().await;
765 pending
766 .entry(action.relay_url.clone())
767 .or_insert_with(Vec::new)
768 .push(batch);
769 }
770
771 tracing::debug!(
772 relay = %action.relay_url,
773 batch_id = batch_id,
774 repos = action.repos.len(),
775 root_events = action.root_events.len(),
776 filters = action.filters.len(),
777 "Created pending batch for filter subscriptions"
778 );
779 }
780
640 /// Handle a connection success (called when a relay connects or reconnects) 781 /// Handle a connection success (called when a relay connects or reconnects)
641 /// 782 ///
642 /// This method implements smart reconnection logic: 783 /// This method implements smart reconnection logic:
@@ -798,59 +939,57 @@ impl SyncManager {
798 /// Recompute sync actions for a specific relay 939 /// Recompute sync actions for a specific relay
799 /// 940 ///
800 /// Uses derive_relay_targets and compute_actions to find new items 941 /// Uses derive_relay_targets and compute_actions to find new items
801 /// that need to be synced. For Phase 4, this just logs the actions; 942 /// that need to be synced. Processes AddFilters actions for new items.
802 /// full handling will be implemented in Phase 5. 943 async fn recompute_actions_for_relay(&mut self, relay_url: &str) {
803 async fn recompute_actions_for_relay(&self, relay_url: &str) {
804 use crate::sync::algorithms::{compute_actions, derive_relay_targets}; 944 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
805 945
806 // Get current state from indexes 946 // Get current state from indexes (need to collect to avoid holding locks)
807 let repo_index = self.repo_sync_index.read().await; 947 let all_targets = {
808 let pending_index = self.pending_sync_index.read().await; 948 let repo_index = self.repo_sync_index.read().await;
809 let relay_index = self.relay_sync_index.read().await; 949 derive_relay_targets(&repo_index)
810 950 };
811 // Derive per-relay targets from repo index
812 let all_targets = derive_relay_targets(&repo_index);
813 951
814 // Filter to only targets for this specific relay 952 // Filter to only targets for this specific relay
815 let relay_target = all_targets.get(relay_url); 953 let relay_target = match all_targets.get(relay_url) {
954 Some(target) => target.clone(),
955 None => {
956 tracing::debug!(
957 relay = %relay_url,
958 "No sync targets found for relay"
959 );
960 return;
961 }
962 };
963
964 // Build single-relay targets map for compute_actions
965 let mut single_relay_targets = std::collections::HashMap::new();
966 single_relay_targets.insert(relay_url.to_string(), relay_target);
967
968 // Compute actions for new items
969 let actions = {
970 let pending_index = self.pending_sync_index.read().await;
971 let relay_index = self.relay_sync_index.read().await;
972 compute_actions(&single_relay_targets, &pending_index, &relay_index)
973 };
816 974
817 if relay_target.is_none() { 975 if actions.is_empty() {
818 tracing::debug!( 976 tracing::debug!(
819 relay = %relay_url, 977 relay = %relay_url,
820 "No sync targets found for relay" 978 "No new items to sync for relay"
821 ); 979 );
822 return; 980 return;
823 } 981 }
824 982
825 // Build single-relay targets map for compute_actions 983 // Process each action
826 let mut single_relay_targets = std::collections::HashMap::new(); 984 for action in actions {
827 if let Some(target) = relay_target {
828 single_relay_targets.insert(relay_url.to_string(), target.clone());
829 }
830
831 // Compute actions for new items
832 let actions = compute_actions(
833 &single_relay_targets,
834 &pending_index,
835 &relay_index,
836 );
837
838 // Log the actions (Phase 5 will process them)
839 for action in &actions {
840 tracing::info!( 985 tracing::info!(
841 relay = %action.relay_url, 986 relay = %action.relay_url,
842 new_repos = action.repos.len(), 987 new_repos = action.repos.len(),
843 new_root_events = action.root_events.len(), 988 new_root_events = action.root_events.len(),
844 filters = action.filters.len(), 989 filters = action.filters.len(),
845 "Discovered new items to sync (Phase 5 will process)" 990 "Processing AddFilters for new items"
846 );
847 }
848
849 if actions.is_empty() {
850 tracing::debug!(
851 relay = %relay_url,
852 "No new items to sync for relay"
853 ); 991 );
992 self.handle_add_filters(action).await;
854 } 993 }
855 } 994 }
856 995
@@ -912,179 +1051,81 @@ impl SyncManager {
912 ); 1051 );
913 } 1052 }
914 1053
915 /// Spawn relay connection with Layer 2 filters for specific repos 1054 /// Spawn a relay connection and start its event loop
916 /// 1055 ///
917 /// Used when discovering relays from announcements. Connects to the relay, 1056 /// Creates a new RelayConnection, connects to Layer 1, stores the connection,
918 /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the 1057 /// and spawns event processing tasks. Uses stored channel senders for notifications.
919 /// specific repos we want to sync. 1058 async fn spawn_relay_connection(&mut self, relay_url: String) {
920 async fn spawn_relay_with_layer2(
921 &self,
922 relay_url: String,
923 repos: HashMap<String, HashSet<EventId>>,
924 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
925 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>,
926 connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>,
927 ) {
928 use crate::sync::filters::build_layer2_and_layer3_filters;
929 use tokio::sync::mpsc; 1059 use tokio::sync::mpsc;
930 1060
931 let database = Arc::clone(&self.database); 1061 // Get channel senders (must exist during run)
932 let write_policy = self.write_policy.clone(); 1062 let disconnect_tx = match &self.disconnect_tx {
933 let relay_sync_index = Arc::clone(&self.relay_sync_index); 1063 Some(tx) => tx.clone(),
934 1064 None => {
935 // Create relay connection
936 let connection = RelayConnection::new(relay_url.clone());
937
938 // Connect and subscribe to Layer 1 (announcements)
939 if let Err(e) = connection.connect_and_subscribe(None).await {
940 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
941 return;
942 }
943
944 // Mark as connected in relay sync index
945 {
946 let mut index = relay_sync_index.write().await;
947 index.insert(
948 relay_url.clone(),
949 RelayState {
950 repos: repos.keys().cloned().collect(),
951 root_events: repos.values().flatten().cloned().collect(),
952 is_bootstrap: false,
953 connection_status: ConnectionStatus::Connected,
954 last_connected: Some(Timestamp::now()),
955 disconnected_at: None,
956 },
957 );
958 }
959
960 // Notify SyncManager of successful connection
961 let _ = connect_tx
962 .send(ConnectNotification {
963 relay_url: relay_url.clone(),
964 })
965 .await;
966
967 // Subscribe to Layer 2+3 filters for the repos
968 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
969 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
970 let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None);
971
972 for filter in filters {
973 if let Err(e) = connection.subscribe_filter(filter).await {
974 tracing::error!( 1065 tracing::error!(
975 relay = %relay_url, 1066 relay = %relay_url,
976 error = %e, 1067 "Cannot spawn connection - channels not initialized"
977 "Failed to subscribe to Layer 2 filter"
978 ); 1068 );
1069 return;
979 } 1070 }
980 } 1071 };
981 1072 let eose_tx = match &self.eose_tx {
982 tracing::info!( 1073 Some(tx) => tx.clone(),
983 relay = %relay_url, 1074 None => {
984 repo_count = repos.len(), 1075 tracing::error!(
985 "Connected to discovered relay with Layer 2+3 filters" 1076 relay = %relay_url,
986 ); 1077 "Cannot spawn connection - channels not initialized"
987 1078 );
988 // Create event channel 1079 return;
989 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
990
991 // Spawn event loop
992 tokio::spawn(async move {
993 connection.run_event_loop(event_tx).await;
994 });
995
996 // Spawn event processor
997 let relay_url_clone = relay_url.clone();
998 tokio::spawn(async move {
999 while let Some(relay_event) = event_rx.recv().await {
1000 match relay_event {
1001 RelayEvent::Event(event) => {
1002 Self::process_event_static(
1003 &event,
1004 &relay_url_clone,
1005 &database,
1006 &write_policy,
1007 )
1008 .await;
1009 }
1010 RelayEvent::EndOfStoredEvents(sub_id) => {
1011 tracing::debug!(
1012 relay = %relay_url_clone,
1013 sub_id = %sub_id,
1014 "EOSE received, notifying SyncManager"
1015 );
1016 // Notify SyncManager of EOSE
1017 let _ = eose_tx
1018 .send(EoseNotification {
1019 relay_url: relay_url_clone.clone(),
1020 sub_id,
1021 })
1022 .await;
1023 }
1024 RelayEvent::Closed(reason) => {
1025 tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed");
1026 // Notify SyncManager of disconnect
1027 let _ = disconnect_tx
1028 .send(DisconnectNotification {
1029 relay_url: relay_url_clone.clone(),
1030 })
1031 .await;
1032 break;
1033 }
1034 RelayEvent::Shutdown => {
1035 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected");
1036 // Notify SyncManager of disconnect
1037 let _ = disconnect_tx
1038 .send(DisconnectNotification {
1039 relay_url: relay_url_clone.clone(),
1040 })
1041 .await;
1042 break;
1043 }
1044 }
1045 } 1080 }
1046 }); 1081 };
1047 } 1082 let connect_tx = match &self.connect_tx {
1048 1083 Some(tx) => tx.clone(),
1049 /// Spawn a relay connection and start its event loop 1084 None => {
1050 async fn spawn_relay_connection( 1085 tracing::error!(
1051 &self, 1086 relay = %relay_url,
1052 relay_url: String, 1087 "Cannot spawn connection - channels not initialized"
1053 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, 1088 );
1054 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, 1089 return;
1055 connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>, 1090 }
1056 ) { 1091 };
1057 use tokio::sync::mpsc;
1058 1092
1059 let database = Arc::clone(&self.database); 1093 let database = Arc::clone(&self.database);
1060 let write_policy = self.write_policy.clone(); 1094 let write_policy = self.write_policy.clone();
1061 let relay_sync_index = Arc::clone(&self.relay_sync_index); 1095 let relay_sync_index = Arc::clone(&self.relay_sync_index);
1062 1096
1097 // Check if this is a bootstrap relay
1098 let is_bootstrap = self.bootstrap_relay_url.as_ref() == Some(&relay_url);
1099
1063 // Create relay connection 1100 // Create relay connection
1064 let connection = RelayConnection::new(relay_url.clone()); 1101 let connection = RelayConnection::new(relay_url.clone());
1065 1102
1066 // Connect and subscribe to Layer 1 1103 // Connect and subscribe to Layer 1
1067 if let Err(e) = connection.connect_and_subscribe(None).await { 1104 if let Err(e) = connection.connect_and_subscribe(None).await {
1068 tracing::error!("Failed to connect to relay {}: {}", relay_url, e); 1105 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
1106 // Update state to disconnected on failure
1107 {
1108 let mut index = relay_sync_index.write().await;
1109 if let Some(state) = index.get_mut(&relay_url) {
1110 state.connection_status = ConnectionStatus::Disconnected;
1111 }
1112 }
1069 return; 1113 return;
1070 } 1114 }
1071 1115
1072 // Mark as connected in relay sync index 1116 // Mark as connected in relay sync index
1073 { 1117 {
1074 let mut index = relay_sync_index.write().await; 1118 let mut index = relay_sync_index.write().await;
1075 index.insert( 1119 let state = index.entry(relay_url.clone()).or_default();
1076 relay_url.clone(), 1120 state.connection_status = ConnectionStatus::Connected;
1077 RelayState { 1121 state.is_bootstrap = is_bootstrap;
1078 repos: HashSet::new(), 1122 state.last_connected = Some(Timestamp::now());
1079 root_events: HashSet::new(), 1123 state.disconnected_at = None;
1080 is_bootstrap: true,
1081 connection_status: ConnectionStatus::Connected,
1082 last_connected: Some(Timestamp::now()),
1083 disconnected_at: None,
1084 },
1085 );
1086 } 1124 }
1087 1125
1126 // Store connection for later use (for subscribing to filters)
1127 self.connections.insert(relay_url.clone(), connection);
1128
1088 // Notify SyncManager of successful connection 1129 // Notify SyncManager of successful connection
1089 let _ = connect_tx 1130 let _ = connect_tx
1090 .send(ConnectNotification { 1131 .send(ConnectNotification {
@@ -1092,6 +1133,16 @@ impl SyncManager {
1092 }) 1133 })
1093 .await; 1134 .await;
1094 1135
1136 // Get the connection back for the event loop
1137 // We need to take it out because run_event_loop consumes self
1138 let connection = match self.connections.remove(&relay_url) {
1139 Some(conn) => conn,
1140 None => {
1141 tracing::error!(relay = %relay_url, "Connection disappeared after insert");
1142 return;
1143 }
1144 };
1145
1095 // Create event channel 1146 // Create event channel
1096 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); 1147 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
1097 1148
@@ -1155,6 +1206,12 @@ impl SyncManager {
1155 } 1206 }
1156 } 1207 }
1157 }); 1208 });
1209
1210 tracing::info!(
1211 relay = %relay_url,
1212 is_bootstrap = is_bootstrap,
1213 "Spawned relay connection"
1214 );
1158 } 1215 }
1159 1216
1160 /// Process a single event from a relay (static version for spawned tasks) 1217 /// Process a single event from a relay (static version for spawned tasks)