From 00026a185b4b48d7179d02b50ea9e1802cd7e7e4 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 12:03:09 +0000 Subject: fix: prevent CLOSED messages from terminating relay connections The system was incorrectly treating subscription-specific CLOSED messages as connection-wide disconnects, causing live subscriptions to be terminated immediately after historic_sync completed. Two bugs fixed: 1. relay_connection.rs: Removed break on RelayMessage::Closed - it's subscription-specific, not connection-wide 2. mod.rs: Removed disconnect handling for RelayEvent::Closed - only log at DEBUG level and continue All 41 sync tests now pass including previously failing live sync tests. --- src/sync/mod.rs | 118 ++++++++++++------------------------------- src/sync/relay_connection.rs | 3 +- 2 files changed, 34 insertions(+), 87 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e66611c..fd59759 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -738,14 +738,6 @@ impl SyncManager { /// - For existing connected relays: subscribes to filters, creates PendingBatch /// - For disconnected/connecting relays: returns (will be handled on connection) async fn handle_new_sync_filters(&mut self, action: AddFilters) { - tracing::info!( - relay = %action.relay_url, - repo_count = action.items.repos.len(), - root_event_count = action.items.root_events.len(), - filter_count = action.filters.len(), - "[DIAG] handle_new_sync_filters called" - ); - // Step 1: Check if relay exists in relay_sync_index let connection_status = { let index = self.relay_sync_index.read().await; @@ -891,20 +883,15 @@ impl SyncManager { .await; } RelayEvent::Closed(reason) => { - tracing::info!( + // CLOSED message means one subscription was closed, not the whole connection + // This is normal behavior (e.g., when historic_sync completes) + tracing::debug!( relay = %relay_url_clone, reason = %reason, - "Relay connection closed" + "Relay closed a subscription (not a connection close)" ); - if !disconnect_sent { - let _ = disconnect_tx - .send(DisconnectNotification { - relay_url: relay_url_clone.clone(), - }) - .await; - disconnect_sent = true; - } - break; + // Don't break - other subscriptions remain active + // Don't send disconnect - connection is still alive } RelayEvent::Shutdown => { tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); @@ -1031,12 +1018,14 @@ impl SyncManager { async fn sync_generic_filters(&mut self, relay_url: &str, since: Option) { let filters = vec![filters::build_announcement_filter(None)]; - self.sync_live(relay_url, &filters).await; + + // Create live subscription for ongoing announcements + let _sub_ids = self.sync_live(relay_url, &filters).await; // Use historic_sync with empty PendingItems for generic filters // Generic filters (announcements) don't have associated repos or root_events let items = PendingItems::default(); - self.historic_sync(relay_url, filters, items, since).await; + let _batch_id = self.historic_sync(relay_url, filters, items, since).await; } /// Quick reconnect - for disconnections < 15 minutes @@ -1050,22 +1039,11 @@ impl SyncManager { /// Basic connection state and metrics are managed by handle_connect_or_reconnect. /// This method handles reconnect-specific concerns (health tracking, reconnect metrics). async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { - tracing::info!( - relay = %relay_url, - since = %since, - "Starting quick_reconnect" - ); - // Step 1: Clear PendingSyncIndex for this relay // Old subscriptions are dead after disconnect { let mut pending = self.pending_sync_index.write().await; - if pending.remove(relay_url).is_some() { - tracing::debug!( - relay = %relay_url, - "Cleared pending batches in quick_reconnect" - ); - } + pending.remove(relay_url); } // Record successful reconnection in health tracker @@ -1090,6 +1068,7 @@ impl SyncManager { None } }; + self.sync_generic_filters(relay_url, announcement_since) .await; @@ -1099,8 +1078,6 @@ impl SyncManager { // Step 4: compute_actions for any NEW items discovered while disconnected self.recompute_new_sync_filters_for_relay(relay_url).await; - - tracing::info!(relay = %relay_url, "quick_reconnect complete"); } /// Rebuild Layer 2 and Layer 3 subscriptions for a relay @@ -1405,7 +1382,6 @@ impl SyncManager { ) { use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - // Check if event already exists match database.event_by_id(&event.id).await { Ok(Some(_)) => { @@ -1805,13 +1781,7 @@ impl SyncManager { } } - // ========================================================================= - // Sync Primitives (Phase 1 of GRASP-02 refactoring) - // These methods are new primitives that will be used in subsequent phases. - // ========================================================================= - /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex - #[allow(dead_code)] // Will be used in Phase 2+ /// /// This method subscribes to filters with `limit: 0` for receiving ongoing events. /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have @@ -1835,47 +1805,25 @@ impl SyncManager { let connection = match self.connections.get(relay_url) { Some(conn) => conn, None => { - tracing::warn!( - relay = %relay_url, - "No connection found for sync_live" - ); + tracing::debug!(relay = %relay_url, "No connection found for live sync"); return vec![]; } }; let mut sub_ids = Vec::new(); - for filter in filters { - // Apply limit: 0 to make this a live subscription - // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", - // but omitting limit means "no limit" which is what we want for live. - // The filter passed in should already NOT have a limit set. + for filter in filters.iter() { + // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) match connection.subscribe_filter(filter.clone().limit(1)).await { Ok(sub_id) => { - tracing::trace!( - relay = %relay_url, - sub_id = %sub_id, - "Live subscription created" - ); sub_ids.push(sub_id); } Err(e) => { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to create live subscription" - ); + tracing::error!(relay = %relay_url, error = %e, "Failed to create live subscription"); } } } - tracing::debug!( - relay = %relay_url, - filter_count = filters.len(), - sub_count = sub_ids.len(), - "sync_live completed" - ); - sub_ids } @@ -2054,13 +2002,16 @@ impl SyncManager { match result { Ok(reconciliation) => { let remote_count = reconciliation.remote.len(); + let local_count = reconciliation.local.len(); + tracing::debug!( + relay = %relay_url, + filter_idx = idx, + remote_count = remote_count, + local_count = local_count, + remote_ids = ?reconciliation.remote, + "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx + ); if remote_count > 0 { - tracing::debug!( - relay = %relay_url, - filter_idx = idx, - remote_count = remote_count, - "Negentropy diff identified missing events" - ); all_remote_ids.extend(reconciliation.remote.into_iter()); } } @@ -2129,27 +2080,20 @@ impl SyncManager { .collect(); // DEBUG TRACING: Log that we're requesting events by ID - tracing::debug!( + tracing::info!( relay = %relay_url, batch_id = batch_id, total_event_ids = all_remote_ids.len(), filter_chunks = ids_filters.len(), event_ids = ?all_remote_ids, - "Creating subscriptions to fetch missing events by ID (negentropy path)" + "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID", + ids_filters.len(), + all_remote_ids.len() ); let mut subscription_ids = HashSet::new(); for (idx, filter) in ids_filters.iter().enumerate() { if let Some(conn) = self.connections.get(relay_url) { - // DEBUG TRACING: Log each filter being subscribed - tracing::debug!( - relay = %relay_url, - batch_id = batch_id, - chunk_idx = idx, - filter = ?filter, - "Subscribing to ID filter chunk" - ); - match conn.subscribe_filter(filter.clone()).await { Ok(sub_id) => { subscription_ids.insert(sub_id); @@ -2157,8 +2101,10 @@ impl SyncManager { Err(e) => { tracing::error!( relay = %relay_url, + batch_id = batch_id, + chunk_idx = idx, error = %e, - "Failed to subscribe to filter in historic_sync" + "Failed to subscribe to ID filter chunk" ); } } diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index bc4b59e..fdb32cb 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -256,7 +256,8 @@ impl RelayConnection { tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; - break; + // Don't break - CLOSED is subscription-specific, not connection-specific + // The event loop should continue running for other active subscriptions } _ => {} }, -- cgit v1.2.3