upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/metrics.rs 1
-rw-r--r-- src/sync/mod.rs 241
2 files changed, 161 insertions, 81 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index aacfa2c..13211b9 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -311,6 +311,7 @@ impl SyncMetrics {
311 ConnectionStatus::Syncing => 2, 311 ConnectionStatus::Syncing => 2,
312 ConnectionStatus::Connected => 3, 312 ConnectionStatus::Connected => 3,
313 ConnectionStatus::ConnectedHistoricSyncFailures => 4, 313 ConnectionStatus::ConnectedHistoricSyncFailures => 4,
314 ConnectionStatus::Disconnecting => 5,
314 }; 315 };
315 self.relay_connected 316 self.relay_connected
316 .with_label_values(&[relay]) 317 .with_label_values(&[relay])
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 479ab33..2c1754d 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -106,6 +106,9 @@ pub enum ConnectionStatus {
106 Connected, 106 Connected,
107 /// Successfully connected, historic sync had failures but live sync active 107 /// Successfully connected, historic sync had failures but live sync active
108 ConnectedHistoricSyncFailures, 108 ConnectedHistoricSyncFailures,
109 /// Disconnection initiated, waiting for event loop to terminate
110 /// State is retained to process remaining queued events
111 Disconnecting,
109 } 112 }
110 113
111 impl ConnectionStatus { 114 impl ConnectionStatus {
@@ -624,16 +627,35 @@ impl SyncManager {
624 /// - When all subscriptions complete (outstanding_subs empty): 627 /// - When all subscriptions complete (outstanding_subs empty):
625 /// - Calls confirm_batch to move items to confirmed state 628 /// - Calls confirm_batch to move items to confirmed state
626 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { 629 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
630 // Check if relay is in Disconnecting state
631 let is_disconnecting = {
632 let index = self.relay_sync_index.read().await;
633 index
634 .get(relay_url)
635 .map(|s| s.connection_status == ConnectionStatus::Disconnecting)
636 .unwrap_or(false)
637 };
638
627 // 1. Find and update the pending batch 639 // 1. Find and update the pending batch
628 let mut pending = self.pending_sync_index.write().await; 640 let mut pending = self.pending_sync_index.write().await;
629 641
630 let Some(batches) = pending.get_mut(relay_url) else { 642 let Some(batches) = pending.get_mut(relay_url) else {
631 // This can happen during disconnect if EOSE arrives after relay cleanup 643 // This can happen during disconnect if EOSE arrives after cleanup
632 tracing::debug!( 644 if is_disconnecting {
633 relay = %relay_url, 645 // Expected during intentional disconnect - suppress noisy log
634 sub_id = %sub_id, 646 tracing::trace!(
635 "EOSE received for unknown relay (likely during disconnect)" 647 relay = %relay_url,
636 ); 648 sub_id = %sub_id,
649 "EOSE received during disconnect cleanup - ignoring"
650 );
651 } else {
652 // Unexpected - log at debug level
653 tracing::debug!(
654 relay = %relay_url,
655 sub_id = %sub_id,
656 "EOSE received for unknown relay"
657 );
658 }
637 return; 659 return;
638 }; 660 };
639 661
@@ -1361,8 +1383,10 @@ impl SyncManager {
1361 // Connection will trigger handle_connect_or_reconnect which will process items 1383 // Connection will trigger handle_connect_or_reconnect which will process items
1362 return; 1384 return;
1363 } 1385 }
1364 Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => { 1386 Some(ConnectionStatus::Disconnected)
1365 // Will be handled when connection succeeds 1387 | Some(ConnectionStatus::Connecting)
1388 | Some(ConnectionStatus::Disconnecting) => {
1389 // Will be handled when connection succeeds (or ignored if disconnecting)
1366 tracing::debug!( 1390 tracing::debug!(
1367 relay = %action.relay_url, 1391 relay = %action.relay_url,
1368 status = ?connection_status, 1392 status = ?connection_status,
@@ -2060,68 +2084,122 @@ impl SyncManager {
2060 2084
2061 /// Handle a relay disconnection 2085 /// Handle a relay disconnection
2062 /// 2086 ///
2063 /// This method: 2087 /// This method is called when the event loop terminates and sends a disconnect notification.
2064 /// - Updates the RelayState in relay_sync_index to Disconnected status 2088 /// It handles two cases:
2065 /// - Sets disconnected_at timestamp 2089 /// - Unexpected disconnects: Updates state to Disconnected, keeps RelayConnection for reconnect
2066 /// - Clears pending sync batches for this relay 2090 /// - Intentional disconnects: Completes cleanup of Disconnecting relays (removes from indices)
2067 /// - Removes the relay from active connections
2068 /// - Records the failure in health tracker
2069 async fn handle_disconnect(&mut self, relay_url: &str) { 2091 async fn handle_disconnect(&mut self, relay_url: &str) {
2070 tracing::warn!(relay = %relay_url, "Handling relay disconnect"); 2092 // Check if this was an intentional disconnect (Disconnecting status)
2093 let was_intentional = {
2094 let index = self.relay_sync_index.read().await;
2095 index
2096 .get(relay_url)
2097 .map(|s| s.connection_status == ConnectionStatus::Disconnecting)
2098 .unwrap_or(false)
2099 };
2071 2100
2072 // 1. Update RelayState in relay_sync_index 2101 if was_intentional {
2073 { 2102 // Intentional disconnect - complete cleanup by removing state
2074 let mut index = self.relay_sync_index.write().await; 2103 tracing::info!(relay = %relay_url, "Event loop terminated for intentional disconnect, completing cleanup");
2075 if let Some(state) = index.get_mut(relay_url) { 2104
2076 state.connection_status = ConnectionStatus::Disconnected; 2105 // Update metrics to Disconnected before cleanup
2077 state.disconnected_at = Some(Timestamp::now()); 2106 if let Some(ref metrics) = self.metrics {
2078 tracing::info!( 2107 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
2079 relay = %relay_url,
2080 repos_tracked = state.repos.len(),
2081 "Relay state updated to disconnected"
2082 );
2083 } else {
2084 tracing::debug!(
2085 relay = %relay_url,
2086 "No RelayState found for disconnected relay"
2087 );
2088 } 2108 }
2089 }
2090 2109
2091 // 2. Clear pending sync batches for this relay 2110 // Remove from relay_sync_index
2092 { 2111 {
2093 let mut pending = self.pending_sync_index.write().await; 2112 let mut index = self.relay_sync_index.write().await;
2094 if pending.remove(relay_url).is_some() { 2113 if index.remove(relay_url).is_some() {
2114 tracing::debug!(
2115 relay = %relay_url,
2116 "Removed relay from relay_sync_index"
2117 );
2118 }
2119 }
2120
2121 // Remove from pending_sync_index
2122 {
2123 let mut pending = self.pending_sync_index.write().await;
2124 if pending.remove(relay_url).is_some() {
2125 tracing::debug!(
2126 relay = %relay_url,
2127 "Removed relay from pending_sync_index"
2128 );
2129 }
2130 }
2131
2132 // Remove the connection object (won't reconnect)
2133 if self.connections.remove(relay_url).is_some() {
2095 tracing::debug!( 2134 tracing::debug!(
2096 relay = %relay_url, 2135 relay = %relay_url,
2097 "Cleared pending sync batches for disconnected relay" 2136 "Removed connection from connections map"
2098 ); 2137 );
2099 } 2138 }
2100 }
2101 2139
2102 // 3. Keep RelayConnection in HashMap for reuse on reconnect 2140 // Update metrics - decrement connected count
2103 // The connection object persists and will be reused when retry_disconnected_relays 2141 if let Some(ref metrics) = self.metrics {
2104 // calls try_connect_relay -> connection.connect() 2142 metrics.dec_connected_count();
2105 tracing::debug!( 2143 }
2106 relay = %relay_url,
2107 "Keeping RelayConnection in HashMap for reconnection"
2108 );
2109 2144
2110 // 4. Record failure in health tracker 2145 tracing::info!(relay = %relay_url, "Intentional disconnect cleanup complete");
2111 self.health_tracker.record_failure(relay_url); 2146 } else {
2147 // Unexpected disconnect - update state but keep for reconnection
2148 tracing::warn!(relay = %relay_url, "Unexpected relay disconnect detected");
2112 2149
2113 // Update metrics 2150 // Update RelayState in relay_sync_index
2114 if let Some(ref metrics) = self.metrics { 2151 {
2115 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); 2152 let mut index = self.relay_sync_index.write().await;
2116 metrics.dec_connected_count(); 2153 if let Some(state) = index.get_mut(relay_url) {
2117 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 2154 state.connection_status = ConnectionStatus::Disconnected;
2118 } 2155 state.disconnected_at = Some(Timestamp::now());
2156 tracing::info!(
2157 relay = %relay_url,
2158 repos_tracked = state.repos.len(),
2159 "Relay state updated to disconnected"
2160 );
2161 } else {
2162 tracing::debug!(
2163 relay = %relay_url,
2164 "No RelayState found for disconnected relay"
2165 );
2166 return;
2167 }
2168 }
2119 2169
2120 tracing::info!( 2170 // Clear pending sync batches for this relay
2121 relay = %relay_url, 2171 {
2122 health_state = %self.health_tracker.get_state(relay_url), 2172 let mut pending = self.pending_sync_index.write().await;
2123 "Relay disconnect handling complete" 2173 if pending.remove(relay_url).is_some() {
2124 ); 2174 tracing::debug!(
2175 relay = %relay_url,
2176 "Cleared pending sync batches for disconnected relay"
2177 );
2178 }
2179 }
2180
2181 // Keep RelayConnection in HashMap for reuse on reconnect
2182 tracing::debug!(
2183 relay = %relay_url,
2184 "Keeping RelayConnection in HashMap for reconnection"
2185 );
2186
2187 // Record failure in health tracker
2188 self.health_tracker.record_failure(relay_url);
2189
2190 // Update metrics
2191 if let Some(ref metrics) = self.metrics {
2192 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
2193 metrics.dec_connected_count();
2194 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
2195 }
2196
2197 tracing::info!(
2198 relay = %relay_url,
2199 health_state = %self.health_tracker.get_state(relay_url),
2200 "Unexpected disconnect handling complete"
2201 );
2202 }
2125 } 2203 }
2126 2204
2127 /// Re-process events from hot cache after their dependencies become available 2205 /// Re-process events from hot cache after their dependencies become available
@@ -2683,6 +2761,11 @@ impl SyncManager {
2683 return None; 2761 return None;
2684 } 2762 }
2685 2763
2764 // Skip relays already disconnecting
2765 if state.connection_status == ConnectionStatus::Disconnecting {
2766 return None;
2767 }
2768
2686 // Disconnect if no repos and no root events 2769 // Disconnect if no repos and no root events
2687 if state.repos.is_empty() && state.root_events.is_empty() { 2770 if state.repos.is_empty() && state.root_events.is_empty() {
2688 Some(relay_url.clone()) 2771 Some(relay_url.clone())
@@ -2710,49 +2793,45 @@ impl SyncManager {
2710 } 2793 }
2711 } 2794 }
2712 2795
2713 /// Disconnect a relay and clean up all associated state 2796 /// Disconnect a relay and mark it for cleanup
2714 /// 2797 ///
2715 /// This method: 2798 /// This method:
2716 /// - Removes the relay from relay_sync_index 2799 /// - Marks the relay as Disconnecting in relay_sync_index
2717 /// - Removes the relay from pending_sync_index 2800 /// - Initiates the connection disconnect
2718 /// - Disconnects the connection if it exists 2801 /// - Final cleanup happens in handle_disconnect when event loop terminates
2719 /// 2802 ///
2720 /// Used by check_disconnects for cleanup of empty relays. 2803 /// Used by check_disconnects for cleanup of empty relays.
2721 async fn disconnect_relay(&mut self, relay_url: &str) { 2804 async fn disconnect_relay(&mut self, relay_url: &str) {
2722 tracing::info!(relay = %relay_url, "Disconnecting empty relay"); 2805 tracing::info!(relay = %relay_url, "Initiating disconnect for empty relay");
2723 2806
2724 // Remove from relay_sync_index 2807 // Mark relay as Disconnecting (keep state for event loop to drain)
2725 { 2808 {
2726 let mut index = self.relay_sync_index.write().await; 2809 let mut index = self.relay_sync_index.write().await;
2727 if index.remove(relay_url).is_some() { 2810 if let Some(state) = index.get_mut(relay_url) {
2728 tracing::debug!( 2811 state.connection_status = ConnectionStatus::Disconnecting;
2729 relay = %relay_url, 2812 state.disconnected_at = Some(Timestamp::now());
2730 "Removed relay from relay_sync_index"
2731 );
2732 }
2733 }
2734
2735 // Remove from pending_sync_index
2736 {
2737 let mut pending = self.pending_sync_index.write().await;
2738 if pending.remove(relay_url).is_some() {
2739 tracing::debug!( 2813 tracing::debug!(
2740 relay = %relay_url, 2814 relay = %relay_url,
2741 "Removed relay from pending_sync_index" 2815 "Marked relay as Disconnecting"
2742 ); 2816 );
2743 } 2817 }
2744 } 2818 }
2745 2819
2746 // Disconnect the connection if it exists 2820 // Initiate disconnect - event loop will drain and send disconnect notification
2747 if let Some(connection) = self.connections.remove(relay_url) { 2821 if let Some(connection) = self.connections.get(relay_url) {
2748 connection.disconnect().await; 2822 connection.disconnect().await;
2749 tracing::debug!( 2823 tracing::debug!(
2750 relay = %relay_url, 2824 relay = %relay_url,
2751 "Disconnected connection" 2825 "Initiated connection disconnect"
2752 ); 2826 );
2753 } 2827 }
2754 2828
2755 tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); 2829 // Update metrics
2830 if let Some(ref metrics) = self.metrics {
2831 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnecting);
2832 }
2833
2834 tracing::info!(relay = %relay_url, "Disconnect initiated, waiting for event loop termination");
2756 } 2835 }
2757 2836
2758 /// Retry disconnected relays that are ready for reconnection 2837 /// Retry disconnected relays that are ready for reconnection