diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/metrics.rs | 1 | ||||
| -rw-r--r-- | src/sync/mod.rs | 241 |
2 files changed, 161 insertions, 81 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index aacfa2c..13211b9 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -311,6 +311,7 @@ impl SyncMetrics { | |||
| 311 | ConnectionStatus::Syncing => 2, | 311 | ConnectionStatus::Syncing => 2, |
| 312 | ConnectionStatus::Connected => 3, | 312 | ConnectionStatus::Connected => 3, |
| 313 | ConnectionStatus::ConnectedHistoricSyncFailures => 4, | 313 | ConnectionStatus::ConnectedHistoricSyncFailures => 4, |
| 314 | ConnectionStatus::Disconnecting => 5, | ||
| 314 | }; | 315 | }; |
| 315 | self.relay_connected | 316 | self.relay_connected |
| 316 | .with_label_values(&[relay]) | 317 | .with_label_values(&[relay]) |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 479ab33..2c1754d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -106,6 +106,9 @@ pub enum ConnectionStatus { | |||
| 106 | Connected, | 106 | Connected, |
| 107 | /// Successfully connected, historic sync had failures but live sync active | 107 | /// Successfully connected, historic sync had failures but live sync active |
| 108 | ConnectedHistoricSyncFailures, | 108 | ConnectedHistoricSyncFailures, |
| 109 | /// Disconnection initiated, waiting for event loop to terminate | ||
| 110 | /// State is retained to process remaining queued events | ||
| 111 | Disconnecting, | ||
| 109 | } | 112 | } |
| 110 | 113 | ||
| 111 | impl ConnectionStatus { | 114 | impl ConnectionStatus { |
| @@ -624,16 +627,35 @@ impl SyncManager { | |||
| 624 | /// - When all subscriptions complete (outstanding_subs empty): | 627 | /// - When all subscriptions complete (outstanding_subs empty): |
| 625 | /// - Calls confirm_batch to move items to confirmed state | 628 | /// - Calls confirm_batch to move items to confirmed state |
| 626 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { | 629 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { |
| 630 | // Check if relay is in Disconnecting state | ||
| 631 | let is_disconnecting = { | ||
| 632 | let index = self.relay_sync_index.read().await; | ||
| 633 | index | ||
| 634 | .get(relay_url) | ||
| 635 | .map(|s| s.connection_status == ConnectionStatus::Disconnecting) | ||
| 636 | .unwrap_or(false) | ||
| 637 | }; | ||
| 638 | |||
| 627 | // 1. Find and update the pending batch | 639 | // 1. Find and update the pending batch |
| 628 | let mut pending = self.pending_sync_index.write().await; | 640 | let mut pending = self.pending_sync_index.write().await; |
| 629 | 641 | ||
| 630 | let Some(batches) = pending.get_mut(relay_url) else { | 642 | let Some(batches) = pending.get_mut(relay_url) else { |
| 631 | // This can happen during disconnect if EOSE arrives after relay cleanup | 643 | // This can happen during disconnect if EOSE arrives after cleanup |
| 632 | tracing::debug!( | 644 | if is_disconnecting { |
| 633 | relay = %relay_url, | 645 | // Expected during intentional disconnect - suppress noisy log |
| 634 | sub_id = %sub_id, | 646 | tracing::trace!( |
| 635 | "EOSE received for unknown relay (likely during disconnect)" | 647 | relay = %relay_url, |
| 636 | ); | 648 | sub_id = %sub_id, |
| 649 | "EOSE received during disconnect cleanup - ignoring" | ||
| 650 | ); | ||
| 651 | } else { | ||
| 652 | // Unexpected - log at debug level | ||
| 653 | tracing::debug!( | ||
| 654 | relay = %relay_url, | ||
| 655 | sub_id = %sub_id, | ||
| 656 | "EOSE received for unknown relay" | ||
| 657 | ); | ||
| 658 | } | ||
| 637 | return; | 659 | return; |
| 638 | }; | 660 | }; |
| 639 | 661 | ||
| @@ -1361,8 +1383,10 @@ impl SyncManager { | |||
| 1361 | // Connection will trigger handle_connect_or_reconnect which will process items | 1383 | // Connection will trigger handle_connect_or_reconnect which will process items |
| 1362 | return; | 1384 | return; |
| 1363 | } | 1385 | } |
| 1364 | Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => { | 1386 | Some(ConnectionStatus::Disconnected) |
| 1365 | // Will be handled when connection succeeds | 1387 | | Some(ConnectionStatus::Connecting) |
| 1388 | | Some(ConnectionStatus::Disconnecting) => { | ||
| 1389 | // Will be handled when connection succeeds (or ignored if disconnecting) | ||
| 1366 | tracing::debug!( | 1390 | tracing::debug!( |
| 1367 | relay = %action.relay_url, | 1391 | relay = %action.relay_url, |
| 1368 | status = ?connection_status, | 1392 | status = ?connection_status, |
| @@ -2060,68 +2084,122 @@ impl SyncManager { | |||
| 2060 | 2084 | ||
| 2061 | /// Handle a relay disconnection | 2085 | /// Handle a relay disconnection |
| 2062 | /// | 2086 | /// |
| 2063 | /// This method: | 2087 | /// This method is called when the event loop terminates and sends a disconnect notification. |
| 2064 | /// - Updates the RelayState in relay_sync_index to Disconnected status | 2088 | /// It handles two cases: |
| 2065 | /// - Sets disconnected_at timestamp | 2089 | /// - Unexpected disconnects: Updates state to Disconnected, keeps RelayConnection for reconnect |
| 2066 | /// - Clears pending sync batches for this relay | 2090 | /// - Intentional disconnects: Completes cleanup of Disconnecting relays (removes from indices) |
| 2067 | /// - Removes the relay from active connections | ||
| 2068 | /// - Records the failure in health tracker | ||
| 2069 | async fn handle_disconnect(&mut self, relay_url: &str) { | 2091 | async fn handle_disconnect(&mut self, relay_url: &str) { |
| 2070 | tracing::warn!(relay = %relay_url, "Handling relay disconnect"); | 2092 | // Check if this was an intentional disconnect (Disconnecting status) |
| 2093 | let was_intentional = { | ||
| 2094 | let index = self.relay_sync_index.read().await; | ||
| 2095 | index | ||
| 2096 | .get(relay_url) | ||
| 2097 | .map(|s| s.connection_status == ConnectionStatus::Disconnecting) | ||
| 2098 | .unwrap_or(false) | ||
| 2099 | }; | ||
| 2071 | 2100 | ||
| 2072 | // 1. Update RelayState in relay_sync_index | 2101 | if was_intentional { |
| 2073 | { | 2102 | // Intentional disconnect - complete cleanup by removing state |
| 2074 | let mut index = self.relay_sync_index.write().await; | 2103 | tracing::info!(relay = %relay_url, "Event loop terminated for intentional disconnect, completing cleanup"); |
| 2075 | if let Some(state) = index.get_mut(relay_url) { | 2104 | |
| 2076 | state.connection_status = ConnectionStatus::Disconnected; | 2105 | // Update metrics to Disconnected before cleanup |
| 2077 | state.disconnected_at = Some(Timestamp::now()); | 2106 | if let Some(ref metrics) = self.metrics { |
| 2078 | tracing::info!( | 2107 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 2079 | relay = %relay_url, | ||
| 2080 | repos_tracked = state.repos.len(), | ||
| 2081 | "Relay state updated to disconnected" | ||
| 2082 | ); | ||
| 2083 | } else { | ||
| 2084 | tracing::debug!( | ||
| 2085 | relay = %relay_url, | ||
| 2086 | "No RelayState found for disconnected relay" | ||
| 2087 | ); | ||
| 2088 | } | 2108 | } |
| 2089 | } | ||
| 2090 | 2109 | ||
| 2091 | // 2. Clear pending sync batches for this relay | 2110 | // Remove from relay_sync_index |
| 2092 | { | 2111 | { |
| 2093 | let mut pending = self.pending_sync_index.write().await; | 2112 | let mut index = self.relay_sync_index.write().await; |
| 2094 | if pending.remove(relay_url).is_some() { | 2113 | if index.remove(relay_url).is_some() { |
| 2114 | tracing::debug!( | ||
| 2115 | relay = %relay_url, | ||
| 2116 | "Removed relay from relay_sync_index" | ||
| 2117 | ); | ||
| 2118 | } | ||
| 2119 | } | ||
| 2120 | |||
| 2121 | // Remove from pending_sync_index | ||
| 2122 | { | ||
| 2123 | let mut pending = self.pending_sync_index.write().await; | ||
| 2124 | if pending.remove(relay_url).is_some() { | ||
| 2125 | tracing::debug!( | ||
| 2126 | relay = %relay_url, | ||
| 2127 | "Removed relay from pending_sync_index" | ||
| 2128 | ); | ||
| 2129 | } | ||
| 2130 | } | ||
| 2131 | |||
| 2132 | // Remove the connection object (won't reconnect) | ||
| 2133 | if self.connections.remove(relay_url).is_some() { | ||
| 2095 | tracing::debug!( | 2134 | tracing::debug!( |
| 2096 | relay = %relay_url, | 2135 | relay = %relay_url, |
| 2097 | "Cleared pending sync batches for disconnected relay" | 2136 | "Removed connection from connections map" |
| 2098 | ); | 2137 | ); |
| 2099 | } | 2138 | } |
| 2100 | } | ||
| 2101 | 2139 | ||
| 2102 | // 3. Keep RelayConnection in HashMap for reuse on reconnect | 2140 | // Update metrics - decrement connected count |
| 2103 | // The connection object persists and will be reused when retry_disconnected_relays | 2141 | if let Some(ref metrics) = self.metrics { |
| 2104 | // calls try_connect_relay -> connection.connect() | 2142 | metrics.dec_connected_count(); |
| 2105 | tracing::debug!( | 2143 | } |
| 2106 | relay = %relay_url, | ||
| 2107 | "Keeping RelayConnection in HashMap for reconnection" | ||
| 2108 | ); | ||
| 2109 | 2144 | ||
| 2110 | // 4. Record failure in health tracker | 2145 | tracing::info!(relay = %relay_url, "Intentional disconnect cleanup complete"); |
| 2111 | self.health_tracker.record_failure(relay_url); | 2146 | } else { |
| 2147 | // Unexpected disconnect - update state but keep for reconnection | ||
| 2148 | tracing::warn!(relay = %relay_url, "Unexpected relay disconnect detected"); | ||
| 2112 | 2149 | ||
| 2113 | // Update metrics | 2150 | // Update RelayState in relay_sync_index |
| 2114 | if let Some(ref metrics) = self.metrics { | 2151 | { |
| 2115 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); | 2152 | let mut index = self.relay_sync_index.write().await; |
| 2116 | metrics.dec_connected_count(); | 2153 | if let Some(state) = index.get_mut(relay_url) { |
| 2117 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 2154 | state.connection_status = ConnectionStatus::Disconnected; |
| 2118 | } | 2155 | state.disconnected_at = Some(Timestamp::now()); |
| 2156 | tracing::info!( | ||
| 2157 | relay = %relay_url, | ||
| 2158 | repos_tracked = state.repos.len(), | ||
| 2159 | "Relay state updated to disconnected" | ||
| 2160 | ); | ||
| 2161 | } else { | ||
| 2162 | tracing::debug!( | ||
| 2163 | relay = %relay_url, | ||
| 2164 | "No RelayState found for disconnected relay" | ||
| 2165 | ); | ||
| 2166 | return; | ||
| 2167 | } | ||
| 2168 | } | ||
| 2119 | 2169 | ||
| 2120 | tracing::info!( | 2170 | // Clear pending sync batches for this relay |
| 2121 | relay = %relay_url, | 2171 | { |
| 2122 | health_state = %self.health_tracker.get_state(relay_url), | 2172 | let mut pending = self.pending_sync_index.write().await; |
| 2123 | "Relay disconnect handling complete" | 2173 | if pending.remove(relay_url).is_some() { |
| 2124 | ); | 2174 | tracing::debug!( |
| 2175 | relay = %relay_url, | ||
| 2176 | "Cleared pending sync batches for disconnected relay" | ||
| 2177 | ); | ||
| 2178 | } | ||
| 2179 | } | ||
| 2180 | |||
| 2181 | // Keep RelayConnection in HashMap for reuse on reconnect | ||
| 2182 | tracing::debug!( | ||
| 2183 | relay = %relay_url, | ||
| 2184 | "Keeping RelayConnection in HashMap for reconnection" | ||
| 2185 | ); | ||
| 2186 | |||
| 2187 | // Record failure in health tracker | ||
| 2188 | self.health_tracker.record_failure(relay_url); | ||
| 2189 | |||
| 2190 | // Update metrics | ||
| 2191 | if let Some(ref metrics) = self.metrics { | ||
| 2192 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); | ||
| 2193 | metrics.dec_connected_count(); | ||
| 2194 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 2195 | } | ||
| 2196 | |||
| 2197 | tracing::info!( | ||
| 2198 | relay = %relay_url, | ||
| 2199 | health_state = %self.health_tracker.get_state(relay_url), | ||
| 2200 | "Unexpected disconnect handling complete" | ||
| 2201 | ); | ||
| 2202 | } | ||
| 2125 | } | 2203 | } |
| 2126 | 2204 | ||
| 2127 | /// Re-process events from hot cache after their dependencies become available | 2205 | /// Re-process events from hot cache after their dependencies become available |
| @@ -2683,6 +2761,11 @@ impl SyncManager { | |||
| 2683 | return None; | 2761 | return None; |
| 2684 | } | 2762 | } |
| 2685 | 2763 | ||
| 2764 | // Skip relays already disconnecting | ||
| 2765 | if state.connection_status == ConnectionStatus::Disconnecting { | ||
| 2766 | return None; | ||
| 2767 | } | ||
| 2768 | |||
| 2686 | // Disconnect if no repos and no root events | 2769 | // Disconnect if no repos and no root events |
| 2687 | if state.repos.is_empty() && state.root_events.is_empty() { | 2770 | if state.repos.is_empty() && state.root_events.is_empty() { |
| 2688 | Some(relay_url.clone()) | 2771 | Some(relay_url.clone()) |
| @@ -2710,49 +2793,45 @@ impl SyncManager { | |||
| 2710 | } | 2793 | } |
| 2711 | } | 2794 | } |
| 2712 | 2795 | ||
| 2713 | /// Disconnect a relay and clean up all associated state | 2796 | /// Disconnect a relay and mark it for cleanup |
| 2714 | /// | 2797 | /// |
| 2715 | /// This method: | 2798 | /// This method: |
| 2716 | /// - Removes the relay from relay_sync_index | 2799 | /// - Marks the relay as Disconnecting in relay_sync_index |
| 2717 | /// - Removes the relay from pending_sync_index | 2800 | /// - Initiates the connection disconnect |
| 2718 | /// - Disconnects the connection if it exists | 2801 | /// - Final cleanup happens in handle_disconnect when event loop terminates |
| 2719 | /// | 2802 | /// |
| 2720 | /// Used by check_disconnects for cleanup of empty relays. | 2803 | /// Used by check_disconnects for cleanup of empty relays. |
| 2721 | async fn disconnect_relay(&mut self, relay_url: &str) { | 2804 | async fn disconnect_relay(&mut self, relay_url: &str) { |
| 2722 | tracing::info!(relay = %relay_url, "Disconnecting empty relay"); | 2805 | tracing::info!(relay = %relay_url, "Initiating disconnect for empty relay"); |
| 2723 | 2806 | ||
| 2724 | // Remove from relay_sync_index | 2807 | // Mark relay as Disconnecting (keep state for event loop to drain) |
| 2725 | { | 2808 | { |
| 2726 | let mut index = self.relay_sync_index.write().await; | 2809 | let mut index = self.relay_sync_index.write().await; |
| 2727 | if index.remove(relay_url).is_some() { | 2810 | if let Some(state) = index.get_mut(relay_url) { |
| 2728 | tracing::debug!( | 2811 | state.connection_status = ConnectionStatus::Disconnecting; |
| 2729 | relay = %relay_url, | 2812 | state.disconnected_at = Some(Timestamp::now()); |
| 2730 | "Removed relay from relay_sync_index" | ||
| 2731 | ); | ||
| 2732 | } | ||
| 2733 | } | ||
| 2734 | |||
| 2735 | // Remove from pending_sync_index | ||
| 2736 | { | ||
| 2737 | let mut pending = self.pending_sync_index.write().await; | ||
| 2738 | if pending.remove(relay_url).is_some() { | ||
| 2739 | tracing::debug!( | 2813 | tracing::debug!( |
| 2740 | relay = %relay_url, | 2814 | relay = %relay_url, |
| 2741 | "Removed relay from pending_sync_index" | 2815 | "Marked relay as Disconnecting" |
| 2742 | ); | 2816 | ); |
| 2743 | } | 2817 | } |
| 2744 | } | 2818 | } |
| 2745 | 2819 | ||
| 2746 | // Disconnect the connection if it exists | 2820 | // Initiate disconnect - event loop will drain and send disconnect notification |
| 2747 | if let Some(connection) = self.connections.remove(relay_url) { | 2821 | if let Some(connection) = self.connections.get(relay_url) { |
| 2748 | connection.disconnect().await; | 2822 | connection.disconnect().await; |
| 2749 | tracing::debug!( | 2823 | tracing::debug!( |
| 2750 | relay = %relay_url, | 2824 | relay = %relay_url, |
| 2751 | "Disconnected connection" | 2825 | "Initiated connection disconnect" |
| 2752 | ); | 2826 | ); |
| 2753 | } | 2827 | } |
| 2754 | 2828 | ||
| 2755 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); | 2829 | // Update metrics |
| 2830 | if let Some(ref metrics) = self.metrics { | ||
| 2831 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnecting); | ||
| 2832 | } | ||
| 2833 | |||
| 2834 | tracing::info!(relay = %relay_url, "Disconnect initiated, waiting for event loop termination"); | ||
| 2756 | } | 2835 | } |
| 2757 | 2836 | ||
| 2758 | /// Retry disconnected relays that are ready for reconnection | 2837 | /// Retry disconnected relays that are ready for reconnection |