From 7e72cf46961852c650935633c0164f38c736aca5 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 13:10:58 +0000 Subject: Fix: Capture old_last_connected before updating state Bug: handle_connect_or_reconnect() was incorrectly calling quick_reconnect() on first connections instead of fresh_start(). Root cause: The code updated last_connected = Some(now) at line 808, then immediately read it back at line 932 to make the reconnection decision. This meant first connections saw elapsed = now - now = 0 seconds, which triggered quick_reconnect() instead of fresh_start(). Fix: Capture old_last_connected BEFORE updating the state, then use that value for the reconnection decision. Now first connections correctly see None and call fresh_start(). Impact: - First connections now properly use fresh_start() with full historic sync - Short disconnections (< 15 min) use quick_reconnect() with since filter - Long disconnections (> 15 min) use fresh_start() with full resync All 41 sync tests passing. --- src/sync/mod.rs | 71 +++++++++------------------------------------------------ 1 file changed, 11 insertions(+), 60 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fd59759..85ab680 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -800,7 +800,14 @@ impl SyncManager { async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { use tokio::sync::mpsc; - // 1. Update state to Connected + // 1. Capture old last_connected BEFORE updating state + // This is critical for correct first-connection detection + let old_last_connected = { + let index = self.relay_sync_index.read().await; + index.get(relay_url).and_then(|s| s.last_connected) + }; + + // 2. Update state to Connected { let mut index = self.relay_sync_index.write().await; let state = index.entry(relay_url.to_string()).or_default(); @@ -927,13 +934,9 @@ impl SyncManager { "Event loop and processor spawned for connected relay" ); - // 3. Decide reconnection strategy based on last_connected time - let last_connected = { - let index = self.relay_sync_index.read().await; - index.get(relay_url).and_then(|s| s.last_connected) - }; - - if let Some(last) = last_connected { + // 3. Decide reconnection strategy based on OLD last_connected time + // Use the value captured BEFORE the update to correctly detect first connections + if let Some(last) = old_last_connected { let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); if elapsed < QUICK_RECONNECT_WINDOW_SECS { // Short disconnect - quick reconnect @@ -1827,58 +1830,6 @@ impl SyncManager { sub_ids } - /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) - /// - /// Returns raw Vec for L1+L2+L3. - /// Used by: quick_reconnect, consolidate - /// Does NOT include pending items - those flow through AddFilters path. - /// - /// # Arguments - /// * `relay_url` - The relay URL to reconstruct filters for - /// - /// # Returns - /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags) - #[allow(dead_code)] // Will be used in Phase 3+ - async fn reconstruct_filters(&self, relay_url: &str) -> Vec { - // Get confirmed state from relay_sync_index - let (repos, root_events) = { - let index = self.relay_sync_index.read().await; - match index.get(relay_url) { - Some(state) => (state.repos.clone(), state.root_events.clone()), - None => { - tracing::warn!( - relay = %relay_url, - "No RelayState found for reconstruct_filters" - ); - return vec![]; - } - } - }; - - let mut all_filters = Vec::new(); - - // Layer 1: Announcements (always included) - // Note: No `since` filter - this returns raw filters for live subscriptions - all_filters.push(filters::build_announcement_filter(None)); - - // Layer 2 + Layer 3: Repo and root event tag filters - if !repos.is_empty() || !root_events.is_empty() { - let l2_l3_filters = - filters::build_layer2_and_layer3_filters(&repos, &root_events, None); - all_filters.extend(l2_l3_filters); - } - - tracing::debug!( - relay = %relay_url, - total_filters = all_filters.len(), - repos_count = repos.len(), - root_events_count = root_events.len(), - "Reconstructed filters from confirmed state" - ); - - all_filters - } - /// Sync historical events and track in PendingSyncIndex /// /// This method handles historical synchronization for a set of filters, -- cgit v1.2.3