upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 23:14:14 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 23:16:56 +0000
commitfe64d3754058aacdce80f8339d99f851a9e5987d (patch)
tree7cbcd45828517a6ef4937a404b79f8b5392d7352 /src
parentbe1f21aa1ec9d8666f96005ee203413441e6d220 (diff)
fix: eliminate disconnect race condition by adding Disconnecting state
Previously, disconnect_relay() would immediately remove RelayState and pending batches before the event loop finished draining messages. This caused confusing 'unknown relay' debug messages for EOSE and other events that arrived after state removal but were expected during normal shutdown. Changes: - Add ConnectionStatus::Disconnecting to track intentional disconnects - disconnect_relay() now marks relay as Disconnecting (keeps state) - Event loop drains messages while state exists - handle_disconnect() detects intentional vs unexpected disconnects: - Intentional: Completes cleanup by removing state/connections - Unexpected: Updates to Disconnected, keeps connection for retry - handle_eose() suppresses logs for Disconnecting relays (TRACE level) - check_disconnects() skips relays already in Disconnecting state This ensures proper sequencing: mark->drain->cleanup instead of remove->drain->confusion. Fixes the root cause instead of just hiding log messages.
Diffstat (limited to 'src')
-rw-r--r--src/sync/metrics.rs1
-rw-r--r--src/sync/mod.rs241
2 files changed, 161 insertions, 81 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index aacfa2c..13211b9 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -311,6 +311,7 @@ impl SyncMetrics {
311 ConnectionStatus::Syncing => 2, 311 ConnectionStatus::Syncing => 2,
312 ConnectionStatus::Connected => 3, 312 ConnectionStatus::Connected => 3,
313 ConnectionStatus::ConnectedHistoricSyncFailures => 4, 313 ConnectionStatus::ConnectedHistoricSyncFailures => 4,
314 ConnectionStatus::Disconnecting => 5,
314 }; 315 };
315 self.relay_connected 316 self.relay_connected
316 .with_label_values(&[relay]) 317 .with_label_values(&[relay])
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 479ab33..2c1754d 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -106,6 +106,9 @@ pub enum ConnectionStatus {
106 Connected, 106 Connected,
107 /// Successfully connected, historic sync had failures but live sync active 107 /// Successfully connected, historic sync had failures but live sync active
108 ConnectedHistoricSyncFailures, 108 ConnectedHistoricSyncFailures,
109 /// Disconnection initiated, waiting for event loop to terminate
110 /// State is retained to process remaining queued events
111 Disconnecting,
109} 112}
110 113
111impl ConnectionStatus { 114impl ConnectionStatus {
@@ -624,16 +627,35 @@ impl SyncManager {
624 /// - When all subscriptions complete (outstanding_subs empty): 627 /// - When all subscriptions complete (outstanding_subs empty):
625 /// - Calls confirm_batch to move items to confirmed state 628 /// - Calls confirm_batch to move items to confirmed state
626 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { 629 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
630 // Check if relay is in Disconnecting state
631 let is_disconnecting = {
632 let index = self.relay_sync_index.read().await;
633 index
634 .get(relay_url)
635 .map(|s| s.connection_status == ConnectionStatus::Disconnecting)
636 .unwrap_or(false)
637 };
638
627 // 1. Find and update the pending batch 639 // 1. Find and update the pending batch
628 let mut pending = self.pending_sync_index.write().await; 640 let mut pending = self.pending_sync_index.write().await;
629 641
630 let Some(batches) = pending.get_mut(relay_url) else { 642 let Some(batches) = pending.get_mut(relay_url) else {
631 // This can happen during disconnect if EOSE arrives after relay cleanup 643 // This can happen during disconnect if EOSE arrives after cleanup
632 tracing::debug!( 644 if is_disconnecting {
633 relay = %relay_url, 645 // Expected during intentional disconnect - suppress noisy log
634 sub_id = %sub_id, 646 tracing::trace!(
635 "EOSE received for unknown relay (likely during disconnect)" 647 relay = %relay_url,
636 ); 648 sub_id = %sub_id,
649 "EOSE received during disconnect cleanup - ignoring"
650 );
651 } else {
652 // Unexpected - log at debug level
653 tracing::debug!(
654 relay = %relay_url,
655 sub_id = %sub_id,
656 "EOSE received for unknown relay"
657 );
658 }
637 return; 659 return;
638 }; 660 };
639 661
@@ -1361,8 +1383,10 @@ impl SyncManager {
1361 // Connection will trigger handle_connect_or_reconnect which will process items 1383 // Connection will trigger handle_connect_or_reconnect which will process items
1362 return; 1384 return;
1363 } 1385 }
1364 Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => { 1386 Some(ConnectionStatus::Disconnected)
1365 // Will be handled when connection succeeds 1387 | Some(ConnectionStatus::Connecting)
1388 | Some(ConnectionStatus::Disconnecting) => {
1389 // Will be handled when connection succeeds (or ignored if disconnecting)
1366 tracing::debug!( 1390 tracing::debug!(
1367 relay = %action.relay_url, 1391 relay = %action.relay_url,
1368 status = ?connection_status, 1392 status = ?connection_status,
@@ -2060,68 +2084,122 @@ impl SyncManager {
2060 2084
2061 /// Handle a relay disconnection 2085 /// Handle a relay disconnection
2062 /// 2086 ///
2063 /// This method: 2087 /// This method is called when the event loop terminates and sends a disconnect notification.
2064 /// - Updates the RelayState in relay_sync_index to Disconnected status 2088 /// It handles two cases:
2065 /// - Sets disconnected_at timestamp 2089 /// - Unexpected disconnects: Updates state to Disconnected, keeps RelayConnection for reconnect
2066 /// - Clears pending sync batches for this relay 2090 /// - Intentional disconnects: Completes cleanup of Disconnecting relays (removes from indices)
2067 /// - Removes the relay from active connections
2068 /// - Records the failure in health tracker
2069 async fn handle_disconnect(&mut self, relay_url: &str) { 2091 async fn handle_disconnect(&mut self, relay_url: &str) {
2070 tracing::warn!(relay = %relay_url, "Handling relay disconnect"); 2092 // Check if this was an intentional disconnect (Disconnecting status)
2093 let was_intentional = {
2094 let index = self.relay_sync_index.read().await;
2095 index
2096 .get(relay_url)
2097 .map(|s| s.connection_status == ConnectionStatus::Disconnecting)
2098 .unwrap_or(false)
2099 };
2071 2100
2072 // 1. Update RelayState in relay_sync_index 2101 if was_intentional {
2073 { 2102 // Intentional disconnect - complete cleanup by removing state
2074 let mut index = self.relay_sync_index.write().await; 2103 tracing::info!(relay = %relay_url, "Event loop terminated for intentional disconnect, completing cleanup");
2075 if let Some(state) = index.get_mut(relay_url) { 2104
2076 state.connection_status = ConnectionStatus::Disconnected; 2105 // Update metrics to Disconnected before cleanup
2077 state.disconnected_at = Some(Timestamp::now()); 2106 if let Some(ref metrics) = self.metrics {
2078 tracing::info!( 2107 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
2079 relay = %relay_url,
2080 repos_tracked = state.repos.len(),
2081 "Relay state updated to disconnected"
2082 );
2083 } else {
2084 tracing::debug!(
2085 relay = %relay_url,
2086 "No RelayState found for disconnected relay"
2087 );
2088 } 2108 }
2089 }
2090 2109
2091 // 2. Clear pending sync batches for this relay 2110 // Remove from relay_sync_index
2092 { 2111 {
2093 let mut pending = self.pending_sync_index.write().await; 2112 let mut index = self.relay_sync_index.write().await;
2094 if pending.remove(relay_url).is_some() { 2113 if index.remove(relay_url).is_some() {
2114 tracing::debug!(
2115 relay = %relay_url,
2116 "Removed relay from relay_sync_index"
2117 );
2118 }
2119 }
2120
2121 // Remove from pending_sync_index
2122 {
2123 let mut pending = self.pending_sync_index.write().await;
2124 if pending.remove(relay_url).is_some() {
2125 tracing::debug!(
2126 relay = %relay_url,
2127 "Removed relay from pending_sync_index"
2128 );
2129 }
2130 }
2131
2132 // Remove the connection object (won't reconnect)
2133 if self.connections.remove(relay_url).is_some() {
2095 tracing::debug!( 2134 tracing::debug!(
2096 relay = %relay_url, 2135 relay = %relay_url,
2097 "Cleared pending sync batches for disconnected relay" 2136 "Removed connection from connections map"
2098 ); 2137 );
2099 } 2138 }
2100 }
2101 2139
2102 // 3. Keep RelayConnection in HashMap for reuse on reconnect 2140 // Update metrics - decrement connected count
2103 // The connection object persists and will be reused when retry_disconnected_relays 2141 if let Some(ref metrics) = self.metrics {
2104 // calls try_connect_relay -> connection.connect() 2142 metrics.dec_connected_count();
2105 tracing::debug!( 2143 }
2106 relay = %relay_url,
2107 "Keeping RelayConnection in HashMap for reconnection"
2108 );
2109 2144
2110 // 4. Record failure in health tracker 2145 tracing::info!(relay = %relay_url, "Intentional disconnect cleanup complete");
2111 self.health_tracker.record_failure(relay_url); 2146 } else {
2147 // Unexpected disconnect - update state but keep for reconnection
2148 tracing::warn!(relay = %relay_url, "Unexpected relay disconnect detected");
2112 2149
2113 // Update metrics 2150 // Update RelayState in relay_sync_index
2114 if let Some(ref metrics) = self.metrics { 2151 {
2115 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); 2152 let mut index = self.relay_sync_index.write().await;
2116 metrics.dec_connected_count(); 2153 if let Some(state) = index.get_mut(relay_url) {
2117 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 2154 state.connection_status = ConnectionStatus::Disconnected;
2118 } 2155 state.disconnected_at = Some(Timestamp::now());
2156 tracing::info!(
2157 relay = %relay_url,
2158 repos_tracked = state.repos.len(),
2159 "Relay state updated to disconnected"
2160 );
2161 } else {
2162 tracing::debug!(
2163 relay = %relay_url,
2164 "No RelayState found for disconnected relay"
2165 );
2166 return;
2167 }
2168 }
2119 2169
2120 tracing::info!( 2170 // Clear pending sync batches for this relay
2121 relay = %relay_url, 2171 {
2122 health_state = %self.health_tracker.get_state(relay_url), 2172 let mut pending = self.pending_sync_index.write().await;
2123 "Relay disconnect handling complete" 2173 if pending.remove(relay_url).is_some() {
2124 ); 2174 tracing::debug!(
2175 relay = %relay_url,
2176 "Cleared pending sync batches for disconnected relay"
2177 );
2178 }
2179 }
2180
2181 // Keep RelayConnection in HashMap for reuse on reconnect
2182 tracing::debug!(
2183 relay = %relay_url,
2184 "Keeping RelayConnection in HashMap for reconnection"
2185 );
2186
2187 // Record failure in health tracker
2188 self.health_tracker.record_failure(relay_url);
2189
2190 // Update metrics
2191 if let Some(ref metrics) = self.metrics {
2192 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
2193 metrics.dec_connected_count();
2194 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
2195 }
2196
2197 tracing::info!(
2198 relay = %relay_url,
2199 health_state = %self.health_tracker.get_state(relay_url),
2200 "Unexpected disconnect handling complete"
2201 );
2202 }
2125 } 2203 }
2126 2204
2127 /// Re-process events from hot cache after their dependencies become available 2205 /// Re-process events from hot cache after their dependencies become available
@@ -2683,6 +2761,11 @@ impl SyncManager {
2683 return None; 2761 return None;
2684 } 2762 }
2685 2763
2764 // Skip relays already disconnecting
2765 if state.connection_status == ConnectionStatus::Disconnecting {
2766 return None;
2767 }
2768
2686 // Disconnect if no repos and no root events 2769 // Disconnect if no repos and no root events
2687 if state.repos.is_empty() && state.root_events.is_empty() { 2770 if state.repos.is_empty() && state.root_events.is_empty() {
2688 Some(relay_url.clone()) 2771 Some(relay_url.clone())
@@ -2710,49 +2793,45 @@ impl SyncManager {
2710 } 2793 }
2711 } 2794 }
2712 2795
2713 /// Disconnect a relay and clean up all associated state 2796 /// Disconnect a relay and mark it for cleanup
2714 /// 2797 ///
2715 /// This method: 2798 /// This method:
2716 /// - Removes the relay from relay_sync_index 2799 /// - Marks the relay as Disconnecting in relay_sync_index
2717 /// - Removes the relay from pending_sync_index 2800 /// - Initiates the connection disconnect
2718 /// - Disconnects the connection if it exists 2801 /// - Final cleanup happens in handle_disconnect when event loop terminates
2719 /// 2802 ///
2720 /// Used by check_disconnects for cleanup of empty relays. 2803 /// Used by check_disconnects for cleanup of empty relays.
2721 async fn disconnect_relay(&mut self, relay_url: &str) { 2804 async fn disconnect_relay(&mut self, relay_url: &str) {
2722 tracing::info!(relay = %relay_url, "Disconnecting empty relay"); 2805 tracing::info!(relay = %relay_url, "Initiating disconnect for empty relay");
2723 2806
2724 // Remove from relay_sync_index 2807 // Mark relay as Disconnecting (keep state for event loop to drain)
2725 { 2808 {
2726 let mut index = self.relay_sync_index.write().await; 2809 let mut index = self.relay_sync_index.write().await;
2727 if index.remove(relay_url).is_some() { 2810 if let Some(state) = index.get_mut(relay_url) {
2728 tracing::debug!( 2811 state.connection_status = ConnectionStatus::Disconnecting;
2729 relay = %relay_url, 2812 state.disconnected_at = Some(Timestamp::now());
2730 "Removed relay from relay_sync_index"
2731 );
2732 }
2733 }
2734
2735 // Remove from pending_sync_index
2736 {
2737 let mut pending = self.pending_sync_index.write().await;
2738 if pending.remove(relay_url).is_some() {
2739 tracing::debug!( 2813 tracing::debug!(
2740 relay = %relay_url, 2814 relay = %relay_url,
2741 "Removed relay from pending_sync_index" 2815 "Marked relay as Disconnecting"
2742 ); 2816 );
2743 } 2817 }
2744 } 2818 }
2745 2819
2746 // Disconnect the connection if it exists 2820 // Initiate disconnect - event loop will drain and send disconnect notification
2747 if let Some(connection) = self.connections.remove(relay_url) { 2821 if let Some(connection) = self.connections.get(relay_url) {
2748 connection.disconnect().await; 2822 connection.disconnect().await;
2749 tracing::debug!( 2823 tracing::debug!(
2750 relay = %relay_url, 2824 relay = %relay_url,
2751 "Disconnected connection" 2825 "Initiated connection disconnect"
2752 ); 2826 );
2753 } 2827 }
2754 2828
2755 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); 2829 // Update metrics
2830 if let Some(ref metrics) = self.metrics {
2831 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnecting);
2832 }
2833
2834 tracing::info!(relay = %relay_url, "Disconnect initiated, waiting for event loop termination");
2756 } 2835 }
2757 2836
2758 /// Retry disconnected relays that are ready for reconnection 2837 /// Retry disconnected relays that are ready for reconnection