diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 13:10:58 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 13:10:58 +0000 |
| commit | 7e72cf46961852c650935633c0164f38c736aca5 (patch) | |
| tree | 1f653f65b8a8e41ff26660f6598135941ac8ce92 /src/sync | |
| parent | 00026a185b4b48d7179d02b50ea9e1802cd7e7e4 (diff) | |
Fix: Capture old_last_connected before updating state
Bug: handle_connect_or_reconnect() was incorrectly calling quick_reconnect()
on first connections instead of fresh_start().
Root cause: The code updated last_connected = Some(now) at line 808, then
immediately read it back at line 932 to make the reconnection decision.
This meant first connections saw elapsed = now - now = 0 seconds, which
triggered quick_reconnect() instead of fresh_start().
Fix: Capture old_last_connected BEFORE updating the state, then use that
value for the reconnection decision. Now first connections correctly see
None and call fresh_start().
Impact:
- First connections now properly use fresh_start() with full historic sync
- Short disconnections (< 15 min) use quick_reconnect() with since filter
- Long disconnections (> 15 min) use fresh_start() with full resync
All 41 sync tests passing.
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 71 |
1 files changed, 11 insertions, 60 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fd59759..85ab680 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -800,7 +800,14 @@ impl SyncManager { | |||
| 800 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | 800 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { |
| 801 | use tokio::sync::mpsc; | 801 | use tokio::sync::mpsc; |
| 802 | 802 | ||
| 803 | // 1. Update state to Connected | 803 | // 1. Capture old last_connected BEFORE updating state |
| 804 | // This is critical for correct first-connection detection | ||
| 805 | let old_last_connected = { | ||
| 806 | let index = self.relay_sync_index.read().await; | ||
| 807 | index.get(relay_url).and_then(|s| s.last_connected) | ||
| 808 | }; | ||
| 809 | |||
| 810 | // 2. Update state to Connected | ||
| 804 | { | 811 | { |
| 805 | let mut index = self.relay_sync_index.write().await; | 812 | let mut index = self.relay_sync_index.write().await; |
| 806 | let state = index.entry(relay_url.to_string()).or_default(); | 813 | let state = index.entry(relay_url.to_string()).or_default(); |
| @@ -927,13 +934,9 @@ impl SyncManager { | |||
| 927 | "Event loop and processor spawned for connected relay" | 934 | "Event loop and processor spawned for connected relay" |
| 928 | ); | 935 | ); |
| 929 | 936 | ||
| 930 | // 3. Decide reconnection strategy based on last_connected time | 937 | // 3. Decide reconnection strategy based on OLD last_connected time |
| 931 | let last_connected = { | 938 | // Use the value captured BEFORE the update to correctly detect first connections |
| 932 | let index = self.relay_sync_index.read().await; | 939 | if let Some(last) = old_last_connected { |
| 933 | index.get(relay_url).and_then(|s| s.last_connected) | ||
| 934 | }; | ||
| 935 | |||
| 936 | if let Some(last) = last_connected { | ||
| 937 | let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); | 940 | let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); |
| 938 | if elapsed < QUICK_RECONNECT_WINDOW_SECS { | 941 | if elapsed < QUICK_RECONNECT_WINDOW_SECS { |
| 939 | // Short disconnect - quick reconnect | 942 | // Short disconnect - quick reconnect |
| @@ -1827,58 +1830,6 @@ impl SyncManager { | |||
| 1827 | sub_ids | 1830 | sub_ids |
| 1828 | } | 1831 | } |
| 1829 | 1832 | ||
| 1830 | /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) | ||
| 1831 | /// | ||
| 1832 | /// Returns raw Vec<Filter> for L1+L2+L3. | ||
| 1833 | /// Used by: quick_reconnect, consolidate | ||
| 1834 | /// Does NOT include pending items - those flow through AddFilters path. | ||
| 1835 | /// | ||
| 1836 | /// # Arguments | ||
| 1837 | /// * `relay_url` - The relay URL to reconstruct filters for | ||
| 1838 | /// | ||
| 1839 | /// # Returns | ||
| 1840 | /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags) | ||
| 1841 | #[allow(dead_code)] // Will be used in Phase 3+ | ||
| 1842 | async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> { | ||
| 1843 | // Get confirmed state from relay_sync_index | ||
| 1844 | let (repos, root_events) = { | ||
| 1845 | let index = self.relay_sync_index.read().await; | ||
| 1846 | match index.get(relay_url) { | ||
| 1847 | Some(state) => (state.repos.clone(), state.root_events.clone()), | ||
| 1848 | None => { | ||
| 1849 | tracing::warn!( | ||
| 1850 | relay = %relay_url, | ||
| 1851 | "No RelayState found for reconstruct_filters" | ||
| 1852 | ); | ||
| 1853 | return vec![]; | ||
| 1854 | } | ||
| 1855 | } | ||
| 1856 | }; | ||
| 1857 | |||
| 1858 | let mut all_filters = Vec::new(); | ||
| 1859 | |||
| 1860 | // Layer 1: Announcements (always included) | ||
| 1861 | // Note: No `since` filter - this returns raw filters for live subscriptions | ||
| 1862 | all_filters.push(filters::build_announcement_filter(None)); | ||
| 1863 | |||
| 1864 | // Layer 2 + Layer 3: Repo and root event tag filters | ||
| 1865 | if !repos.is_empty() || !root_events.is_empty() { | ||
| 1866 | let l2_l3_filters = | ||
| 1867 | filters::build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 1868 | all_filters.extend(l2_l3_filters); | ||
| 1869 | } | ||
| 1870 | |||
| 1871 | tracing::debug!( | ||
| 1872 | relay = %relay_url, | ||
| 1873 | total_filters = all_filters.len(), | ||
| 1874 | repos_count = repos.len(), | ||
| 1875 | root_events_count = root_events.len(), | ||
| 1876 | "Reconstructed filters from confirmed state" | ||
| 1877 | ); | ||
| 1878 | |||
| 1879 | all_filters | ||
| 1880 | } | ||
| 1881 | |||
| 1882 | /// Sync historical events and track in PendingSyncIndex | 1833 | /// Sync historical events and track in PendingSyncIndex |
| 1883 | /// | 1834 | /// |
| 1884 | /// This method handles historical synchronization for a set of filters, | 1835 | /// This method handles historical synchronization for a set of filters, |