diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 23:14:14 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 23:16:56 +0000 |
| commit | fe64d3754058aacdce80f8339d99f851a9e5987d (patch) | |
| tree | 7cbcd45828517a6ef4937a404b79f8b5392d7352 /src | |
| parent | be1f21aa1ec9d8666f96005ee203413441e6d220 (diff) | |
fix: eliminate disconnect race condition by adding Disconnecting state
Previously, disconnect_relay() would immediately remove RelayState and
pending batches before the event loop finished draining messages. This
caused confusing 'unknown relay' debug messages for EOSE and other
events that arrived after state removal but were expected during
normal shutdown.
Changes:
- Add ConnectionStatus::Disconnecting to track intentional disconnects
- disconnect_relay() now marks relay as Disconnecting (keeps state)
- Event loop drains messages while state exists
- handle_disconnect() detects intentional vs unexpected disconnects:
- Intentional: Completes cleanup by removing state/connections
- Unexpected: Updates to Disconnected, keeps connection for retry
- handle_eose() suppresses logs for Disconnecting relays (TRACE level)
- check_disconnects() skips relays already in Disconnecting state
This ensures proper sequencing: mark->drain->cleanup instead of
remove->drain->confusion. Fixes the root cause instead of just
hiding log messages.
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/metrics.rs | 1 | ||||
| -rw-r--r-- | src/sync/mod.rs | 241 |
2 files changed, 161 insertions, 81 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index aacfa2c..13211b9 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -311,6 +311,7 @@ impl SyncMetrics { | |||
| 311 | ConnectionStatus::Syncing => 2, | 311 | ConnectionStatus::Syncing => 2, |
| 312 | ConnectionStatus::Connected => 3, | 312 | ConnectionStatus::Connected => 3, |
| 313 | ConnectionStatus::ConnectedHistoricSyncFailures => 4, | 313 | ConnectionStatus::ConnectedHistoricSyncFailures => 4, |
| 314 | ConnectionStatus::Disconnecting => 5, | ||
| 314 | }; | 315 | }; |
| 315 | self.relay_connected | 316 | self.relay_connected |
| 316 | .with_label_values(&[relay]) | 317 | .with_label_values(&[relay]) |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 479ab33..2c1754d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -106,6 +106,9 @@ pub enum ConnectionStatus { | |||
| 106 | Connected, | 106 | Connected, |
| 107 | /// Successfully connected, historic sync had failures but live sync active | 107 | /// Successfully connected, historic sync had failures but live sync active |
| 108 | ConnectedHistoricSyncFailures, | 108 | ConnectedHistoricSyncFailures, |
| 109 | /// Disconnection initiated, waiting for event loop to terminate | ||
| 110 | /// State is retained to process remaining queued events | ||
| 111 | Disconnecting, | ||
| 109 | } | 112 | } |
| 110 | 113 | ||
| 111 | impl ConnectionStatus { | 114 | impl ConnectionStatus { |
| @@ -624,16 +627,35 @@ impl SyncManager { | |||
| 624 | /// - When all subscriptions complete (outstanding_subs empty): | 627 | /// - When all subscriptions complete (outstanding_subs empty): |
| 625 | /// - Calls confirm_batch to move items to confirmed state | 628 | /// - Calls confirm_batch to move items to confirmed state |
| 626 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { | 629 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { |
| 630 | // Check if relay is in Disconnecting state | ||
| 631 | let is_disconnecting = { | ||
| 632 | let index = self.relay_sync_index.read().await; | ||
| 633 | index | ||
| 634 | .get(relay_url) | ||
| 635 | .map(|s| s.connection_status == ConnectionStatus::Disconnecting) | ||
| 636 | .unwrap_or(false) | ||
| 637 | }; | ||
| 638 | |||
| 627 | // 1. Find and update the pending batch | 639 | // 1. Find and update the pending batch |
| 628 | let mut pending = self.pending_sync_index.write().await; | 640 | let mut pending = self.pending_sync_index.write().await; |
| 629 | 641 | ||
| 630 | let Some(batches) = pending.get_mut(relay_url) else { | 642 | let Some(batches) = pending.get_mut(relay_url) else { |
| 631 | // This can happen during disconnect if EOSE arrives after relay cleanup | 643 | // This can happen during disconnect if EOSE arrives after cleanup |
| 632 | tracing::debug!( | 644 | if is_disconnecting { |
| 633 | relay = %relay_url, | 645 | // Expected during intentional disconnect - suppress noisy log |
| 634 | sub_id = %sub_id, | 646 | tracing::trace!( |
| 635 | "EOSE received for unknown relay (likely during disconnect)" | 647 | relay = %relay_url, |
| 636 | ); | 648 | sub_id = %sub_id, |
| 649 | "EOSE received during disconnect cleanup - ignoring" | ||
| 650 | ); | ||
| 651 | } else { | ||
| 652 | // Unexpected - log at debug level | ||
| 653 | tracing::debug!( | ||
| 654 | relay = %relay_url, | ||
| 655 | sub_id = %sub_id, | ||
| 656 | "EOSE received for unknown relay" | ||
| 657 | ); | ||
| 658 | } | ||
| 637 | return; | 659 | return; |
| 638 | }; | 660 | }; |
| 639 | 661 | ||
| @@ -1361,8 +1383,10 @@ impl SyncManager { | |||
| 1361 | // Connection will trigger handle_connect_or_reconnect which will process items | 1383 | // Connection will trigger handle_connect_or_reconnect which will process items |
| 1362 | return; | 1384 | return; |
| 1363 | } | 1385 | } |
| 1364 | Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => { | 1386 | Some(ConnectionStatus::Disconnected) |
| 1365 | // Will be handled when connection succeeds | 1387 | | Some(ConnectionStatus::Connecting) |
| 1388 | | Some(ConnectionStatus::Disconnecting) => { | ||
| 1389 | // Will be handled when connection succeeds (or ignored if disconnecting) | ||
| 1366 | tracing::debug!( | 1390 | tracing::debug!( |
| 1367 | relay = %action.relay_url, | 1391 | relay = %action.relay_url, |
| 1368 | status = ?connection_status, | 1392 | status = ?connection_status, |
| @@ -2060,68 +2084,122 @@ impl SyncManager { | |||
| 2060 | 2084 | ||
| 2061 | /// Handle a relay disconnection | 2085 | /// Handle a relay disconnection |
| 2062 | /// | 2086 | /// |
| 2063 | /// This method: | 2087 | /// This method is called when the event loop terminates and sends a disconnect notification. |
| 2064 | /// - Updates the RelayState in relay_sync_index to Disconnected status | 2088 | /// It handles two cases: |
| 2065 | /// - Sets disconnected_at timestamp | 2089 | /// - Unexpected disconnects: Updates state to Disconnected, keeps RelayConnection for reconnect |
| 2066 | /// - Clears pending sync batches for this relay | 2090 | /// - Intentional disconnects: Completes cleanup of Disconnecting relays (removes from indices) |
| 2067 | /// - Removes the relay from active connections | ||
| 2068 | /// - Records the failure in health tracker | ||
| 2069 | async fn handle_disconnect(&mut self, relay_url: &str) { | 2091 | async fn handle_disconnect(&mut self, relay_url: &str) { |
| 2070 | tracing::warn!(relay = %relay_url, "Handling relay disconnect"); | 2092 | // Check if this was an intentional disconnect (Disconnecting status) |
| 2093 | let was_intentional = { | ||
| 2094 | let index = self.relay_sync_index.read().await; | ||
| 2095 | index | ||
| 2096 | .get(relay_url) | ||
| 2097 | .map(|s| s.connection_status == ConnectionStatus::Disconnecting) | ||
| 2098 | .unwrap_or(false) | ||
| 2099 | }; | ||
| 2071 | 2100 | ||
| 2072 | // 1. Update RelayState in relay_sync_index | 2101 | if was_intentional { |
| 2073 | { | 2102 | // Intentional disconnect - complete cleanup by removing state |
| 2074 | let mut index = self.relay_sync_index.write().await; | 2103 | tracing::info!(relay = %relay_url, "Event loop terminated for intentional disconnect, completing cleanup"); |
| 2075 | if let Some(state) = index.get_mut(relay_url) { | 2104 | |
| 2076 | state.connection_status = ConnectionStatus::Disconnected; | 2105 | // Update metrics to Disconnected before cleanup |
| 2077 | state.disconnected_at = Some(Timestamp::now()); | 2106 | if let Some(ref metrics) = self.metrics { |
| 2078 | tracing::info!( | 2107 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 2079 | relay = %relay_url, | ||
| 2080 | repos_tracked = state.repos.len(), | ||
| 2081 | "Relay state updated to disconnected" | ||
| 2082 | ); | ||
| 2083 | } else { | ||
| 2084 | tracing::debug!( | ||
| 2085 | relay = %relay_url, | ||
| 2086 | "No RelayState found for disconnected relay" | ||
| 2087 | ); | ||
| 2088 | } | 2108 | } |
| 2089 | } | ||
| 2090 | 2109 | ||
| 2091 | // 2. Clear pending sync batches for this relay | 2110 | // Remove from relay_sync_index |
| 2092 | { | 2111 | { |
| 2093 | let mut pending = self.pending_sync_index.write().await; | 2112 | let mut index = self.relay_sync_index.write().await; |
| 2094 | if pending.remove(relay_url).is_some() { | 2113 | if index.remove(relay_url).is_some() { |
| 2114 | tracing::debug!( | ||
| 2115 | relay = %relay_url, | ||
| 2116 | "Removed relay from relay_sync_index" | ||
| 2117 | ); | ||
| 2118 | } | ||
| 2119 | } | ||
| 2120 | |||
| 2121 | // Remove from pending_sync_index | ||
| 2122 | { | ||
| 2123 | let mut pending = self.pending_sync_index.write().await; | ||
| 2124 | if pending.remove(relay_url).is_some() { | ||
| 2125 | tracing::debug!( | ||
| 2126 | relay = %relay_url, | ||
| 2127 | "Removed relay from pending_sync_index" | ||
| 2128 | ); | ||
| 2129 | } | ||
| 2130 | } | ||
| 2131 | |||
| 2132 | // Remove the connection object (won't reconnect) | ||
| 2133 | if self.connections.remove(relay_url).is_some() { | ||
| 2095 | tracing::debug!( | 2134 | tracing::debug!( |
| 2096 | relay = %relay_url, | 2135 | relay = %relay_url, |
| 2097 | "Cleared pending sync batches for disconnected relay" | 2136 | "Removed connection from connections map" |
| 2098 | ); | 2137 | ); |
| 2099 | } | 2138 | } |
| 2100 | } | ||
| 2101 | 2139 | ||
| 2102 | // 3. Keep RelayConnection in HashMap for reuse on reconnect | 2140 | // Update metrics - decrement connected count |
| 2103 | // The connection object persists and will be reused when retry_disconnected_relays | 2141 | if let Some(ref metrics) = self.metrics { |
| 2104 | // calls try_connect_relay -> connection.connect() | 2142 | metrics.dec_connected_count(); |
| 2105 | tracing::debug!( | 2143 | } |
| 2106 | relay = %relay_url, | ||
| 2107 | "Keeping RelayConnection in HashMap for reconnection" | ||
| 2108 | ); | ||
| 2109 | 2144 | ||
| 2110 | // 4. Record failure in health tracker | 2145 | tracing::info!(relay = %relay_url, "Intentional disconnect cleanup complete"); |
| 2111 | self.health_tracker.record_failure(relay_url); | 2146 | } else { |
| 2147 | // Unexpected disconnect - update state but keep for reconnection | ||
| 2148 | tracing::warn!(relay = %relay_url, "Unexpected relay disconnect detected"); | ||
| 2112 | 2149 | ||
| 2113 | // Update metrics | 2150 | // Update RelayState in relay_sync_index |
| 2114 | if let Some(ref metrics) = self.metrics { | 2151 | { |
| 2115 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); | 2152 | let mut index = self.relay_sync_index.write().await; |
| 2116 | metrics.dec_connected_count(); | 2153 | if let Some(state) = index.get_mut(relay_url) { |
| 2117 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 2154 | state.connection_status = ConnectionStatus::Disconnected; |
| 2118 | } | 2155 | state.disconnected_at = Some(Timestamp::now()); |
| 2156 | tracing::info!( | ||
| 2157 | relay = %relay_url, | ||
| 2158 | repos_tracked = state.repos.len(), | ||
| 2159 | "Relay state updated to disconnected" | ||
| 2160 | ); | ||
| 2161 | } else { | ||
| 2162 | tracing::debug!( | ||
| 2163 | relay = %relay_url, | ||
| 2164 | "No RelayState found for disconnected relay" | ||
| 2165 | ); | ||
| 2166 | return; | ||
| 2167 | } | ||
| 2168 | } | ||
| 2119 | 2169 | ||
| 2120 | tracing::info!( | 2170 | // Clear pending sync batches for this relay |
| 2121 | relay = %relay_url, | 2171 | { |
| 2122 | health_state = %self.health_tracker.get_state(relay_url), | 2172 | let mut pending = self.pending_sync_index.write().await; |
| 2123 | "Relay disconnect handling complete" | 2173 | if pending.remove(relay_url).is_some() { |
| 2124 | ); | 2174 | tracing::debug!( |
| 2175 | relay = %relay_url, | ||
| 2176 | "Cleared pending sync batches for disconnected relay" | ||
| 2177 | ); | ||
| 2178 | } | ||
| 2179 | } | ||
| 2180 | |||
| 2181 | // Keep RelayConnection in HashMap for reuse on reconnect | ||
| 2182 | tracing::debug!( | ||
| 2183 | relay = %relay_url, | ||
| 2184 | "Keeping RelayConnection in HashMap for reconnection" | ||
| 2185 | ); | ||
| 2186 | |||
| 2187 | // Record failure in health tracker | ||
| 2188 | self.health_tracker.record_failure(relay_url); | ||
| 2189 | |||
| 2190 | // Update metrics | ||
| 2191 | if let Some(ref metrics) = self.metrics { | ||
| 2192 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); | ||
| 2193 | metrics.dec_connected_count(); | ||
| 2194 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 2195 | } | ||
| 2196 | |||
| 2197 | tracing::info!( | ||
| 2198 | relay = %relay_url, | ||
| 2199 | health_state = %self.health_tracker.get_state(relay_url), | ||
| 2200 | "Unexpected disconnect handling complete" | ||
| 2201 | ); | ||
| 2202 | } | ||
| 2125 | } | 2203 | } |
| 2126 | 2204 | ||
| 2127 | /// Re-process events from hot cache after their dependencies become available | 2205 | /// Re-process events from hot cache after their dependencies become available |
| @@ -2683,6 +2761,11 @@ impl SyncManager { | |||
| 2683 | return None; | 2761 | return None; |
| 2684 | } | 2762 | } |
| 2685 | 2763 | ||
| 2764 | // Skip relays already disconnecting | ||
| 2765 | if state.connection_status == ConnectionStatus::Disconnecting { | ||
| 2766 | return None; | ||
| 2767 | } | ||
| 2768 | |||
| 2686 | // Disconnect if no repos and no root events | 2769 | // Disconnect if no repos and no root events |
| 2687 | if state.repos.is_empty() && state.root_events.is_empty() { | 2770 | if state.repos.is_empty() && state.root_events.is_empty() { |
| 2688 | Some(relay_url.clone()) | 2771 | Some(relay_url.clone()) |
| @@ -2710,49 +2793,45 @@ impl SyncManager { | |||
| 2710 | } | 2793 | } |
| 2711 | } | 2794 | } |
| 2712 | 2795 | ||
| 2713 | /// Disconnect a relay and clean up all associated state | 2796 | /// Disconnect a relay and mark it for cleanup |
| 2714 | /// | 2797 | /// |
| 2715 | /// This method: | 2798 | /// This method: |
| 2716 | /// - Removes the relay from relay_sync_index | 2799 | /// - Marks the relay as Disconnecting in relay_sync_index |
| 2717 | /// - Removes the relay from pending_sync_index | 2800 | /// - Initiates the connection disconnect |
| 2718 | /// - Disconnects the connection if it exists | 2801 | /// - Final cleanup happens in handle_disconnect when event loop terminates |
| 2719 | /// | 2802 | /// |
| 2720 | /// Used by check_disconnects for cleanup of empty relays. | 2803 | /// Used by check_disconnects for cleanup of empty relays. |
| 2721 | async fn disconnect_relay(&mut self, relay_url: &str) { | 2804 | async fn disconnect_relay(&mut self, relay_url: &str) { |
| 2722 | tracing::info!(relay = %relay_url, "Disconnecting empty relay"); | 2805 | tracing::info!(relay = %relay_url, "Initiating disconnect for empty relay"); |
| 2723 | 2806 | ||
| 2724 | // Remove from relay_sync_index | 2807 | // Mark relay as Disconnecting (keep state for event loop to drain) |
| 2725 | { | 2808 | { |
| 2726 | let mut index = self.relay_sync_index.write().await; | 2809 | let mut index = self.relay_sync_index.write().await; |
| 2727 | if index.remove(relay_url).is_some() { | 2810 | if let Some(state) = index.get_mut(relay_url) { |
| 2728 | tracing::debug!( | 2811 | state.connection_status = ConnectionStatus::Disconnecting; |
| 2729 | relay = %relay_url, | 2812 | state.disconnected_at = Some(Timestamp::now()); |
| 2730 | "Removed relay from relay_sync_index" | ||
| 2731 | ); | ||
| 2732 | } | ||
| 2733 | } | ||
| 2734 | |||
| 2735 | // Remove from pending_sync_index | ||
| 2736 | { | ||
| 2737 | let mut pending = self.pending_sync_index.write().await; | ||
| 2738 | if pending.remove(relay_url).is_some() { | ||
| 2739 | tracing::debug!( | 2813 | tracing::debug!( |
| 2740 | relay = %relay_url, | 2814 | relay = %relay_url, |
| 2741 | "Removed relay from pending_sync_index" | 2815 | "Marked relay as Disconnecting" |
| 2742 | ); | 2816 | ); |
| 2743 | } | 2817 | } |
| 2744 | } | 2818 | } |
| 2745 | 2819 | ||
| 2746 | // Disconnect the connection if it exists | 2820 | // Initiate disconnect - event loop will drain and send disconnect notification |
| 2747 | if let Some(connection) = self.connections.remove(relay_url) { | 2821 | if let Some(connection) = self.connections.get(relay_url) { |
| 2748 | connection.disconnect().await; | 2822 | connection.disconnect().await; |
| 2749 | tracing::debug!( | 2823 | tracing::debug!( |
| 2750 | relay = %relay_url, | 2824 | relay = %relay_url, |
| 2751 | "Disconnected connection" | 2825 | "Initiated connection disconnect" |
| 2752 | ); | 2826 | ); |
| 2753 | } | 2827 | } |
| 2754 | 2828 | ||
| 2755 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); | 2829 | // Update metrics |
| 2830 | if let Some(ref metrics) = self.metrics { | ||
| 2831 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnecting); | ||
| 2832 | } | ||
| 2833 | |||
| 2834 | tracing::info!(relay = %relay_url, "Disconnect initiated, waiting for event loop termination"); | ||
| 2756 | } | 2835 | } |
| 2757 | 2836 | ||
| 2758 | /// Retry disconnected relays that are ready for reconnection | 2837 | /// Retry disconnected relays that are ready for reconnection |