upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 13:10:58 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 13:10:58 +0000
commit7e72cf46961852c650935633c0164f38c736aca5 (patch)
tree1f653f65b8a8e41ff26660f6598135941ac8ce92 /src/sync/mod.rs
parent00026a185b4b48d7179d02b50ea9e1802cd7e7e4 (diff)
Fix: Capture old_last_connected before updating state
Bug: handle_connect_or_reconnect() was incorrectly calling quick_reconnect() on first connections instead of fresh_start(). Root cause: The code updated last_connected = Some(now) at line 808, then immediately read it back at line 932 to make the reconnection decision. This meant first connections saw elapsed = now - now = 0 seconds, which triggered quick_reconnect() instead of fresh_start(). Fix: Capture old_last_connected BEFORE updating the state, then use that value for the reconnection decision. Now first connections correctly see None and call fresh_start(). Impact: - First connections now properly use fresh_start() with full historic sync - Short disconnections (< 15 min) use quick_reconnect() with since filter - Long disconnections (> 15 min) use fresh_start() with full resync All 41 sync tests passing.
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs71
1 files changed, 11 insertions, 60 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index fd59759..85ab680 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -800,7 +800,14 @@ impl SyncManager {
800 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { 800 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
801 use tokio::sync::mpsc; 801 use tokio::sync::mpsc;
802 802
803 // 1. Update state to Connected 803 // 1. Capture old last_connected BEFORE updating state
804 // This is critical for correct first-connection detection
805 let old_last_connected = {
806 let index = self.relay_sync_index.read().await;
807 index.get(relay_url).and_then(|s| s.last_connected)
808 };
809
810 // 2. Update state to Connected
804 { 811 {
805 let mut index = self.relay_sync_index.write().await; 812 let mut index = self.relay_sync_index.write().await;
806 let state = index.entry(relay_url.to_string()).or_default(); 813 let state = index.entry(relay_url.to_string()).or_default();
@@ -927,13 +934,9 @@ impl SyncManager {
927 "Event loop and processor spawned for connected relay" 934 "Event loop and processor spawned for connected relay"
928 ); 935 );
929 936
930 // 3. Decide reconnection strategy based on last_connected time 937 // 3. Decide reconnection strategy based on OLD last_connected time
931 let last_connected = { 938 // Use the value captured BEFORE the update to correctly detect first connections
932 let index = self.relay_sync_index.read().await; 939 if let Some(last) = old_last_connected {
933 index.get(relay_url).and_then(|s| s.last_connected)
934 };
935
936 if let Some(last) = last_connected {
937 let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); 940 let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs());
938 if elapsed < QUICK_RECONNECT_WINDOW_SECS { 941 if elapsed < QUICK_RECONNECT_WINDOW_SECS {
939 // Short disconnect - quick reconnect 942 // Short disconnect - quick reconnect
@@ -1827,58 +1830,6 @@ impl SyncManager {
1827 sub_ids 1830 sub_ids
1828 } 1831 }
1829 1832
1830 /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY)
1831 ///
1832 /// Returns raw Vec<Filter> for L1+L2+L3.
1833 /// Used by: quick_reconnect, consolidate
1834 /// Does NOT include pending items - those flow through AddFilters path.
1835 ///
1836 /// # Arguments
1837 /// * `relay_url` - The relay URL to reconstruct filters for
1838 ///
1839 /// # Returns
1840 /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags)
1841 #[allow(dead_code)] // Will be used in Phase 3+
1842 async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> {
1843 // Get confirmed state from relay_sync_index
1844 let (repos, root_events) = {
1845 let index = self.relay_sync_index.read().await;
1846 match index.get(relay_url) {
1847 Some(state) => (state.repos.clone(), state.root_events.clone()),
1848 None => {
1849 tracing::warn!(
1850 relay = %relay_url,
1851 "No RelayState found for reconstruct_filters"
1852 );
1853 return vec![];
1854 }
1855 }
1856 };
1857
1858 let mut all_filters = Vec::new();
1859
1860 // Layer 1: Announcements (always included)
1861 // Note: No `since` filter - this returns raw filters for live subscriptions
1862 all_filters.push(filters::build_announcement_filter(None));
1863
1864 // Layer 2 + Layer 3: Repo and root event tag filters
1865 if !repos.is_empty() || !root_events.is_empty() {
1866 let l2_l3_filters =
1867 filters::build_layer2_and_layer3_filters(&repos, &root_events, None);
1868 all_filters.extend(l2_l3_filters);
1869 }
1870
1871 tracing::debug!(
1872 relay = %relay_url,
1873 total_filters = all_filters.len(),
1874 repos_count = repos.len(),
1875 root_events_count = root_events.len(),
1876 "Reconstructed filters from confirmed state"
1877 );
1878
1879 all_filters
1880 }
1881
1882 /// Sync historical events and track in PendingSyncIndex 1833 /// Sync historical events and track in PendingSyncIndex
1883 /// 1834 ///
1884 /// This method handles historical synchronization for a set of filters, 1835 /// This method handles historical synchronization for a set of filters,