diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-18 12:06:57 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-18 12:06:57 +0000 |
| commit | e98a3850b6dcd3bbd5d251896ef56199cd49dc33 (patch) | |
| tree | 8b0314c3a6633f9105a53305ada3b199e593dd84 /src/sync | |
| parent | 03f074d0d0840b946a356badde75551d61c0f84c (diff) | |
sync: new connection logic
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/algorithms.rs | 4 | ||||
| -rw-r--r-- | src/sync/health.rs | 15 | ||||
| -rw-r--r-- | src/sync/mod.rs | 668 |
3 files changed, 355 insertions, 332 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 84248b1..a6e0787 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -348,6 +348,7 @@ mod tests { | |||
| 348 | connection_status: ConnectionStatus::Disconnected, | 348 | connection_status: ConnectionStatus::Disconnected, |
| 349 | last_connected: None, | 349 | last_connected: None, |
| 350 | disconnected_at: None, | 350 | disconnected_at: None, |
| 351 | announcements_synced: false, | ||
| 351 | }, | 352 | }, |
| 352 | ); | 353 | ); |
| 353 | 354 | ||
| @@ -435,6 +436,7 @@ mod tests { | |||
| 435 | connection_status: ConnectionStatus::Connected, | 436 | connection_status: ConnectionStatus::Connected, |
| 436 | last_connected: None, | 437 | last_connected: None, |
| 437 | disconnected_at: None, | 438 | disconnected_at: None, |
| 439 | announcements_synced: false, | ||
| 438 | }, | 440 | }, |
| 439 | ); | 441 | ); |
| 440 | 442 | ||
| @@ -468,6 +470,7 @@ mod tests { | |||
| 468 | connection_status: ConnectionStatus::Connecting, | 470 | connection_status: ConnectionStatus::Connecting, |
| 469 | last_connected: None, | 471 | last_connected: None, |
| 470 | disconnected_at: None, | 472 | disconnected_at: None, |
| 473 | announcements_synced: false, | ||
| 471 | }, | 474 | }, |
| 472 | ); | 475 | ); |
| 473 | 476 | ||
| @@ -523,6 +526,7 @@ mod tests { | |||
| 523 | connection_status: ConnectionStatus::Connected, | 526 | connection_status: ConnectionStatus::Connected, |
| 524 | last_connected: None, | 527 | last_connected: None, |
| 525 | disconnected_at: None, | 528 | disconnected_at: None, |
| 529 | announcements_synced: false, | ||
| 526 | }, | 530 | }, |
| 527 | ); | 531 | ); |
| 528 | 532 | ||
diff --git a/src/sync/health.rs b/src/sync/health.rs index 0ae7dee..d919a80 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs | |||
| @@ -64,6 +64,8 @@ pub struct RelayHealth { | |||
| 64 | pub last_failure_time: Option<Instant>, | 64 | pub last_failure_time: Option<Instant>, |
| 65 | /// Time of the last successful connection | 65 | /// Time of the last successful connection |
| 66 | pub last_success_time: Option<Instant>, | 66 | pub last_success_time: Option<Instant>, |
| 67 | /// Time of the last connection attempt (success or failure) | ||
| 68 | pub last_attempt_time: Option<Instant>, | ||
| 67 | /// Next time a connection attempt should be made | 69 | /// Next time a connection attempt should be made |
| 68 | pub next_retry_at: Option<Instant>, | 70 | pub next_retry_at: Option<Instant>, |
| 69 | } | 71 | } |
| @@ -76,6 +78,7 @@ impl Default for RelayHealth { | |||
| 76 | first_failure_time: None, | 78 | first_failure_time: None, |
| 77 | last_failure_time: None, | 79 | last_failure_time: None, |
| 78 | last_success_time: None, | 80 | last_success_time: None, |
| 81 | last_attempt_time: None, | ||
| 79 | next_retry_at: None, | 82 | next_retry_at: None, |
| 80 | } | 83 | } |
| 81 | } | 84 | } |
| @@ -132,6 +135,17 @@ impl RelayHealthTracker { | |||
| 132 | self.base_backoff_secs | 135 | self.base_backoff_secs |
| 133 | } | 136 | } |
| 134 | 137 | ||
| 138 | /// Record a connection attempt (updates last_attempt_time) | ||
| 139 | /// | ||
| 140 | /// This should be called before trying to connect, to track when | ||
| 141 | /// attempts are made regardless of success or failure. | ||
| 142 | pub fn record_attempt(&self, relay_url: &str) { | ||
| 143 | let now = Instant::now(); | ||
| 144 | let mut entry = self.health.entry(relay_url.to_string()).or_default(); | ||
| 145 | let health = entry.value_mut(); | ||
| 146 | health.last_attempt_time = Some(now); | ||
| 147 | } | ||
| 148 | |||
| 135 | /// Record a successful connection to a relay | 149 | /// Record a successful connection to a relay |
| 136 | /// | 150 | /// |
| 137 | /// Resets the relay to Healthy state and clears failure counters. | 151 | /// Resets the relay to Healthy state and clears failure counters. |
| @@ -148,6 +162,7 @@ impl RelayHealthTracker { | |||
| 148 | health.first_failure_time = None; | 162 | health.first_failure_time = None; |
| 149 | health.last_failure_time = None; | 163 | health.last_failure_time = None; |
| 150 | health.last_success_time = Some(now); | 164 | health.last_success_time = Some(now); |
| 165 | health.last_attempt_time = Some(now); | ||
| 151 | health.next_retry_at = None; | 166 | health.next_retry_at = None; |
| 152 | 167 | ||
| 153 | if old_state != HealthState::Healthy { | 168 | if old_state != HealthState::Healthy { |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3c50387..5bea701 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -103,6 +103,9 @@ pub struct RelayState { | |||
| 103 | pub last_connected: Option<Timestamp>, | 103 | pub last_connected: Option<Timestamp>, |
| 104 | /// When we disconnected - for 15-minute state retention rule | 104 | /// When we disconnected - for 15-minute state retention rule |
| 105 | pub disconnected_at: Option<Timestamp>, | 105 | pub disconnected_at: Option<Timestamp>, |
| 106 | /// Whether announcement filter historic sync has completed for this relay | ||
| 107 | /// Used to determine if we can use `since` filter on reconnect for Layer 1 | ||
| 108 | pub announcements_synced: bool, | ||
| 106 | } | 109 | } |
| 107 | 110 | ||
| 108 | impl Default for RelayState { | 111 | impl Default for RelayState { |
| @@ -114,6 +117,7 @@ impl Default for RelayState { | |||
| 114 | connection_status: ConnectionStatus::Disconnected, | 117 | connection_status: ConnectionStatus::Disconnected, |
| 115 | last_connected: None, | 118 | last_connected: None, |
| 116 | disconnected_at: None, | 119 | disconnected_at: None, |
| 120 | announcements_synced: false, | ||
| 117 | } | 121 | } |
| 118 | } | 122 | } |
| 119 | } | 123 | } |
| @@ -134,6 +138,7 @@ impl RelayState { | |||
| 134 | pub fn clear_sync_state(&mut self) { | 138 | pub fn clear_sync_state(&mut self) { |
| 135 | self.repos.clear(); | 139 | self.repos.clear(); |
| 136 | self.root_events.clear(); | 140 | self.root_events.clear(); |
| 141 | self.announcements_synced = false; | ||
| 137 | } | 142 | } |
| 138 | } | 143 | } |
| 139 | 144 | ||
| @@ -301,7 +306,7 @@ async fn run_disconnect_checker( | |||
| 301 | 306 | ||
| 302 | let mut manager = sync_manager.lock().await; | 307 | let mut manager = sync_manager.lock().await; |
| 303 | manager.check_disconnects().await; | 308 | manager.check_disconnects().await; |
| 304 | manager.check_reconnects().await; | 309 | manager.retry_disconnected_relays().await; |
| 305 | } | 310 | } |
| 306 | _ = shutdown_rx.recv() => { | 311 | _ = shutdown_rx.recv() => { |
| 307 | tracing::info!("Disconnect checker received shutdown signal"); | 312 | tracing::info!("Disconnect checker received shutdown signal"); |
| @@ -475,6 +480,9 @@ impl SyncManager { | |||
| 475 | /// move repos and root_events from pending to confirmed state. This unified | 480 | /// move repos and root_events from pending to confirmed state. This unified |
| 476 | /// flow ensures consistent state tracking regardless of sync method. | 481 | /// flow ensures consistent state tracking regardless of sync method. |
| 477 | /// | 482 | /// |
| 483 | /// For generic filter batches (identified by empty repos and root_events), | ||
| 484 | /// this sets the announcements_synced flag to enable incremental sync on reconnect. | ||
| 485 | /// | ||
| 478 | /// # Arguments | 486 | /// # Arguments |
| 479 | /// * `relay_url` - The relay URL the batch belongs to | 487 | /// * `relay_url` - The relay URL the batch belongs to |
| 480 | /// * `batch` - The completed batch to confirm | 488 | /// * `batch` - The completed batch to confirm |
| @@ -483,6 +491,7 @@ impl SyncManager { | |||
| 483 | let repos_count = batch.items.repos.len(); | 491 | let repos_count = batch.items.repos.len(); |
| 484 | let events_count = batch.items.root_events.len(); | 492 | let events_count = batch.items.root_events.len(); |
| 485 | let sync_method = batch.sync_method; | 493 | let sync_method = batch.sync_method; |
| 494 | let is_generic_filter = repos_count == 0 && events_count == 0; | ||
| 486 | 495 | ||
| 487 | let mut relay_index = self.relay_sync_index.write().await; | 496 | let mut relay_index = self.relay_sync_index.write().await; |
| 488 | 497 | ||
| @@ -492,6 +501,17 @@ impl SyncManager { | |||
| 492 | // Move root_events to confirmed | 501 | // Move root_events to confirmed |
| 493 | state.root_events.extend(batch.items.root_events.clone()); | 502 | state.root_events.extend(batch.items.root_events.clone()); |
| 494 | 503 | ||
| 504 | // Set announcements_synced flag for generic filter batches | ||
| 505 | if is_generic_filter { | ||
| 506 | state.announcements_synced = true; | ||
| 507 | tracing::info!( | ||
| 508 | relay = %relay_url, | ||
| 509 | batch_id = batch_id, | ||
| 510 | sync_method = ?sync_method, | ||
| 511 | "Generic filter (announcements) historic sync complete - announcements_synced set to true" | ||
| 512 | ); | ||
| 513 | } | ||
| 514 | |||
| 495 | // DEBUG TRACING: Log the root events being confirmed | 515 | // DEBUG TRACING: Log the root events being confirmed |
| 496 | tracing::info!( | 516 | tracing::info!( |
| 497 | relay = %relay_url, | 517 | relay = %relay_url, |
| @@ -503,6 +523,8 @@ impl SyncManager { | |||
| 503 | total_repos = state.repos.len(), | 523 | total_repos = state.repos.len(), |
| 504 | total_root_events = state.root_events.len(), | 524 | total_root_events = state.root_events.len(), |
| 505 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | 525 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), |
| 526 | is_generic_filter = is_generic_filter, | ||
| 527 | announcements_synced = state.announcements_synced, | ||
| 506 | "Batch confirmed - items moved from pending to confirmed" | 528 | "Batch confirmed - items moved from pending to confirmed" |
| 507 | ); | 529 | ); |
| 508 | } else { | 530 | } else { |
| @@ -623,7 +645,8 @@ impl SyncManager { | |||
| 623 | 645 | ||
| 624 | // 6. Connect to bootstrap relay if configured | 646 | // 6. Connect to bootstrap relay if configured |
| 625 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { | 647 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { |
| 626 | self.spawn_relay_connection(bootstrap_url.clone()).await; | 648 | self.register_relay(bootstrap_url.clone()).await; |
| 649 | self.try_connect_relay(bootstrap_url).await; | ||
| 627 | } | 650 | } |
| 628 | 651 | ||
| 629 | // 7. Capture config values before moving self into Arc | 652 | // 7. Capture config values before moving self into Arc |
| @@ -731,33 +754,16 @@ impl SyncManager { | |||
| 731 | 754 | ||
| 732 | match connection_status { | 755 | match connection_status { |
| 733 | None => { | 756 | None => { |
| 734 | // New relay - create entry with Connecting status | 757 | // New relay - register and connect |
| 735 | { | ||
| 736 | let mut index = self.relay_sync_index.write().await; | ||
| 737 | let new_state = RelayState { | ||
| 738 | connection_status: ConnectionStatus::Connecting, | ||
| 739 | is_bootstrap: false, // Only bootstrap relays set this to true | ||
| 740 | last_connected: None, | ||
| 741 | disconnected_at: None, | ||
| 742 | repos: HashSet::new(), | ||
| 743 | root_events: HashSet::new(), | ||
| 744 | }; | ||
| 745 | index.insert(action.relay_url.clone(), new_state); | ||
| 746 | } | ||
| 747 | |||
| 748 | // Track new relay in metrics | ||
| 749 | if let Some(ref metrics) = self.metrics { | ||
| 750 | metrics.inc_tracked_count(); | ||
| 751 | } | ||
| 752 | |||
| 753 | tracing::info!( | 758 | tracing::info!( |
| 754 | relay = %action.relay_url, | 759 | relay = %action.relay_url, |
| 755 | repos = action.items.repos.len(), | 760 | repos = action.items.repos.len(), |
| 756 | "Spawning connection for new relay" | 761 | "Registering and connecting to new relay" |
| 757 | ); | 762 | ); |
| 758 | 763 | ||
| 759 | // Spawn connection for new relay | 764 | // Register relay (creates RelayConnection, initializes RelayState, updates metrics) |
| 760 | self.spawn_relay_connection(action.relay_url.clone()).await; | 765 | self.register_relay(action.relay_url.clone()).await; |
| 766 | self.try_connect_relay(&action.relay_url).await; | ||
| 761 | // Connection will trigger handle_connect_or_reconnect which will process items | 767 | // Connection will trigger handle_connect_or_reconnect which will process items |
| 762 | return; | 768 | return; |
| 763 | } | 769 | } |
| @@ -795,11 +801,146 @@ impl SyncManager { | |||
| 795 | 801 | ||
| 796 | /// Handle a connection success (called when a relay connects or reconnects) | 802 | /// Handle a connection success (called when a relay connects or reconnects) |
| 797 | /// | 803 | /// |
| 798 | /// This method dispatches to the appropriate reconnection strategy: | 804 | /// This method: |
| 799 | /// - `fresh_start()` if never connected before | 805 | /// 1. Updates RelayState to Connected |
| 800 | /// - `quick_reconnect()` if disconnected < 15 minutes | 806 | /// 2. Spawns event loop (MUST happen on every connection/reconnect) |
| 801 | /// - `long_reconnect()` if disconnected > 15 minutes | 807 | /// 3. Dispatches to appropriate reconnection strategy based on disconnect time |
| 802 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | 808 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { |
| 809 | use tokio::sync::mpsc; | ||
| 810 | |||
| 811 | // 1. Update state to Connected | ||
| 812 | { | ||
| 813 | let mut index = self.relay_sync_index.write().await; | ||
| 814 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 815 | state.connection_status = ConnectionStatus::Connected; | ||
| 816 | state.last_connected = Some(Timestamp::now()); | ||
| 817 | state.disconnected_at = None; | ||
| 818 | } | ||
| 819 | |||
| 820 | // Update metrics | ||
| 821 | if let Some(ref metrics) = self.metrics { | ||
| 822 | metrics.set_relay_connected(relay_url, true); | ||
| 823 | metrics.inc_connected_count(); | ||
| 824 | } | ||
| 825 | |||
| 826 | // 2. SPAWN EVENT LOOP (moved from spawn_relay_connection) | ||
| 827 | // This MUST happen on every connection (initial or reconnect) | ||
| 828 | // because event loops die on disconnect and cannot be reused | ||
| 829 | let connection = match self.connections.get(relay_url) { | ||
| 830 | Some(c) => c.clone(), | ||
| 831 | None => { | ||
| 832 | tracing::error!(relay = %relay_url, "No RelayConnection found for connected relay"); | ||
| 833 | return; | ||
| 834 | } | ||
| 835 | }; | ||
| 836 | |||
| 837 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 838 | |||
| 839 | // Spawn event loop task | ||
| 840 | let relay_url_for_loop = relay_url.to_string(); | ||
| 841 | tokio::spawn(async move { | ||
| 842 | connection.run_event_loop(event_tx).await; | ||
| 843 | tracing::debug!(relay = %relay_url_for_loop, "Event loop terminated"); | ||
| 844 | }); | ||
| 845 | |||
| 846 | // Spawn event processor task | ||
| 847 | let relay_url_clone = relay_url.to_string(); | ||
| 848 | let database = Arc::clone(&self.database); | ||
| 849 | let write_policy = self.write_policy.clone(); | ||
| 850 | let local_relay = self.local_relay.clone(); | ||
| 851 | let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); | ||
| 852 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | ||
| 853 | let metrics_clone = self.metrics.clone(); | ||
| 854 | |||
| 855 | tokio::spawn(async move { | ||
| 856 | let mut disconnect_sent = false; | ||
| 857 | let mut eose_received = false; | ||
| 858 | |||
| 859 | while let Some(relay_event) = event_rx.recv().await { | ||
| 860 | match relay_event { | ||
| 861 | RelayEvent::Event(event) => { | ||
| 862 | if let Some(ref metrics) = metrics_clone { | ||
| 863 | let source = if eose_received { | ||
| 864 | event_source::LIVE | ||
| 865 | } else { | ||
| 866 | event_source::STARTUP | ||
| 867 | }; | ||
| 868 | metrics.record_event(source); | ||
| 869 | } | ||
| 870 | Self::process_event_static( | ||
| 871 | &event, | ||
| 872 | &relay_url_clone, | ||
| 873 | &database, | ||
| 874 | &write_policy, | ||
| 875 | &local_relay, | ||
| 876 | ) | ||
| 877 | .await; | ||
| 878 | } | ||
| 879 | RelayEvent::EndOfStoredEvents(sub_id) => { | ||
| 880 | eose_received = true; | ||
| 881 | tracing::debug!( | ||
| 882 | relay = %relay_url_clone, | ||
| 883 | sub_id = %sub_id, | ||
| 884 | "EOSE received, notifying SyncManager" | ||
| 885 | ); | ||
| 886 | let _ = eose_tx | ||
| 887 | .send(EoseNotification { | ||
| 888 | relay_url: relay_url_clone.clone(), | ||
| 889 | sub_id, | ||
| 890 | }) | ||
| 891 | .await; | ||
| 892 | } | ||
| 893 | RelayEvent::Closed(reason) => { | ||
| 894 | tracing::info!( | ||
| 895 | relay = %relay_url_clone, | ||
| 896 | reason = %reason, | ||
| 897 | "Relay connection closed" | ||
| 898 | ); | ||
| 899 | if !disconnect_sent { | ||
| 900 | let _ = disconnect_tx | ||
| 901 | .send(DisconnectNotification { | ||
| 902 | relay_url: relay_url_clone.clone(), | ||
| 903 | }) | ||
| 904 | .await; | ||
| 905 | disconnect_sent = true; | ||
| 906 | } | ||
| 907 | break; | ||
| 908 | } | ||
| 909 | RelayEvent::Shutdown => { | ||
| 910 | tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); | ||
| 911 | if !disconnect_sent { | ||
| 912 | let _ = disconnect_tx | ||
| 913 | .send(DisconnectNotification { | ||
| 914 | relay_url: relay_url_clone.clone(), | ||
| 915 | }) | ||
| 916 | .await; | ||
| 917 | disconnect_sent = true; | ||
| 918 | } | ||
| 919 | break; | ||
| 920 | } | ||
| 921 | } | ||
| 922 | } | ||
| 923 | |||
| 924 | // If the event channel closed without a Closed/Shutdown event | ||
| 925 | if !disconnect_sent { | ||
| 926 | tracing::info!( | ||
| 927 | relay = %relay_url_clone, | ||
| 928 | "Event channel closed, notifying SyncManager of disconnect" | ||
| 929 | ); | ||
| 930 | let _ = disconnect_tx | ||
| 931 | .send(DisconnectNotification { | ||
| 932 | relay_url: relay_url_clone, | ||
| 933 | }) | ||
| 934 | .await; | ||
| 935 | } | ||
| 936 | }); | ||
| 937 | |||
| 938 | tracing::info!( | ||
| 939 | relay = %relay_url, | ||
| 940 | "Event loop and processor spawned for connected relay" | ||
| 941 | ); | ||
| 942 | |||
| 943 | // 3. Decide reconnection strategy based on last_connected time | ||
| 803 | let last_connected = { | 944 | let last_connected = { |
| 804 | let index = self.relay_sync_index.read().await; | 945 | let index = self.relay_sync_index.read().await; |
| 805 | index.get(relay_url).and_then(|s| s.last_connected) | 946 | index.get(relay_url).and_then(|s| s.last_connected) |
| @@ -808,7 +949,7 @@ impl SyncManager { | |||
| 808 | if let Some(last) = last_connected { | 949 | if let Some(last) = last_connected { |
| 809 | let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); | 950 | let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); |
| 810 | if elapsed < QUICK_RECONNECT_WINDOW_SECS { | 951 | if elapsed < QUICK_RECONNECT_WINDOW_SECS { |
| 811 | // short disconnect | 952 | // Short disconnect - quick reconnect |
| 812 | tracing::info!( | 953 | tracing::info!( |
| 813 | relay = %relay_url, | 954 | relay = %relay_url, |
| 814 | disconnect_secs = elapsed, | 955 | disconnect_secs = elapsed, |
| @@ -817,7 +958,7 @@ impl SyncManager { | |||
| 817 | self.quick_reconnect(relay_url, Timestamp::from(elapsed)) | 958 | self.quick_reconnect(relay_url, Timestamp::from(elapsed)) |
| 818 | .await; | 959 | .await; |
| 819 | } else { | 960 | } else { |
| 820 | // long disconnect | 961 | // Long disconnect - fresh start |
| 821 | tracing::info!( | 962 | tracing::info!( |
| 822 | relay = %relay_url, | 963 | relay = %relay_url, |
| 823 | disconnect_secs = elapsed, | 964 | disconnect_secs = elapsed, |
| @@ -826,13 +967,13 @@ impl SyncManager { | |||
| 826 | self.fresh_start(relay_url).await; | 967 | self.fresh_start(relay_url).await; |
| 827 | } | 968 | } |
| 828 | } else { | 969 | } else { |
| 829 | // not successfully connected before (since launching binary) | 970 | // First connection - fresh start |
| 830 | tracing::info!( | 971 | tracing::info!( |
| 831 | relay = %relay_url, | 972 | relay = %relay_url, |
| 832 | "First connection - initiating fresh_start" | 973 | "First connection - initiating fresh_start" |
| 833 | ); | 974 | ); |
| 834 | self.fresh_start(relay_url).await; | 975 | self.fresh_start(relay_url).await; |
| 835 | }; | 976 | } |
| 836 | } | 977 | } |
| 837 | 978 | ||
| 838 | /// Fresh start - clears state and does full sync | 979 | /// Fresh start - clears state and does full sync |
| @@ -846,7 +987,7 @@ impl SyncManager { | |||
| 846 | /// 4. L1 live + L1 historic (negentropy if available) | 987 | /// 4. L1 live + L1 historic (negentropy if available) |
| 847 | /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 | 988 | /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 |
| 848 | async fn fresh_start(&mut self, relay_url: &str) { | 989 | async fn fresh_start(&mut self, relay_url: &str) { |
| 849 | let now = Timestamp::now(); | 990 | let _now = Timestamp::now(); |
| 850 | 991 | ||
| 851 | tracing::info!(relay = %relay_url, "Starting fresh_start"); | 992 | tracing::info!(relay = %relay_url, "Starting fresh_start"); |
| 852 | 993 | ||
| @@ -877,28 +1018,38 @@ impl SyncManager { | |||
| 877 | ); | 1018 | ); |
| 878 | } | 1019 | } |
| 879 | if state.connection_status == ConnectionStatus::Connected { | 1020 | if state.connection_status == ConnectionStatus::Connected { |
| 880 | // TODO start layer 1 | ||
| 881 | drop(index); | 1021 | drop(index); |
| 1022 | self.sync_generic_filters(relay_url, None).await; | ||
| 1023 | // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) | ||
| 882 | self.recompute_new_sync_filters_for_relay(relay_url).await; | 1024 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 883 | } | 1025 | } |
| 884 | } else { | 1026 | } else { |
| 885 | drop(index); | 1027 | drop(index); |
| 886 | return self.spawn_relay_connection(relay_url.to_string()).await; | ||
| 887 | } | 1028 | } |
| 888 | } | 1029 | } |
| 889 | } | 1030 | } |
| 890 | 1031 | ||
| 1032 | async fn sync_generic_filters(&mut self, relay_url: &str, since: Option<Timestamp>) { | ||
| 1033 | let filters = vec![filters::build_announcement_filter(None)]; | ||
| 1034 | self.sync_live(relay_url, &filters).await; | ||
| 1035 | |||
| 1036 | // Use historic_sync with empty PendingItems for generic filters | ||
| 1037 | // Generic filters (announcements) don't have associated repos or root_events | ||
| 1038 | let items = PendingItems::default(); | ||
| 1039 | self.historic_sync(relay_url, filters, items, since).await; | ||
| 1040 | } | ||
| 1041 | |||
| 891 | /// Quick reconnect - for disconnections < 15 minutes | 1042 | /// Quick reconnect - for disconnections < 15 minutes |
| 892 | /// | 1043 | /// |
| 893 | /// Flow: | 1044 | /// Re-establishes subscriptions after a brief disconnection by: |
| 894 | /// 1. Clear PendingSyncIndex for this relay | 1045 | /// 1. Clearing stale PendingSyncIndex entries |
| 895 | /// 2. Update connection state to Connected | 1046 | /// 2. Syncing L1 filters with since timestamp (announcements) |
| 896 | /// 3. L1 live + L1 historic(since) | 1047 | /// 3. Rebuilding L2+L3 from preserved RelaySyncIndex state |
| 897 | /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since) | 1048 | /// 4. Computing actions for new items discovered during catchup |
| 898 | /// 5. compute_actions for any new items discovered during catchup | 1049 | /// |
| 1050 | /// Basic connection state and metrics are managed by handle_connect_or_reconnect. | ||
| 1051 | /// This method handles reconnect-specific concerns (health tracking, reconnect metrics). | ||
| 899 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { | 1052 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { |
| 900 | let now = Timestamp::now(); | ||
| 901 | |||
| 902 | tracing::info!( | 1053 | tracing::info!( |
| 903 | relay = %relay_url, | 1054 | relay = %relay_url, |
| 904 | since = %since, | 1055 | since = %since, |
| @@ -917,56 +1068,36 @@ impl SyncManager { | |||
| 917 | } | 1068 | } |
| 918 | } | 1069 | } |
| 919 | 1070 | ||
| 920 | // Step 2: Update connection state (preserve repos/root_events - that's the point!) | 1071 | // Record successful reconnection in health tracker |
| 921 | { | ||
| 922 | let mut index = self.relay_sync_index.write().await; | ||
| 923 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 924 | state.connection_status = ConnectionStatus::Connected; | ||
| 925 | state.last_connected = Some(now); | ||
| 926 | state.disconnected_at = None; | ||
| 927 | } | ||
| 928 | |||
| 929 | // Record success in health tracker | ||
| 930 | self.health_tracker.record_success(relay_url); | 1072 | self.health_tracker.record_success(relay_url); |
| 931 | 1073 | ||
| 932 | // Update metrics | 1074 | // Record reconnect-specific metrics (not basic connection metrics) |
| 933 | if let Some(ref metrics) = self.metrics { | 1075 | if let Some(ref metrics) = self.metrics { |
| 934 | metrics.set_relay_connected(relay_url, true); | ||
| 935 | metrics.inc_connected_count(); | ||
| 936 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1076 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); |
| 937 | metrics.record_event(event_source::RECONNECT); | 1077 | metrics.record_event(event_source::RECONNECT); |
| 938 | } | 1078 | } |
| 939 | 1079 | ||
| 940 | // Step 3: L1 live + L1 historic with since filter | 1080 | // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) |
| 941 | // L1 live subscription (since=now for ongoing events) | 1081 | let announcement_since = { |
| 942 | let live_filter = filters::build_announcement_filter(Some(now)); | 1082 | let index = self.relay_sync_index.read().await; |
| 943 | if let Some(connection) = self.connections.get(relay_url) { | 1083 | if let Some(state) = index.get(relay_url) { |
| 944 | if let Err(e) = connection.subscribe_filter(live_filter).await { | 1084 | if state.announcements_synced { |
| 945 | tracing::error!( | 1085 | Some(since) // Can use incremental sync |
| 946 | relay = %relay_url, | 1086 | } else { |
| 947 | error = %e, | 1087 | None // Need full sync - announcements never completed |
| 948 | "Failed to set up L1 live subscription in quick_reconnect" | 1088 | } |
| 949 | ); | 1089 | } else { |
| 950 | } | 1090 | None |
| 951 | } | ||
| 952 | |||
| 953 | // L1 historic with since filter (catch up on missed announcements) | ||
| 954 | let layer1_filter = filters::build_announcement_filter(Some(since)); | ||
| 955 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 956 | if let Err(e) = connection.subscribe_filter(layer1_filter).await { | ||
| 957 | tracing::error!( | ||
| 958 | relay = %relay_url, | ||
| 959 | error = %e, | ||
| 960 | "Failed to subscribe to L1 historic filter in quick_reconnect" | ||
| 961 | ); | ||
| 962 | } | 1091 | } |
| 963 | } | 1092 | }; |
| 1093 | self.sync_generic_filters(relay_url, announcement_since) | ||
| 1094 | .await; | ||
| 964 | 1095 | ||
| 965 | // Step 4: Rebuild L2+L3 from confirmed state with since filter | 1096 | // Step 3: Rebuild L2+L3 from confirmed state with since filter |
| 966 | // This uses the preserved repos/root_events from RelaySyncIndex | 1097 | // This uses the preserved repos/root_events from RelaySyncIndex |
| 967 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | 1098 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; |
| 968 | 1099 | ||
| 969 | // Step 5: compute_actions for any NEW items discovered while disconnected | 1100 | // Step 4: compute_actions for any NEW items discovered while disconnected |
| 970 | self.recompute_new_sync_filters_for_relay(relay_url).await; | 1101 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 971 | 1102 | ||
| 972 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); | 1103 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); |
| @@ -1038,6 +1169,119 @@ impl SyncManager { | |||
| 1038 | } | 1169 | } |
| 1039 | } | 1170 | } |
| 1040 | 1171 | ||
| 1172 | /// Register a relay for managed connection/reconnection | ||
| 1173 | /// | ||
| 1174 | /// Creates a RelayConnection object and stores it in the connections HashMap. | ||
| 1175 | /// Also initializes RelayState if it doesn't exist. | ||
| 1176 | /// Does NOT connect - connection happens via try_connect_relay or retry_disconnected_relays. | ||
| 1177 | /// The RelayConnection persists forever and is reused on reconnects. | ||
| 1178 | async fn register_relay(&mut self, relay_url: String) { | ||
| 1179 | // Create RelayConnection if not exists | ||
| 1180 | if !self.connections.contains_key(&relay_url) { | ||
| 1181 | let connection = | ||
| 1182 | RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database)); | ||
| 1183 | self.connections.insert(relay_url.clone(), connection); | ||
| 1184 | tracing::debug!(relay = %relay_url, "Registered new relay connection"); | ||
| 1185 | } | ||
| 1186 | |||
| 1187 | // Initialize RelayState if not exists | ||
| 1188 | let is_new = { | ||
| 1189 | let mut index = self.relay_sync_index.write().await; | ||
| 1190 | if !index.contains_key(&relay_url) { | ||
| 1191 | let new_state = RelayState { | ||
| 1192 | connection_status: ConnectionStatus::Disconnected, | ||
| 1193 | is_bootstrap: false, | ||
| 1194 | last_connected: None, | ||
| 1195 | disconnected_at: None, | ||
| 1196 | repos: HashSet::new(), | ||
| 1197 | root_events: HashSet::new(), | ||
| 1198 | announcements_synced: false, | ||
| 1199 | }; | ||
| 1200 | index.insert(relay_url.clone(), new_state); | ||
| 1201 | true | ||
| 1202 | } else { | ||
| 1203 | false | ||
| 1204 | } | ||
| 1205 | }; | ||
| 1206 | |||
| 1207 | // Track new relay in metrics | ||
| 1208 | if is_new { | ||
| 1209 | if let Some(ref metrics) = self.metrics { | ||
| 1210 | metrics.inc_tracked_count(); | ||
| 1211 | } | ||
| 1212 | tracing::info!(relay = %relay_url, "Registered new relay for tracking"); | ||
| 1213 | } | ||
| 1214 | } | ||
| 1215 | |||
| 1216 | /// Attempt a single connection to a registered relay | ||
| 1217 | /// | ||
| 1218 | /// Uses the existing RelayConnection from the HashMap and attempts to connect. | ||
| 1219 | /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. | ||
| 1220 | /// On failure, updates state and health tracker. | ||
| 1221 | async fn try_connect_relay(&mut self, relay_url: &str) { | ||
| 1222 | // 1. Mark attempting (optional, helpful for debugging) | ||
| 1223 | { | ||
| 1224 | let mut index = self.relay_sync_index.write().await; | ||
| 1225 | if let Some(state) = index.get_mut(relay_url) { | ||
| 1226 | state.connection_status = ConnectionStatus::Connecting; | ||
| 1227 | } | ||
| 1228 | } | ||
| 1229 | |||
| 1230 | // 2. Record attempt in health tracker | ||
| 1231 | self.health_tracker.record_attempt(relay_url); | ||
| 1232 | |||
| 1233 | // 3. Get connection and attempt | ||
| 1234 | let connection = match self.connections.get(relay_url) { | ||
| 1235 | Some(c) => c, | ||
| 1236 | None => { | ||
| 1237 | tracing::error!(relay = %relay_url, "No RelayConnection registered"); | ||
| 1238 | return; | ||
| 1239 | } | ||
| 1240 | }; | ||
| 1241 | |||
| 1242 | let timeout = self.health_tracker.base_backoff_secs(); | ||
| 1243 | |||
| 1244 | match connection.connect_and_subscribe(None, timeout).await { | ||
| 1245 | Ok(_) => { | ||
| 1246 | // Success - record and send notification | ||
| 1247 | self.health_tracker.record_success(relay_url); | ||
| 1248 | |||
| 1249 | if let Some(ref metrics) = self.metrics { | ||
| 1250 | metrics.record_connection_attempt(relay_url, true); | ||
| 1251 | } | ||
| 1252 | |||
| 1253 | if let Some(ref connect_tx) = self.connect_tx { | ||
| 1254 | let _ = connect_tx | ||
| 1255 | .send(ConnectNotification { | ||
| 1256 | relay_url: relay_url.to_string(), | ||
| 1257 | }) | ||
| 1258 | .await; | ||
| 1259 | } | ||
| 1260 | } | ||
| 1261 | Err(e) => { | ||
| 1262 | tracing::error!(relay = %relay_url, error = %e, "Connection failed"); | ||
| 1263 | |||
| 1264 | // 4. Update state back to Disconnected on failure | ||
| 1265 | { | ||
| 1266 | let mut index = self.relay_sync_index.write().await; | ||
| 1267 | if let Some(state) = index.get_mut(relay_url) { | ||
| 1268 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1269 | } | ||
| 1270 | } | ||
| 1271 | |||
| 1272 | // 5. Record failure in health tracker | ||
| 1273 | self.health_tracker.record_failure(relay_url); | ||
| 1274 | |||
| 1275 | // 6. Update metrics | ||
| 1276 | if let Some(ref metrics) = self.metrics { | ||
| 1277 | metrics.record_connection_attempt(relay_url, false); | ||
| 1278 | metrics | ||
| 1279 | .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1280 | } | ||
| 1281 | } | ||
| 1282 | } | ||
| 1283 | } | ||
| 1284 | |||
| 1041 | /// Recompute sync actions for a specific relay | 1285 | /// Recompute sync actions for a specific relay |
| 1042 | /// | 1286 | /// |
| 1043 | /// Uses derive_relay_targets and compute_actions to find new items | 1287 | /// Uses derive_relay_targets and compute_actions to find new items |
| @@ -1136,13 +1380,13 @@ impl SyncManager { | |||
| 1136 | } | 1380 | } |
| 1137 | } | 1381 | } |
| 1138 | 1382 | ||
| 1139 | // 3. Remove from active connections | 1383 | // 3. Keep RelayConnection in HashMap for reuse on reconnect |
| 1140 | if self.connections.remove(relay_url).is_some() { | 1384 | // The connection object persists and will be reused when retry_disconnected_relays |
| 1141 | tracing::debug!( | 1385 | // calls try_connect_relay -> connection.connect_and_subscribe() |
| 1142 | relay = %relay_url, | 1386 | tracing::debug!( |
| 1143 | "Removed relay from active connections" | 1387 | relay = %relay_url, |
| 1144 | ); | 1388 | "Keeping RelayConnection in HashMap for reconnection" |
| 1145 | } | 1389 | ); |
| 1146 | 1390 | ||
| 1147 | // 4. Record failure in health tracker | 1391 | // 4. Record failure in health tracker |
| 1148 | self.health_tracker.record_failure(relay_url); | 1392 | self.health_tracker.record_failure(relay_url); |
| @@ -1161,246 +1405,6 @@ impl SyncManager { | |||
| 1161 | ); | 1405 | ); |
| 1162 | } | 1406 | } |
| 1163 | 1407 | ||
| 1164 | /// Spawn a relay connection and start its event loop | ||
| 1165 | /// | ||
| 1166 | /// Creates a new RelayConnection, connects to Layer 1, stores the connection, | ||
| 1167 | /// and spawns event processing tasks. Uses stored channel senders for notifications. | ||
| 1168 | async fn spawn_relay_connection(&mut self, relay_url: String) { | ||
| 1169 | use tokio::sync::mpsc; | ||
| 1170 | |||
| 1171 | // Get channel senders (must exist during run) | ||
| 1172 | let disconnect_tx = match &self.disconnect_tx { | ||
| 1173 | Some(tx) => tx.clone(), | ||
| 1174 | None => { | ||
| 1175 | tracing::error!( | ||
| 1176 | relay = %relay_url, | ||
| 1177 | "Cannot spawn connection - channels not initialized" | ||
| 1178 | ); | ||
| 1179 | return; | ||
| 1180 | } | ||
| 1181 | }; | ||
| 1182 | let eose_tx = match &self.eose_tx { | ||
| 1183 | Some(tx) => tx.clone(), | ||
| 1184 | None => { | ||
| 1185 | tracing::error!( | ||
| 1186 | relay = %relay_url, | ||
| 1187 | "Cannot spawn connection - channels not initialized" | ||
| 1188 | ); | ||
| 1189 | return; | ||
| 1190 | } | ||
| 1191 | }; | ||
| 1192 | let connect_tx = match &self.connect_tx { | ||
| 1193 | Some(tx) => tx.clone(), | ||
| 1194 | None => { | ||
| 1195 | tracing::error!( | ||
| 1196 | relay = %relay_url, | ||
| 1197 | "Cannot spawn connection - channels not initialized" | ||
| 1198 | ); | ||
| 1199 | return; | ||
| 1200 | } | ||
| 1201 | }; | ||
| 1202 | |||
| 1203 | let database = Arc::clone(&self.database); | ||
| 1204 | let write_policy = self.write_policy.clone(); | ||
| 1205 | let local_relay = self.local_relay.clone(); | ||
| 1206 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | ||
| 1207 | |||
| 1208 | // Check if this is a bootstrap relay | ||
| 1209 | let is_bootstrap = self.bootstrap_relay_url.as_ref() == Some(&relay_url); | ||
| 1210 | |||
| 1211 | // Create relay connection with database for negentropy sync support | ||
| 1212 | let connection = | ||
| 1213 | RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database)); | ||
| 1214 | |||
| 1215 | // Get connection timeout from health tracker (capped at base backoff) | ||
| 1216 | // This ensures the connection attempt completes before the next retry would be scheduled | ||
| 1217 | let connection_timeout_secs = self.health_tracker.base_backoff_secs(); | ||
| 1218 | |||
| 1219 | // Connect and subscribe to Layer 1 | ||
| 1220 | match connection | ||
| 1221 | .connect_and_subscribe(None, connection_timeout_secs) | ||
| 1222 | .await | ||
| 1223 | { | ||
| 1224 | Ok(_) => { | ||
| 1225 | // Record successful connection attempt | ||
| 1226 | if let Some(ref metrics) = self.metrics { | ||
| 1227 | metrics.record_connection_attempt(&relay_url, true); | ||
| 1228 | } | ||
| 1229 | } | ||
| 1230 | Err(e) => { | ||
| 1231 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | ||
| 1232 | |||
| 1233 | // Record failed connection attempt | ||
| 1234 | if let Some(ref metrics) = self.metrics { | ||
| 1235 | metrics.record_connection_attempt(&relay_url, false); | ||
| 1236 | } | ||
| 1237 | |||
| 1238 | // Record failure in health tracker | ||
| 1239 | self.health_tracker.record_failure(&relay_url); | ||
| 1240 | |||
| 1241 | // Record health state in metrics | ||
| 1242 | if let Some(ref metrics) = self.metrics { | ||
| 1243 | metrics | ||
| 1244 | .record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); | ||
| 1245 | } | ||
| 1246 | |||
| 1247 | // Update state to disconnected on failure | ||
| 1248 | { | ||
| 1249 | let mut index = relay_sync_index.write().await; | ||
| 1250 | if let Some(state) = index.get_mut(&relay_url) { | ||
| 1251 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1252 | } | ||
| 1253 | } | ||
| 1254 | return; | ||
| 1255 | } | ||
| 1256 | } | ||
| 1257 | |||
| 1258 | // Mark as connected in relay sync index | ||
| 1259 | // Track whether this is a new relay for metrics | ||
| 1260 | let is_new_relay = { | ||
| 1261 | let mut index = relay_sync_index.write().await; | ||
| 1262 | let is_new = !index.contains_key(&relay_url); | ||
| 1263 | let state = index.entry(relay_url.clone()).or_default(); | ||
| 1264 | state.connection_status = ConnectionStatus::Connected; | ||
| 1265 | state.is_bootstrap = is_bootstrap; | ||
| 1266 | state.last_connected = Some(Timestamp::now()); | ||
| 1267 | state.disconnected_at = None; | ||
| 1268 | is_new | ||
| 1269 | }; | ||
| 1270 | |||
| 1271 | // Increment tracked count for new relays | ||
| 1272 | if is_new_relay { | ||
| 1273 | if let Some(ref metrics) = self.metrics { | ||
| 1274 | metrics.inc_tracked_count(); | ||
| 1275 | } | ||
| 1276 | } | ||
| 1277 | |||
| 1278 | // Store connection in HashMap BEFORE sending notification | ||
| 1279 | // This ensures it's available when handle_connect_or_reconnect is called | ||
| 1280 | self.connections.insert(relay_url.clone(), connection); | ||
| 1281 | |||
| 1282 | tracing::info!( | ||
| 1283 | relay = %relay_url, | ||
| 1284 | is_bootstrap = is_bootstrap, | ||
| 1285 | "Spawned relay connection" | ||
| 1286 | ); | ||
| 1287 | |||
| 1288 | // Notify SyncManager of successful connection | ||
| 1289 | let _ = connect_tx | ||
| 1290 | .send(ConnectNotification { | ||
| 1291 | relay_url: relay_url.clone(), | ||
| 1292 | }) | ||
| 1293 | .await; | ||
| 1294 | |||
| 1295 | // Clone the connection for the event loop spawn | ||
| 1296 | // The stored connection is used for subscription management | ||
| 1297 | let connection_for_loop = self.connections.get(&relay_url).unwrap().clone(); | ||
| 1298 | |||
| 1299 | // Create event channel | ||
| 1300 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 1301 | |||
| 1302 | // Spawn event loop with cloned connection | ||
| 1303 | tokio::spawn(async move { | ||
| 1304 | connection_for_loop.run_event_loop(event_tx).await; | ||
| 1305 | }); | ||
| 1306 | |||
| 1307 | // Spawn event processor | ||
| 1308 | let relay_url_clone = relay_url.clone(); | ||
| 1309 | let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task | ||
| 1310 | tokio::spawn(async move { | ||
| 1311 | // Track whether we've already sent a disconnect notification | ||
| 1312 | let mut disconnect_sent = false; | ||
| 1313 | // Track whether EOSE has been received - events before EOSE are "startup", after are "live" | ||
| 1314 | let mut eose_received = false; | ||
| 1315 | |||
| 1316 | while let Some(relay_event) = event_rx.recv().await { | ||
| 1317 | match relay_event { | ||
| 1318 | RelayEvent::Event(event) => { | ||
| 1319 | if let Some(ref metrics) = metrics_clone { | ||
| 1320 | // Events before EOSE are "startup", events after EOSE are "live" | ||
| 1321 | let source = if eose_received { | ||
| 1322 | event_source::LIVE | ||
| 1323 | } else { | ||
| 1324 | event_source::STARTUP | ||
| 1325 | }; | ||
| 1326 | metrics.record_event(source); | ||
| 1327 | } | ||
| 1328 | Self::process_event_static( | ||
| 1329 | &event, | ||
| 1330 | &relay_url_clone, | ||
| 1331 | &database, | ||
| 1332 | &write_policy, | ||
| 1333 | &local_relay, | ||
| 1334 | ) | ||
| 1335 | .await; | ||
| 1336 | } | ||
| 1337 | RelayEvent::EndOfStoredEvents(sub_id) => { | ||
| 1338 | // Mark EOSE as received - subsequent events are "live" | ||
| 1339 | eose_received = true; | ||
| 1340 | tracing::debug!( | ||
| 1341 | relay = %relay_url_clone, | ||
| 1342 | sub_id = %sub_id, | ||
| 1343 | "EOSE received, notifying SyncManager" | ||
| 1344 | ); | ||
| 1345 | // Notify SyncManager of EOSE | ||
| 1346 | let _ = eose_tx | ||
| 1347 | .send(EoseNotification { | ||
| 1348 | relay_url: relay_url_clone.clone(), | ||
| 1349 | sub_id, | ||
| 1350 | }) | ||
| 1351 | .await; | ||
| 1352 | } | ||
| 1353 | RelayEvent::Closed(reason) => { | ||
| 1354 | tracing::info!( | ||
| 1355 | relay = %relay_url_clone, | ||
| 1356 | reason = %reason, | ||
| 1357 | "Relay connection closed" | ||
| 1358 | ); | ||
| 1359 | // Notify SyncManager of disconnect | ||
| 1360 | let _ = disconnect_tx | ||
| 1361 | .send(DisconnectNotification { | ||
| 1362 | relay_url: relay_url_clone.clone(), | ||
| 1363 | }) | ||
| 1364 | .await; | ||
| 1365 | disconnect_sent = true; | ||
| 1366 | break; | ||
| 1367 | } | ||
| 1368 | RelayEvent::Shutdown => { | ||
| 1369 | tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); | ||
| 1370 | // Notify SyncManager of disconnect | ||
| 1371 | let _ = disconnect_tx | ||
| 1372 | .send(DisconnectNotification { | ||
| 1373 | relay_url: relay_url_clone.clone(), | ||
| 1374 | }) | ||
| 1375 | .await; | ||
| 1376 | disconnect_sent = true; | ||
| 1377 | break; | ||
| 1378 | } | ||
| 1379 | } | ||
| 1380 | } | ||
| 1381 | |||
| 1382 | // If the event channel closed without a Closed/Shutdown event | ||
| 1383 | // (e.g., connection dropped unexpectedly), still notify SyncManager | ||
| 1384 | if !disconnect_sent { | ||
| 1385 | tracing::info!( | ||
| 1386 | relay = %relay_url_clone, | ||
| 1387 | "Event channel closed, notifying SyncManager of disconnect" | ||
| 1388 | ); | ||
| 1389 | let _ = disconnect_tx | ||
| 1390 | .send(DisconnectNotification { | ||
| 1391 | relay_url: relay_url_clone.clone(), | ||
| 1392 | }) | ||
| 1393 | .await; | ||
| 1394 | } | ||
| 1395 | }); | ||
| 1396 | |||
| 1397 | tracing::info!( | ||
| 1398 | relay = %relay_url, | ||
| 1399 | is_bootstrap = is_bootstrap, | ||
| 1400 | "Spawned relay connection" | ||
| 1401 | ); | ||
| 1402 | } | ||
| 1403 | |||
| 1404 | /// Process a single event from a relay (static version for spawned tasks) | 1408 | /// Process a single event from a relay (static version for spawned tasks) |
| 1405 | /// | 1409 | /// |
| 1406 | /// Processes events with dedup, policy check, database save, and broadcast: | 1410 | /// Processes events with dedup, policy check, database save, and broadcast: |
| @@ -1759,7 +1763,7 @@ impl SyncManager { | |||
| 1759 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); | 1763 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); |
| 1760 | } | 1764 | } |
| 1761 | 1765 | ||
| 1762 | /// Check for disconnected relays that should be reconnected | 1766 | /// Retry disconnected relays that are ready for reconnection |
| 1763 | /// | 1767 | /// |
| 1764 | /// This method is called periodically by run_disconnect_checker. | 1768 | /// This method is called periodically by run_disconnect_checker. |
| 1765 | /// It identifies relays that: | 1769 | /// It identifies relays that: |
| @@ -1767,8 +1771,8 @@ impl SyncManager { | |||
| 1767 | /// - Have repos or root events to sync (not empty) | 1771 | /// - Have repos or root events to sync (not empty) |
| 1768 | /// - Have passed the exponential backoff period (respects health tracker) | 1772 | /// - Have passed the exponential backoff period (respects health tracker) |
| 1769 | /// | 1773 | /// |
| 1770 | /// For each eligible relay, a reconnection is attempted via spawn_relay_connection. | 1774 | /// For each eligible relay, a reconnection is attempted via try_connect_relay. |
| 1771 | async fn check_reconnects(&mut self) { | 1775 | async fn retry_disconnected_relays(&mut self) { |
| 1772 | // Collect relays to reconnect | 1776 | // Collect relays to reconnect |
| 1773 | let to_reconnect: Vec<String> = { | 1777 | let to_reconnect: Vec<String> = { |
| 1774 | let index = self.relay_sync_index.read().await; | 1778 | let index = self.relay_sync_index.read().await; |
| @@ -1813,7 +1817,7 @@ impl SyncManager { | |||
| 1813 | health_state = %self.health_tracker.get_state(&relay_url), | 1817 | health_state = %self.health_tracker.get_state(&relay_url), |
| 1814 | "Attempting reconnection" | 1818 | "Attempting reconnection" |
| 1815 | ); | 1819 | ); |
| 1816 | self.spawn_relay_connection(relay_url).await; | 1820 | self.try_connect_relay(&relay_url).await; |
| 1817 | } | 1821 | } |
| 1818 | } | 1822 | } |
| 1819 | 1823 | ||