upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 12:03:09 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 12:08:05 +0000
commit00026a185b4b48d7179d02b50ea9e1802cd7e7e4 (patch)
tree06fd1abd38018f6e1e31734efdfbfe2bd9d1d9b4 /src/sync/mod.rs
parent0228fa1e2fac86cfd2543444eef0784faa7a9715 (diff)
fix: prevent CLOSED messages from terminating relay connections
The system was incorrectly treating subscription-specific CLOSED messages as connection-wide disconnects, causing live subscriptions to be terminated immediately after historic_sync completed. Two bugs fixed: 1. relay_connection.rs: Removed break on RelayMessage::Closed - it's subscription-specific, not connection-wide 2. mod.rs: Removed disconnect handling for RelayEvent::Closed - only log at DEBUG level and continue All 41 sync tests now pass including previously failing live sync tests.
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs118
1 files changed, 32 insertions, 86 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e66611c..fd59759 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -738,14 +738,6 @@ impl SyncManager {
738 /// - For existing connected relays: subscribes to filters, creates PendingBatch 738 /// - For existing connected relays: subscribes to filters, creates PendingBatch
739 /// - For disconnected/connecting relays: returns (will be handled on connection) 739 /// - For disconnected/connecting relays: returns (will be handled on connection)
740 async fn handle_new_sync_filters(&mut self, action: AddFilters) { 740 async fn handle_new_sync_filters(&mut self, action: AddFilters) {
741 tracing::info!(
742 relay = %action.relay_url,
743 repo_count = action.items.repos.len(),
744 root_event_count = action.items.root_events.len(),
745 filter_count = action.filters.len(),
746 "[DIAG] handle_new_sync_filters called"
747 );
748
749 // Step 1: Check if relay exists in relay_sync_index 741 // Step 1: Check if relay exists in relay_sync_index
750 let connection_status = { 742 let connection_status = {
751 let index = self.relay_sync_index.read().await; 743 let index = self.relay_sync_index.read().await;
@@ -891,20 +883,15 @@ impl SyncManager {
891 .await; 883 .await;
892 } 884 }
893 RelayEvent::Closed(reason) => { 885 RelayEvent::Closed(reason) => {
894 tracing::info!( 886 // CLOSED message means one subscription was closed, not the whole connection
887 // This is normal behavior (e.g., when historic_sync completes)
888 tracing::debug!(
895 relay = %relay_url_clone, 889 relay = %relay_url_clone,
896 reason = %reason, 890 reason = %reason,
897 "Relay connection closed" 891 "Relay closed a subscription (not a connection close)"
898 ); 892 );
899 if !disconnect_sent { 893 // Don't break - other subscriptions remain active
900 let _ = disconnect_tx 894 // Don't send disconnect - connection is still alive
901 .send(DisconnectNotification {
902 relay_url: relay_url_clone.clone(),
903 })
904 .await;
905 disconnect_sent = true;
906 }
907 break;
908 } 895 }
909 RelayEvent::Shutdown => { 896 RelayEvent::Shutdown => {
910 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); 897 tracing::info!(relay = %relay_url_clone, "Relay shutdown detected");
@@ -1031,12 +1018,14 @@ impl SyncManager {
1031 1018
1032 async fn sync_generic_filters(&mut self, relay_url: &str, since: Option<Timestamp>) { 1019 async fn sync_generic_filters(&mut self, relay_url: &str, since: Option<Timestamp>) {
1033 let filters = vec![filters::build_announcement_filter(None)]; 1020 let filters = vec![filters::build_announcement_filter(None)];
1034 self.sync_live(relay_url, &filters).await; 1021
1022 // Create live subscription for ongoing announcements
1023 let _sub_ids = self.sync_live(relay_url, &filters).await;
1035 1024
1036 // Use historic_sync with empty PendingItems for generic filters 1025 // Use historic_sync with empty PendingItems for generic filters
1037 // Generic filters (announcements) don't have associated repos or root_events 1026 // Generic filters (announcements) don't have associated repos or root_events
1038 let items = PendingItems::default(); 1027 let items = PendingItems::default();
1039 self.historic_sync(relay_url, filters, items, since).await; 1028 let _batch_id = self.historic_sync(relay_url, filters, items, since).await;
1040 } 1029 }
1041 1030
1042 /// Quick reconnect - for disconnections < 15 minutes 1031 /// Quick reconnect - for disconnections < 15 minutes
@@ -1050,22 +1039,11 @@ impl SyncManager {
1050 /// Basic connection state and metrics are managed by handle_connect_or_reconnect. 1039 /// Basic connection state and metrics are managed by handle_connect_or_reconnect.
1051 /// This method handles reconnect-specific concerns (health tracking, reconnect metrics). 1040 /// This method handles reconnect-specific concerns (health tracking, reconnect metrics).
1052 async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { 1041 async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) {
1053 tracing::info!(
1054 relay = %relay_url,
1055 since = %since,
1056 "Starting quick_reconnect"
1057 );
1058
1059 // Step 1: Clear PendingSyncIndex for this relay 1042 // Step 1: Clear PendingSyncIndex for this relay
1060 // Old subscriptions are dead after disconnect 1043 // Old subscriptions are dead after disconnect
1061 { 1044 {
1062 let mut pending = self.pending_sync_index.write().await; 1045 let mut pending = self.pending_sync_index.write().await;
1063 if pending.remove(relay_url).is_some() { 1046 pending.remove(relay_url);
1064 tracing::debug!(
1065 relay = %relay_url,
1066 "Cleared pending batches in quick_reconnect"
1067 );
1068 }
1069 } 1047 }
1070 1048
1071 // Record successful reconnection in health tracker 1049 // Record successful reconnection in health tracker
@@ -1090,6 +1068,7 @@ impl SyncManager {
1090 None 1068 None
1091 } 1069 }
1092 }; 1070 };
1071
1093 self.sync_generic_filters(relay_url, announcement_since) 1072 self.sync_generic_filters(relay_url, announcement_since)
1094 .await; 1073 .await;
1095 1074
@@ -1099,8 +1078,6 @@ impl SyncManager {
1099 1078
1100 // Step 4: compute_actions for any NEW items discovered while disconnected 1079 // Step 4: compute_actions for any NEW items discovered while disconnected
1101 self.recompute_new_sync_filters_for_relay(relay_url).await; 1080 self.recompute_new_sync_filters_for_relay(relay_url).await;
1102
1103 tracing::info!(relay = %relay_url, "quick_reconnect complete");
1104 } 1081 }
1105 1082
1106 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay 1083 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay
@@ -1405,7 +1382,6 @@ impl SyncManager {
1405 ) { 1382 ) {
1406 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; 1383 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy};
1407 use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 1384 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1408
1409 // Check if event already exists 1385 // Check if event already exists
1410 match database.event_by_id(&event.id).await { 1386 match database.event_by_id(&event.id).await {
1411 Ok(Some(_)) => { 1387 Ok(Some(_)) => {
@@ -1805,13 +1781,7 @@ impl SyncManager {
1805 } 1781 }
1806 } 1782 }
1807 1783
1808 // =========================================================================
1809 // Sync Primitives (Phase 1 of GRASP-02 refactoring)
1810 // These methods are new primitives that will be used in subsequent phases.
1811 // =========================================================================
1812
1813 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex 1784 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex
1814 #[allow(dead_code)] // Will be used in Phase 2+
1815 /// 1785 ///
1816 /// This method subscribes to filters with `limit: 0` for receiving ongoing events. 1786 /// This method subscribes to filters with `limit: 0` for receiving ongoing events.
1817 /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have 1787 /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have
@@ -1835,47 +1805,25 @@ impl SyncManager {
1835 let connection = match self.connections.get(relay_url) { 1805 let connection = match self.connections.get(relay_url) {
1836 Some(conn) => conn, 1806 Some(conn) => conn,
1837 None => { 1807 None => {
1838 tracing::warn!( 1808 tracing::debug!(relay = %relay_url, "No connection found for live sync");
1839 relay = %relay_url,
1840 "No connection found for sync_live"
1841 );
1842 return vec![]; 1809 return vec![];
1843 } 1810 }
1844 }; 1811 };
1845 1812
1846 let mut sub_ids = Vec::new(); 1813 let mut sub_ids = Vec::new();
1847 1814
1848 for filter in filters { 1815 for filter in filters.iter() {
1849 // Apply limit: 0 to make this a live subscription 1816 // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit)
1850 // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit",
1851 // but omitting limit means "no limit" which is what we want for live.
1852 // The filter passed in should already NOT have a limit set.
1853 match connection.subscribe_filter(filter.clone().limit(1)).await { 1817 match connection.subscribe_filter(filter.clone().limit(1)).await {
1854 Ok(sub_id) => { 1818 Ok(sub_id) => {
1855 tracing::trace!(
1856 relay = %relay_url,
1857 sub_id = %sub_id,
1858 "Live subscription created"
1859 );
1860 sub_ids.push(sub_id); 1819 sub_ids.push(sub_id);
1861 } 1820 }
1862 Err(e) => { 1821 Err(e) => {
1863 tracing::error!( 1822 tracing::error!(relay = %relay_url, error = %e, "Failed to create live subscription");
1864 relay = %relay_url,
1865 error = %e,
1866 "Failed to create live subscription"
1867 );
1868 } 1823 }
1869 } 1824 }
1870 } 1825 }
1871 1826
1872 tracing::debug!(
1873 relay = %relay_url,
1874 filter_count = filters.len(),
1875 sub_count = sub_ids.len(),
1876 "sync_live completed"
1877 );
1878
1879 sub_ids 1827 sub_ids
1880 } 1828 }
1881 1829
@@ -2054,13 +2002,16 @@ impl SyncManager {
2054 match result { 2002 match result {
2055 Ok(reconciliation) => { 2003 Ok(reconciliation) => {
2056 let remote_count = reconciliation.remote.len(); 2004 let remote_count = reconciliation.remote.len();
2005 let local_count = reconciliation.local.len();
2006 tracing::debug!(
2007 relay = %relay_url,
2008 filter_idx = idx,
2009 remote_count = remote_count,
2010 local_count = local_count,
2011 remote_ids = ?reconciliation.remote,
2012 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx
2013 );
2057 if remote_count > 0 { 2014 if remote_count > 0 {
2058 tracing::debug!(
2059 relay = %relay_url,
2060 filter_idx = idx,
2061 remote_count = remote_count,
2062 "Negentropy diff identified missing events"
2063 );
2064 all_remote_ids.extend(reconciliation.remote.into_iter()); 2015 all_remote_ids.extend(reconciliation.remote.into_iter());
2065 } 2016 }
2066 } 2017 }
@@ -2129,27 +2080,20 @@ impl SyncManager {
2129 .collect(); 2080 .collect();
2130 2081
2131 // DEBUG TRACING: Log that we're requesting events by ID 2082 // DEBUG TRACING: Log that we're requesting events by ID
2132 tracing::debug!( 2083 tracing::info!(
2133 relay = %relay_url, 2084 relay = %relay_url,
2134 batch_id = batch_id, 2085 batch_id = batch_id,
2135 total_event_ids = all_remote_ids.len(), 2086 total_event_ids = all_remote_ids.len(),
2136 filter_chunks = ids_filters.len(), 2087 filter_chunks = ids_filters.len(),
2137 event_ids = ?all_remote_ids, 2088 event_ids = ?all_remote_ids,
2138 "Creating subscriptions to fetch missing events by ID (negentropy path)" 2089 "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID",
2090 ids_filters.len(),
2091 all_remote_ids.len()
2139 ); 2092 );
2140 2093
2141 let mut subscription_ids = HashSet::new(); 2094 let mut subscription_ids = HashSet::new();
2142 for (idx, filter) in ids_filters.iter().enumerate() { 2095 for (idx, filter) in ids_filters.iter().enumerate() {
2143 if let Some(conn) = self.connections.get(relay_url) { 2096 if let Some(conn) = self.connections.get(relay_url) {
2144 // DEBUG TRACING: Log each filter being subscribed
2145 tracing::debug!(
2146 relay = %relay_url,
2147 batch_id = batch_id,
2148 chunk_idx = idx,
2149 filter = ?filter,
2150 "Subscribing to ID filter chunk"
2151 );
2152
2153 match conn.subscribe_filter(filter.clone()).await { 2097 match conn.subscribe_filter(filter.clone()).await {
2154 Ok(sub_id) => { 2098 Ok(sub_id) => {
2155 subscription_ids.insert(sub_id); 2099 subscription_ids.insert(sub_id);
@@ -2157,8 +2101,10 @@ impl SyncManager {
2157 Err(e) => { 2101 Err(e) => {
2158 tracing::error!( 2102 tracing::error!(
2159 relay = %relay_url, 2103 relay = %relay_url,
2104 batch_id = batch_id,
2105 chunk_idx = idx,
2160 error = %e, 2106 error = %e,
2161 "Failed to subscribe to filter in historic_sync" 2107 "Failed to subscribe to ID filter chunk"
2162 ); 2108 );
2163 } 2109 }
2164 } 2110 }