upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sync/algorithms.rs4
-rw-r--r--src/sync/health.rs15
-rw-r--r--src/sync/mod.rs668
3 files changed, 355 insertions, 332 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 84248b1..a6e0787 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -348,6 +348,7 @@ mod tests {
348 connection_status: ConnectionStatus::Disconnected, 348 connection_status: ConnectionStatus::Disconnected,
349 last_connected: None, 349 last_connected: None,
350 disconnected_at: None, 350 disconnected_at: None,
351 announcements_synced: false,
351 }, 352 },
352 ); 353 );
353 354
@@ -435,6 +436,7 @@ mod tests {
435 connection_status: ConnectionStatus::Connected, 436 connection_status: ConnectionStatus::Connected,
436 last_connected: None, 437 last_connected: None,
437 disconnected_at: None, 438 disconnected_at: None,
439 announcements_synced: false,
438 }, 440 },
439 ); 441 );
440 442
@@ -468,6 +470,7 @@ mod tests {
468 connection_status: ConnectionStatus::Connecting, 470 connection_status: ConnectionStatus::Connecting,
469 last_connected: None, 471 last_connected: None,
470 disconnected_at: None, 472 disconnected_at: None,
473 announcements_synced: false,
471 }, 474 },
472 ); 475 );
473 476
@@ -523,6 +526,7 @@ mod tests {
523 connection_status: ConnectionStatus::Connected, 526 connection_status: ConnectionStatus::Connected,
524 last_connected: None, 527 last_connected: None,
525 disconnected_at: None, 528 disconnected_at: None,
529 announcements_synced: false,
526 }, 530 },
527 ); 531 );
528 532
diff --git a/src/sync/health.rs b/src/sync/health.rs
index 0ae7dee..d919a80 100644
--- a/src/sync/health.rs
+++ b/src/sync/health.rs
@@ -64,6 +64,8 @@ pub struct RelayHealth {
64 pub last_failure_time: Option<Instant>, 64 pub last_failure_time: Option<Instant>,
65 /// Time of the last successful connection 65 /// Time of the last successful connection
66 pub last_success_time: Option<Instant>, 66 pub last_success_time: Option<Instant>,
67 /// Time of the last connection attempt (success or failure)
68 pub last_attempt_time: Option<Instant>,
67 /// Next time a connection attempt should be made 69 /// Next time a connection attempt should be made
68 pub next_retry_at: Option<Instant>, 70 pub next_retry_at: Option<Instant>,
69} 71}
@@ -76,6 +78,7 @@ impl Default for RelayHealth {
76 first_failure_time: None, 78 first_failure_time: None,
77 last_failure_time: None, 79 last_failure_time: None,
78 last_success_time: None, 80 last_success_time: None,
81 last_attempt_time: None,
79 next_retry_at: None, 82 next_retry_at: None,
80 } 83 }
81 } 84 }
@@ -132,6 +135,17 @@ impl RelayHealthTracker {
132 self.base_backoff_secs 135 self.base_backoff_secs
133 } 136 }
134 137
138 /// Record a connection attempt (updates last_attempt_time)
139 ///
140 /// This should be called before trying to connect, to track when
141 /// attempts are made regardless of success or failure.
142 pub fn record_attempt(&self, relay_url: &str) {
143 let now = Instant::now();
144 let mut entry = self.health.entry(relay_url.to_string()).or_default();
145 let health = entry.value_mut();
146 health.last_attempt_time = Some(now);
147 }
148
135 /// Record a successful connection to a relay 149 /// Record a successful connection to a relay
136 /// 150 ///
137 /// Resets the relay to Healthy state and clears failure counters. 151 /// Resets the relay to Healthy state and clears failure counters.
@@ -148,6 +162,7 @@ impl RelayHealthTracker {
148 health.first_failure_time = None; 162 health.first_failure_time = None;
149 health.last_failure_time = None; 163 health.last_failure_time = None;
150 health.last_success_time = Some(now); 164 health.last_success_time = Some(now);
165 health.last_attempt_time = Some(now);
151 health.next_retry_at = None; 166 health.next_retry_at = None;
152 167
153 if old_state != HealthState::Healthy { 168 if old_state != HealthState::Healthy {
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 3c50387..5bea701 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -103,6 +103,9 @@ pub struct RelayState {
103 pub last_connected: Option<Timestamp>, 103 pub last_connected: Option<Timestamp>,
104 /// When we disconnected - for 15-minute state retention rule 104 /// When we disconnected - for 15-minute state retention rule
105 pub disconnected_at: Option<Timestamp>, 105 pub disconnected_at: Option<Timestamp>,
106 /// Whether announcement filter historic sync has completed for this relay
107 /// Used to determine if we can use `since` filter on reconnect for Layer 1
108 pub announcements_synced: bool,
106} 109}
107 110
108impl Default for RelayState { 111impl Default for RelayState {
@@ -114,6 +117,7 @@ impl Default for RelayState {
114 connection_status: ConnectionStatus::Disconnected, 117 connection_status: ConnectionStatus::Disconnected,
115 last_connected: None, 118 last_connected: None,
116 disconnected_at: None, 119 disconnected_at: None,
120 announcements_synced: false,
117 } 121 }
118 } 122 }
119} 123}
@@ -134,6 +138,7 @@ impl RelayState {
134 pub fn clear_sync_state(&mut self) { 138 pub fn clear_sync_state(&mut self) {
135 self.repos.clear(); 139 self.repos.clear();
136 self.root_events.clear(); 140 self.root_events.clear();
141 self.announcements_synced = false;
137 } 142 }
138} 143}
139 144
@@ -301,7 +306,7 @@ async fn run_disconnect_checker(
301 306
302 let mut manager = sync_manager.lock().await; 307 let mut manager = sync_manager.lock().await;
303 manager.check_disconnects().await; 308 manager.check_disconnects().await;
304 manager.check_reconnects().await; 309 manager.retry_disconnected_relays().await;
305 } 310 }
306 _ = shutdown_rx.recv() => { 311 _ = shutdown_rx.recv() => {
307 tracing::info!("Disconnect checker received shutdown signal"); 312 tracing::info!("Disconnect checker received shutdown signal");
@@ -475,6 +480,9 @@ impl SyncManager {
475 /// move repos and root_events from pending to confirmed state. This unified 480 /// move repos and root_events from pending to confirmed state. This unified
476 /// flow ensures consistent state tracking regardless of sync method. 481 /// flow ensures consistent state tracking regardless of sync method.
477 /// 482 ///
483 /// For generic filter batches (identified by empty repos and root_events),
484 /// this sets the announcements_synced flag to enable incremental sync on reconnect.
485 ///
478 /// # Arguments 486 /// # Arguments
479 /// * `relay_url` - The relay URL the batch belongs to 487 /// * `relay_url` - The relay URL the batch belongs to
480 /// * `batch` - The completed batch to confirm 488 /// * `batch` - The completed batch to confirm
@@ -483,6 +491,7 @@ impl SyncManager {
483 let repos_count = batch.items.repos.len(); 491 let repos_count = batch.items.repos.len();
484 let events_count = batch.items.root_events.len(); 492 let events_count = batch.items.root_events.len();
485 let sync_method = batch.sync_method; 493 let sync_method = batch.sync_method;
494 let is_generic_filter = repos_count == 0 && events_count == 0;
486 495
487 let mut relay_index = self.relay_sync_index.write().await; 496 let mut relay_index = self.relay_sync_index.write().await;
488 497
@@ -492,6 +501,17 @@ impl SyncManager {
492 // Move root_events to confirmed 501 // Move root_events to confirmed
493 state.root_events.extend(batch.items.root_events.clone()); 502 state.root_events.extend(batch.items.root_events.clone());
494 503
504 // Set announcements_synced flag for generic filter batches
505 if is_generic_filter {
506 state.announcements_synced = true;
507 tracing::info!(
508 relay = %relay_url,
509 batch_id = batch_id,
510 sync_method = ?sync_method,
511 "Generic filter (announcements) historic sync complete - announcements_synced set to true"
512 );
513 }
514
495 // DEBUG TRACING: Log the root events being confirmed 515 // DEBUG TRACING: Log the root events being confirmed
496 tracing::info!( 516 tracing::info!(
497 relay = %relay_url, 517 relay = %relay_url,
@@ -503,6 +523,8 @@ impl SyncManager {
503 total_repos = state.repos.len(), 523 total_repos = state.repos.len(),
504 total_root_events = state.root_events.len(), 524 total_root_events = state.root_events.len(),
505 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), 525 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
526 is_generic_filter = is_generic_filter,
527 announcements_synced = state.announcements_synced,
506 "Batch confirmed - items moved from pending to confirmed" 528 "Batch confirmed - items moved from pending to confirmed"
507 ); 529 );
508 } else { 530 } else {
@@ -623,7 +645,8 @@ impl SyncManager {
623 645
624 // 6. Connect to bootstrap relay if configured 646 // 6. Connect to bootstrap relay if configured
625 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { 647 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() {
626 self.spawn_relay_connection(bootstrap_url.clone()).await; 648 self.register_relay(bootstrap_url.clone()).await;
649 self.try_connect_relay(bootstrap_url).await;
627 } 650 }
628 651
629 // 7. Capture config values before moving self into Arc 652 // 7. Capture config values before moving self into Arc
@@ -731,33 +754,16 @@ impl SyncManager {
731 754
732 match connection_status { 755 match connection_status {
733 None => { 756 None => {
734 // New relay - create entry with Connecting status 757 // New relay - register and connect
735 {
736 let mut index = self.relay_sync_index.write().await;
737 let new_state = RelayState {
738 connection_status: ConnectionStatus::Connecting,
739 is_bootstrap: false, // Only bootstrap relays set this to true
740 last_connected: None,
741 disconnected_at: None,
742 repos: HashSet::new(),
743 root_events: HashSet::new(),
744 };
745 index.insert(action.relay_url.clone(), new_state);
746 }
747
748 // Track new relay in metrics
749 if let Some(ref metrics) = self.metrics {
750 metrics.inc_tracked_count();
751 }
752
753 tracing::info!( 758 tracing::info!(
754 relay = %action.relay_url, 759 relay = %action.relay_url,
755 repos = action.items.repos.len(), 760 repos = action.items.repos.len(),
756 "Spawning connection for new relay" 761 "Registering and connecting to new relay"
757 ); 762 );
758 763
759 // Spawn connection for new relay 764 // Register relay (creates RelayConnection, initializes RelayState, updates metrics)
760 self.spawn_relay_connection(action.relay_url.clone()).await; 765 self.register_relay(action.relay_url.clone()).await;
766 self.try_connect_relay(&action.relay_url).await;
761 // Connection will trigger handle_connect_or_reconnect which will process items 767 // Connection will trigger handle_connect_or_reconnect which will process items
762 return; 768 return;
763 } 769 }
@@ -795,11 +801,146 @@ impl SyncManager {
795 801
796 /// Handle a connection success (called when a relay connects or reconnects) 802 /// Handle a connection success (called when a relay connects or reconnects)
797 /// 803 ///
798 /// This method dispatches to the appropriate reconnection strategy: 804 /// This method:
799 /// - `fresh_start()` if never connected before 805 /// 1. Updates RelayState to Connected
800 /// - `quick_reconnect()` if disconnected < 15 minutes 806 /// 2. Spawns event loop (MUST happen on every connection/reconnect)
801 /// - `long_reconnect()` if disconnected > 15 minutes 807 /// 3. Dispatches to appropriate reconnection strategy based on disconnect time
802 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { 808 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
809 use tokio::sync::mpsc;
810
811 // 1. Update state to Connected
812 {
813 let mut index = self.relay_sync_index.write().await;
814 let state = index.entry(relay_url.to_string()).or_default();
815 state.connection_status = ConnectionStatus::Connected;
816 state.last_connected = Some(Timestamp::now());
817 state.disconnected_at = None;
818 }
819
820 // Update metrics
821 if let Some(ref metrics) = self.metrics {
822 metrics.set_relay_connected(relay_url, true);
823 metrics.inc_connected_count();
824 }
825
826 // 2. SPAWN EVENT LOOP (moved from spawn_relay_connection)
827 // This MUST happen on every connection (initial or reconnect)
828 // because event loops die on disconnect and cannot be reused
829 let connection = match self.connections.get(relay_url) {
830 Some(c) => c.clone(),
831 None => {
832 tracing::error!(relay = %relay_url, "No RelayConnection found for connected relay");
833 return;
834 }
835 };
836
837 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
838
839 // Spawn event loop task
840 let relay_url_for_loop = relay_url.to_string();
841 tokio::spawn(async move {
842 connection.run_event_loop(event_tx).await;
843 tracing::debug!(relay = %relay_url_for_loop, "Event loop terminated");
844 });
845
846 // Spawn event processor task
847 let relay_url_clone = relay_url.to_string();
848 let database = Arc::clone(&self.database);
849 let write_policy = self.write_policy.clone();
850 let local_relay = self.local_relay.clone();
851 let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone();
852 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
853 let metrics_clone = self.metrics.clone();
854
855 tokio::spawn(async move {
856 let mut disconnect_sent = false;
857 let mut eose_received = false;
858
859 while let Some(relay_event) = event_rx.recv().await {
860 match relay_event {
861 RelayEvent::Event(event) => {
862 if let Some(ref metrics) = metrics_clone {
863 let source = if eose_received {
864 event_source::LIVE
865 } else {
866 event_source::STARTUP
867 };
868 metrics.record_event(source);
869 }
870 Self::process_event_static(
871 &event,
872 &relay_url_clone,
873 &database,
874 &write_policy,
875 &local_relay,
876 )
877 .await;
878 }
879 RelayEvent::EndOfStoredEvents(sub_id) => {
880 eose_received = true;
881 tracing::debug!(
882 relay = %relay_url_clone,
883 sub_id = %sub_id,
884 "EOSE received, notifying SyncManager"
885 );
886 let _ = eose_tx
887 .send(EoseNotification {
888 relay_url: relay_url_clone.clone(),
889 sub_id,
890 })
891 .await;
892 }
893 RelayEvent::Closed(reason) => {
894 tracing::info!(
895 relay = %relay_url_clone,
896 reason = %reason,
897 "Relay connection closed"
898 );
899 if !disconnect_sent {
900 let _ = disconnect_tx
901 .send(DisconnectNotification {
902 relay_url: relay_url_clone.clone(),
903 })
904 .await;
905 disconnect_sent = true;
906 }
907 break;
908 }
909 RelayEvent::Shutdown => {
910 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected");
911 if !disconnect_sent {
912 let _ = disconnect_tx
913 .send(DisconnectNotification {
914 relay_url: relay_url_clone.clone(),
915 })
916 .await;
917 disconnect_sent = true;
918 }
919 break;
920 }
921 }
922 }
923
924 // If the event channel closed without a Closed/Shutdown event
925 if !disconnect_sent {
926 tracing::info!(
927 relay = %relay_url_clone,
928 "Event channel closed, notifying SyncManager of disconnect"
929 );
930 let _ = disconnect_tx
931 .send(DisconnectNotification {
932 relay_url: relay_url_clone,
933 })
934 .await;
935 }
936 });
937
938 tracing::info!(
939 relay = %relay_url,
940 "Event loop and processor spawned for connected relay"
941 );
942
943 // 3. Decide reconnection strategy based on last_connected time
803 let last_connected = { 944 let last_connected = {
804 let index = self.relay_sync_index.read().await; 945 let index = self.relay_sync_index.read().await;
805 index.get(relay_url).and_then(|s| s.last_connected) 946 index.get(relay_url).and_then(|s| s.last_connected)
@@ -808,7 +949,7 @@ impl SyncManager {
808 if let Some(last) = last_connected { 949 if let Some(last) = last_connected {
809 let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); 950 let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs());
810 if elapsed < QUICK_RECONNECT_WINDOW_SECS { 951 if elapsed < QUICK_RECONNECT_WINDOW_SECS {
811 // short disconnect 952 // Short disconnect - quick reconnect
812 tracing::info!( 953 tracing::info!(
813 relay = %relay_url, 954 relay = %relay_url,
814 disconnect_secs = elapsed, 955 disconnect_secs = elapsed,
@@ -817,7 +958,7 @@ impl SyncManager {
817 self.quick_reconnect(relay_url, Timestamp::from(elapsed)) 958 self.quick_reconnect(relay_url, Timestamp::from(elapsed))
818 .await; 959 .await;
819 } else { 960 } else {
820 // long disconnect 961 // Long disconnect - fresh start
821 tracing::info!( 962 tracing::info!(
822 relay = %relay_url, 963 relay = %relay_url,
823 disconnect_secs = elapsed, 964 disconnect_secs = elapsed,
@@ -826,13 +967,13 @@ impl SyncManager {
826 self.fresh_start(relay_url).await; 967 self.fresh_start(relay_url).await;
827 } 968 }
828 } else { 969 } else {
829 // not successfully connected before (since launching binary) 970 // First connection - fresh start
830 tracing::info!( 971 tracing::info!(
831 relay = %relay_url, 972 relay = %relay_url,
832 "First connection - initiating fresh_start" 973 "First connection - initiating fresh_start"
833 ); 974 );
834 self.fresh_start(relay_url).await; 975 self.fresh_start(relay_url).await;
835 }; 976 }
836 } 977 }
837 978
838 /// Fresh start - clears state and does full sync 979 /// Fresh start - clears state and does full sync
@@ -846,7 +987,7 @@ impl SyncManager {
846 /// 4. L1 live + L1 historic (negentropy if available) 987 /// 4. L1 live + L1 historic (negentropy if available)
847 /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 988 /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3
848 async fn fresh_start(&mut self, relay_url: &str) { 989 async fn fresh_start(&mut self, relay_url: &str) {
849 let now = Timestamp::now(); 990 let _now = Timestamp::now();
850 991
851 tracing::info!(relay = %relay_url, "Starting fresh_start"); 992 tracing::info!(relay = %relay_url, "Starting fresh_start");
852 993
@@ -877,28 +1018,38 @@ impl SyncManager {
877 ); 1018 );
878 } 1019 }
879 if state.connection_status == ConnectionStatus::Connected { 1020 if state.connection_status == ConnectionStatus::Connected {
880 // TODO start layer 1
881 drop(index); 1021 drop(index);
1022 self.sync_generic_filters(relay_url, None).await;
1023 // Step 5: compute_actions for L2+L3 (will be triggered by EOSE)
882 self.recompute_new_sync_filters_for_relay(relay_url).await; 1024 self.recompute_new_sync_filters_for_relay(relay_url).await;
883 } 1025 }
884 } else { 1026 } else {
885 drop(index); 1027 drop(index);
886 return self.spawn_relay_connection(relay_url.to_string()).await;
887 } 1028 }
888 } 1029 }
889 } 1030 }
890 1031
1032 async fn sync_generic_filters(&mut self, relay_url: &str, since: Option<Timestamp>) {
1033 let filters = vec![filters::build_announcement_filter(None)];
1034 self.sync_live(relay_url, &filters).await;
1035
1036 // Use historic_sync with empty PendingItems for generic filters
1037 // Generic filters (announcements) don't have associated repos or root_events
1038 let items = PendingItems::default();
1039 self.historic_sync(relay_url, filters, items, since).await;
1040 }
1041
891 /// Quick reconnect - for disconnections < 15 minutes 1042 /// Quick reconnect - for disconnections < 15 minutes
892 /// 1043 ///
893 /// Flow: 1044 /// Re-establishes subscriptions after a brief disconnection by:
894 /// 1. Clear PendingSyncIndex for this relay 1045 /// 1. Clearing stale PendingSyncIndex entries
895 /// 2. Update connection state to Connected 1046 /// 2. Syncing L1 filters with since timestamp (announcements)
896 /// 3. L1 live + L1 historic(since) 1047 /// 3. Rebuilding L2+L3 from preserved RelaySyncIndex state
897 /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since) 1048 /// 4. Computing actions for new items discovered during catchup
898 /// 5. compute_actions for any new items discovered during catchup 1049 ///
1050 /// Basic connection state and metrics are managed by handle_connect_or_reconnect.
1051 /// This method handles reconnect-specific concerns (health tracking, reconnect metrics).
899 async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { 1052 async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) {
900 let now = Timestamp::now();
901
902 tracing::info!( 1053 tracing::info!(
903 relay = %relay_url, 1054 relay = %relay_url,
904 since = %since, 1055 since = %since,
@@ -917,56 +1068,36 @@ impl SyncManager {
917 } 1068 }
918 } 1069 }
919 1070
920 // Step 2: Update connection state (preserve repos/root_events - that's the point!) 1071 // Record successful reconnection in health tracker
921 {
922 let mut index = self.relay_sync_index.write().await;
923 let state = index.entry(relay_url.to_string()).or_default();
924 state.connection_status = ConnectionStatus::Connected;
925 state.last_connected = Some(now);
926 state.disconnected_at = None;
927 }
928
929 // Record success in health tracker
930 self.health_tracker.record_success(relay_url); 1072 self.health_tracker.record_success(relay_url);
931 1073
932 // Update metrics 1074 // Record reconnect-specific metrics (not basic connection metrics)
933 if let Some(ref metrics) = self.metrics { 1075 if let Some(ref metrics) = self.metrics {
934 metrics.set_relay_connected(relay_url, true);
935 metrics.inc_connected_count();
936 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1076 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
937 metrics.record_event(event_source::RECONNECT); 1077 metrics.record_event(event_source::RECONNECT);
938 } 1078 }
939 1079
940 // Step 3: L1 live + L1 historic with since filter 1080 // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed)
941 // L1 live subscription (since=now for ongoing events) 1081 let announcement_since = {
942 let live_filter = filters::build_announcement_filter(Some(now)); 1082 let index = self.relay_sync_index.read().await;
943 if let Some(connection) = self.connections.get(relay_url) { 1083 if let Some(state) = index.get(relay_url) {
944 if let Err(e) = connection.subscribe_filter(live_filter).await { 1084 if state.announcements_synced {
945 tracing::error!( 1085 Some(since) // Can use incremental sync
946 relay = %relay_url, 1086 } else {
947 error = %e, 1087 None // Need full sync - announcements never completed
948 "Failed to set up L1 live subscription in quick_reconnect" 1088 }
949 ); 1089 } else {
950 } 1090 None
951 }
952
953 // L1 historic with since filter (catch up on missed announcements)
954 let layer1_filter = filters::build_announcement_filter(Some(since));
955 if let Some(connection) = self.connections.get(relay_url) {
956 if let Err(e) = connection.subscribe_filter(layer1_filter).await {
957 tracing::error!(
958 relay = %relay_url,
959 error = %e,
960 "Failed to subscribe to L1 historic filter in quick_reconnect"
961 );
962 } 1091 }
963 } 1092 };
1093 self.sync_generic_filters(relay_url, announcement_since)
1094 .await;
964 1095
965 // Step 4: Rebuild L2+L3 from confirmed state with since filter 1096 // Step 3: Rebuild L2+L3 from confirmed state with since filter
966 // This uses the preserved repos/root_events from RelaySyncIndex 1097 // This uses the preserved repos/root_events from RelaySyncIndex
967 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; 1098 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
968 1099
969 // Step 5: compute_actions for any NEW items discovered while disconnected 1100 // Step 4: compute_actions for any NEW items discovered while disconnected
970 self.recompute_new_sync_filters_for_relay(relay_url).await; 1101 self.recompute_new_sync_filters_for_relay(relay_url).await;
971 1102
972 tracing::info!(relay = %relay_url, "quick_reconnect complete"); 1103 tracing::info!(relay = %relay_url, "quick_reconnect complete");
@@ -1038,6 +1169,119 @@ impl SyncManager {
1038 } 1169 }
1039 } 1170 }
1040 1171
1172 /// Register a relay for managed connection/reconnection
1173 ///
1174 /// Creates a RelayConnection object and stores it in the connections HashMap.
1175 /// Also initializes RelayState if it doesn't exist.
1176 /// Does NOT connect - connection happens via try_connect_relay or retry_disconnected_relays.
1177 /// The RelayConnection persists forever and is reused on reconnects.
1178 async fn register_relay(&mut self, relay_url: String) {
1179 // Create RelayConnection if not exists
1180 if !self.connections.contains_key(&relay_url) {
1181 let connection =
1182 RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database));
1183 self.connections.insert(relay_url.clone(), connection);
1184 tracing::debug!(relay = %relay_url, "Registered new relay connection");
1185 }
1186
1187 // Initialize RelayState if not exists
1188 let is_new = {
1189 let mut index = self.relay_sync_index.write().await;
1190 if !index.contains_key(&relay_url) {
1191 let new_state = RelayState {
1192 connection_status: ConnectionStatus::Disconnected,
1193 is_bootstrap: false,
1194 last_connected: None,
1195 disconnected_at: None,
1196 repos: HashSet::new(),
1197 root_events: HashSet::new(),
1198 announcements_synced: false,
1199 };
1200 index.insert(relay_url.clone(), new_state);
1201 true
1202 } else {
1203 false
1204 }
1205 };
1206
1207 // Track new relay in metrics
1208 if is_new {
1209 if let Some(ref metrics) = self.metrics {
1210 metrics.inc_tracked_count();
1211 }
1212 tracing::info!(relay = %relay_url, "Registered new relay for tracking");
1213 }
1214 }
1215
1216 /// Attempt a single connection to a registered relay
1217 ///
1218 /// Uses the existing RelayConnection from the HashMap and attempts to connect.
1219 /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect.
1220 /// On failure, updates state and health tracker.
1221 async fn try_connect_relay(&mut self, relay_url: &str) {
1222 // 1. Mark attempting (optional, helpful for debugging)
1223 {
1224 let mut index = self.relay_sync_index.write().await;
1225 if let Some(state) = index.get_mut(relay_url) {
1226 state.connection_status = ConnectionStatus::Connecting;
1227 }
1228 }
1229
1230 // 2. Record attempt in health tracker
1231 self.health_tracker.record_attempt(relay_url);
1232
1233 // 3. Get connection and attempt
1234 let connection = match self.connections.get(relay_url) {
1235 Some(c) => c,
1236 None => {
1237 tracing::error!(relay = %relay_url, "No RelayConnection registered");
1238 return;
1239 }
1240 };
1241
1242 let timeout = self.health_tracker.base_backoff_secs();
1243
1244 match connection.connect_and_subscribe(None, timeout).await {
1245 Ok(_) => {
1246 // Success - record and send notification
1247 self.health_tracker.record_success(relay_url);
1248
1249 if let Some(ref metrics) = self.metrics {
1250 metrics.record_connection_attempt(relay_url, true);
1251 }
1252
1253 if let Some(ref connect_tx) = self.connect_tx {
1254 let _ = connect_tx
1255 .send(ConnectNotification {
1256 relay_url: relay_url.to_string(),
1257 })
1258 .await;
1259 }
1260 }
1261 Err(e) => {
1262 tracing::error!(relay = %relay_url, error = %e, "Connection failed");
1263
1264 // 4. Update state back to Disconnected on failure
1265 {
1266 let mut index = self.relay_sync_index.write().await;
1267 if let Some(state) = index.get_mut(relay_url) {
1268 state.connection_status = ConnectionStatus::Disconnected;
1269 }
1270 }
1271
1272 // 5. Record failure in health tracker
1273 self.health_tracker.record_failure(relay_url);
1274
1275 // 6. Update metrics
1276 if let Some(ref metrics) = self.metrics {
1277 metrics.record_connection_attempt(relay_url, false);
1278 metrics
1279 .record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1280 }
1281 }
1282 }
1283 }
1284
1041 /// Recompute sync actions for a specific relay 1285 /// Recompute sync actions for a specific relay
1042 /// 1286 ///
1043 /// Uses derive_relay_targets and compute_actions to find new items 1287 /// Uses derive_relay_targets and compute_actions to find new items
@@ -1136,13 +1380,13 @@ impl SyncManager {
1136 } 1380 }
1137 } 1381 }
1138 1382
1139 // 3. Remove from active connections 1383 // 3. Keep RelayConnection in HashMap for reuse on reconnect
1140 if self.connections.remove(relay_url).is_some() { 1384 // The connection object persists and will be reused when retry_disconnected_relays
1141 tracing::debug!( 1385 // calls try_connect_relay -> connection.connect_and_subscribe()
1142 relay = %relay_url, 1386 tracing::debug!(
1143 "Removed relay from active connections" 1387 relay = %relay_url,
1144 ); 1388 "Keeping RelayConnection in HashMap for reconnection"
1145 } 1389 );
1146 1390
1147 // 4. Record failure in health tracker 1391 // 4. Record failure in health tracker
1148 self.health_tracker.record_failure(relay_url); 1392 self.health_tracker.record_failure(relay_url);
@@ -1161,246 +1405,6 @@ impl SyncManager {
1161 ); 1405 );
1162 } 1406 }
1163 1407
1164 /// Spawn a relay connection and start its event loop
1165 ///
1166 /// Creates a new RelayConnection, connects to Layer 1, stores the connection,
1167 /// and spawns event processing tasks. Uses stored channel senders for notifications.
1168 async fn spawn_relay_connection(&mut self, relay_url: String) {
1169 use tokio::sync::mpsc;
1170
1171 // Get channel senders (must exist during run)
1172 let disconnect_tx = match &self.disconnect_tx {
1173 Some(tx) => tx.clone(),
1174 None => {
1175 tracing::error!(
1176 relay = %relay_url,
1177 "Cannot spawn connection - channels not initialized"
1178 );
1179 return;
1180 }
1181 };
1182 let eose_tx = match &self.eose_tx {
1183 Some(tx) => tx.clone(),
1184 None => {
1185 tracing::error!(
1186 relay = %relay_url,
1187 "Cannot spawn connection - channels not initialized"
1188 );
1189 return;
1190 }
1191 };
1192 let connect_tx = match &self.connect_tx {
1193 Some(tx) => tx.clone(),
1194 None => {
1195 tracing::error!(
1196 relay = %relay_url,
1197 "Cannot spawn connection - channels not initialized"
1198 );
1199 return;
1200 }
1201 };
1202
1203 let database = Arc::clone(&self.database);
1204 let write_policy = self.write_policy.clone();
1205 let local_relay = self.local_relay.clone();
1206 let relay_sync_index = Arc::clone(&self.relay_sync_index);
1207
1208 // Check if this is a bootstrap relay
1209 let is_bootstrap = self.bootstrap_relay_url.as_ref() == Some(&relay_url);
1210
1211 // Create relay connection with database for negentropy sync support
1212 let connection =
1213 RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database));
1214
1215 // Get connection timeout from health tracker (capped at base backoff)
1216 // This ensures the connection attempt completes before the next retry would be scheduled
1217 let connection_timeout_secs = self.health_tracker.base_backoff_secs();
1218
1219 // Connect and subscribe to Layer 1
1220 match connection
1221 .connect_and_subscribe(None, connection_timeout_secs)
1222 .await
1223 {
1224 Ok(_) => {
1225 // Record successful connection attempt
1226 if let Some(ref metrics) = self.metrics {
1227 metrics.record_connection_attempt(&relay_url, true);
1228 }
1229 }
1230 Err(e) => {
1231 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
1232
1233 // Record failed connection attempt
1234 if let Some(ref metrics) = self.metrics {
1235 metrics.record_connection_attempt(&relay_url, false);
1236 }
1237
1238 // Record failure in health tracker
1239 self.health_tracker.record_failure(&relay_url);
1240
1241 // Record health state in metrics
1242 if let Some(ref metrics) = self.metrics {
1243 metrics
1244 .record_health_state(&relay_url, self.health_tracker.get_state(&relay_url));
1245 }
1246
1247 // Update state to disconnected on failure
1248 {
1249 let mut index = relay_sync_index.write().await;
1250 if let Some(state) = index.get_mut(&relay_url) {
1251 state.connection_status = ConnectionStatus::Disconnected;
1252 }
1253 }
1254 return;
1255 }
1256 }
1257
1258 // Mark as connected in relay sync index
1259 // Track whether this is a new relay for metrics
1260 let is_new_relay = {
1261 let mut index = relay_sync_index.write().await;
1262 let is_new = !index.contains_key(&relay_url);
1263 let state = index.entry(relay_url.clone()).or_default();
1264 state.connection_status = ConnectionStatus::Connected;
1265 state.is_bootstrap = is_bootstrap;
1266 state.last_connected = Some(Timestamp::now());
1267 state.disconnected_at = None;
1268 is_new
1269 };
1270
1271 // Increment tracked count for new relays
1272 if is_new_relay {
1273 if let Some(ref metrics) = self.metrics {
1274 metrics.inc_tracked_count();
1275 }
1276 }
1277
1278 // Store connection in HashMap BEFORE sending notification
1279 // This ensures it's available when handle_connect_or_reconnect is called
1280 self.connections.insert(relay_url.clone(), connection);
1281
1282 tracing::info!(
1283 relay = %relay_url,
1284 is_bootstrap = is_bootstrap,
1285 "Spawned relay connection"
1286 );
1287
1288 // Notify SyncManager of successful connection
1289 let _ = connect_tx
1290 .send(ConnectNotification {
1291 relay_url: relay_url.clone(),
1292 })
1293 .await;
1294
1295 // Clone the connection for the event loop spawn
1296 // The stored connection is used for subscription management
1297 let connection_for_loop = self.connections.get(&relay_url).unwrap().clone();
1298
1299 // Create event channel
1300 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
1301
1302 // Spawn event loop with cloned connection
1303 tokio::spawn(async move {
1304 connection_for_loop.run_event_loop(event_tx).await;
1305 });
1306
1307 // Spawn event processor
1308 let relay_url_clone = relay_url.clone();
1309 let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task
1310 tokio::spawn(async move {
1311 // Track whether we've already sent a disconnect notification
1312 let mut disconnect_sent = false;
1313 // Track whether EOSE has been received - events before EOSE are "startup", after are "live"
1314 let mut eose_received = false;
1315
1316 while let Some(relay_event) = event_rx.recv().await {
1317 match relay_event {
1318 RelayEvent::Event(event) => {
1319 if let Some(ref metrics) = metrics_clone {
1320 // Events before EOSE are "startup", events after EOSE are "live"
1321 let source = if eose_received {
1322 event_source::LIVE
1323 } else {
1324 event_source::STARTUP
1325 };
1326 metrics.record_event(source);
1327 }
1328 Self::process_event_static(
1329 &event,
1330 &relay_url_clone,
1331 &database,
1332 &write_policy,
1333 &local_relay,
1334 )
1335 .await;
1336 }
1337 RelayEvent::EndOfStoredEvents(sub_id) => {
1338 // Mark EOSE as received - subsequent events are "live"
1339 eose_received = true;
1340 tracing::debug!(
1341 relay = %relay_url_clone,
1342 sub_id = %sub_id,
1343 "EOSE received, notifying SyncManager"
1344 );
1345 // Notify SyncManager of EOSE
1346 let _ = eose_tx
1347 .send(EoseNotification {
1348 relay_url: relay_url_clone.clone(),
1349 sub_id,
1350 })
1351 .await;
1352 }
1353 RelayEvent::Closed(reason) => {
1354 tracing::info!(
1355 relay = %relay_url_clone,
1356 reason = %reason,
1357 "Relay connection closed"
1358 );
1359 // Notify SyncManager of disconnect
1360 let _ = disconnect_tx
1361 .send(DisconnectNotification {
1362 relay_url: relay_url_clone.clone(),
1363 })
1364 .await;
1365 disconnect_sent = true;
1366 break;
1367 }
1368 RelayEvent::Shutdown => {
1369 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected");
1370 // Notify SyncManager of disconnect
1371 let _ = disconnect_tx
1372 .send(DisconnectNotification {
1373 relay_url: relay_url_clone.clone(),
1374 })
1375 .await;
1376 disconnect_sent = true;
1377 break;
1378 }
1379 }
1380 }
1381
1382 // If the event channel closed without a Closed/Shutdown event
1383 // (e.g., connection dropped unexpectedly), still notify SyncManager
1384 if !disconnect_sent {
1385 tracing::info!(
1386 relay = %relay_url_clone,
1387 "Event channel closed, notifying SyncManager of disconnect"
1388 );
1389 let _ = disconnect_tx
1390 .send(DisconnectNotification {
1391 relay_url: relay_url_clone.clone(),
1392 })
1393 .await;
1394 }
1395 });
1396
1397 tracing::info!(
1398 relay = %relay_url,
1399 is_bootstrap = is_bootstrap,
1400 "Spawned relay connection"
1401 );
1402 }
1403
1404 /// Process a single event from a relay (static version for spawned tasks) 1408 /// Process a single event from a relay (static version for spawned tasks)
1405 /// 1409 ///
1406 /// Processes events with dedup, policy check, database save, and broadcast: 1410 /// Processes events with dedup, policy check, database save, and broadcast:
@@ -1759,7 +1763,7 @@ impl SyncManager {
1759 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); 1763 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up");
1760 } 1764 }
1761 1765
1762 /// Check for disconnected relays that should be reconnected 1766 /// Retry disconnected relays that are ready for reconnection
1763 /// 1767 ///
1764 /// This method is called periodically by run_disconnect_checker. 1768 /// This method is called periodically by run_disconnect_checker.
1765 /// It identifies relays that: 1769 /// It identifies relays that:
@@ -1767,8 +1771,8 @@ impl SyncManager {
1767 /// - Have repos or root events to sync (not empty) 1771 /// - Have repos or root events to sync (not empty)
1768 /// - Have passed the exponential backoff period (respects health tracker) 1772 /// - Have passed the exponential backoff period (respects health tracker)
1769 /// 1773 ///
1770 /// For each eligible relay, a reconnection is attempted via spawn_relay_connection. 1774 /// For each eligible relay, a reconnection is attempted via try_connect_relay.
1771 async fn check_reconnects(&mut self) { 1775 async fn retry_disconnected_relays(&mut self) {
1772 // Collect relays to reconnect 1776 // Collect relays to reconnect
1773 let to_reconnect: Vec<String> = { 1777 let to_reconnect: Vec<String> = {
1774 let index = self.relay_sync_index.read().await; 1778 let index = self.relay_sync_index.read().await;
@@ -1813,7 +1817,7 @@ impl SyncManager {
1813 health_state = %self.health_tracker.get_state(&relay_url), 1817 health_state = %self.health_tracker.get_state(&relay_url),
1814 "Attempting reconnection" 1818 "Attempting reconnection"
1815 ); 1819 );
1816 self.spawn_relay_connection(relay_url).await; 1820 self.try_connect_relay(&relay_url).await;
1817 } 1821 }
1818 } 1822 }
1819 1823