diff options
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 118 |
1 files changed, 32 insertions, 86 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e66611c..fd59759 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -738,14 +738,6 @@ impl SyncManager { | |||
| 738 | /// - For existing connected relays: subscribes to filters, creates PendingBatch | 738 | /// - For existing connected relays: subscribes to filters, creates PendingBatch |
| 739 | /// - For disconnected/connecting relays: returns (will be handled on connection) | 739 | /// - For disconnected/connecting relays: returns (will be handled on connection) |
| 740 | async fn handle_new_sync_filters(&mut self, action: AddFilters) { | 740 | async fn handle_new_sync_filters(&mut self, action: AddFilters) { |
| 741 | tracing::info!( | ||
| 742 | relay = %action.relay_url, | ||
| 743 | repo_count = action.items.repos.len(), | ||
| 744 | root_event_count = action.items.root_events.len(), | ||
| 745 | filter_count = action.filters.len(), | ||
| 746 | "[DIAG] handle_new_sync_filters called" | ||
| 747 | ); | ||
| 748 | |||
| 749 | // Step 1: Check if relay exists in relay_sync_index | 741 | // Step 1: Check if relay exists in relay_sync_index |
| 750 | let connection_status = { | 742 | let connection_status = { |
| 751 | let index = self.relay_sync_index.read().await; | 743 | let index = self.relay_sync_index.read().await; |
| @@ -891,20 +883,15 @@ impl SyncManager { | |||
| 891 | .await; | 883 | .await; |
| 892 | } | 884 | } |
| 893 | RelayEvent::Closed(reason) => { | 885 | RelayEvent::Closed(reason) => { |
| 894 | tracing::info!( | 886 | // CLOSED message means one subscription was closed, not the whole connection |
| 887 | // This is normal behavior (e.g., when historic_sync completes) | ||
| 888 | tracing::debug!( | ||
| 895 | relay = %relay_url_clone, | 889 | relay = %relay_url_clone, |
| 896 | reason = %reason, | 890 | reason = %reason, |
| 897 | "Relay connection closed" | 891 | "Relay closed a subscription (not a connection close)" |
| 898 | ); | 892 | ); |
| 899 | if !disconnect_sent { | 893 | // Don't break - other subscriptions remain active |
| 900 | let _ = disconnect_tx | 894 | // Don't send disconnect - connection is still alive |
| 901 | .send(DisconnectNotification { | ||
| 902 | relay_url: relay_url_clone.clone(), | ||
| 903 | }) | ||
| 904 | .await; | ||
| 905 | disconnect_sent = true; | ||
| 906 | } | ||
| 907 | break; | ||
| 908 | } | 895 | } |
| 909 | RelayEvent::Shutdown => { | 896 | RelayEvent::Shutdown => { |
| 910 | tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); | 897 | tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); |
| @@ -1031,12 +1018,14 @@ impl SyncManager { | |||
| 1031 | 1018 | ||
| 1032 | async fn sync_generic_filters(&mut self, relay_url: &str, since: Option<Timestamp>) { | 1019 | async fn sync_generic_filters(&mut self, relay_url: &str, since: Option<Timestamp>) { |
| 1033 | let filters = vec![filters::build_announcement_filter(None)]; | 1020 | let filters = vec![filters::build_announcement_filter(None)]; |
| 1034 | self.sync_live(relay_url, &filters).await; | 1021 | |
| 1022 | // Create live subscription for ongoing announcements | ||
| 1023 | let _sub_ids = self.sync_live(relay_url, &filters).await; | ||
| 1035 | 1024 | ||
| 1036 | // Use historic_sync with empty PendingItems for generic filters | 1025 | // Use historic_sync with empty PendingItems for generic filters |
| 1037 | // Generic filters (announcements) don't have associated repos or root_events | 1026 | // Generic filters (announcements) don't have associated repos or root_events |
| 1038 | let items = PendingItems::default(); | 1027 | let items = PendingItems::default(); |
| 1039 | self.historic_sync(relay_url, filters, items, since).await; | 1028 | let _batch_id = self.historic_sync(relay_url, filters, items, since).await; |
| 1040 | } | 1029 | } |
| 1041 | 1030 | ||
| 1042 | /// Quick reconnect - for disconnections < 15 minutes | 1031 | /// Quick reconnect - for disconnections < 15 minutes |
| @@ -1050,22 +1039,11 @@ impl SyncManager { | |||
| 1050 | /// Basic connection state and metrics are managed by handle_connect_or_reconnect. | 1039 | /// Basic connection state and metrics are managed by handle_connect_or_reconnect. |
| 1051 | /// This method handles reconnect-specific concerns (health tracking, reconnect metrics). | 1040 | /// This method handles reconnect-specific concerns (health tracking, reconnect metrics). |
| 1052 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { | 1041 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { |
| 1053 | tracing::info!( | ||
| 1054 | relay = %relay_url, | ||
| 1055 | since = %since, | ||
| 1056 | "Starting quick_reconnect" | ||
| 1057 | ); | ||
| 1058 | |||
| 1059 | // Step 1: Clear PendingSyncIndex for this relay | 1042 | // Step 1: Clear PendingSyncIndex for this relay |
| 1060 | // Old subscriptions are dead after disconnect | 1043 | // Old subscriptions are dead after disconnect |
| 1061 | { | 1044 | { |
| 1062 | let mut pending = self.pending_sync_index.write().await; | 1045 | let mut pending = self.pending_sync_index.write().await; |
| 1063 | if pending.remove(relay_url).is_some() { | 1046 | pending.remove(relay_url); |
| 1064 | tracing::debug!( | ||
| 1065 | relay = %relay_url, | ||
| 1066 | "Cleared pending batches in quick_reconnect" | ||
| 1067 | ); | ||
| 1068 | } | ||
| 1069 | } | 1047 | } |
| 1070 | 1048 | ||
| 1071 | // Record successful reconnection in health tracker | 1049 | // Record successful reconnection in health tracker |
| @@ -1090,6 +1068,7 @@ impl SyncManager { | |||
| 1090 | None | 1068 | None |
| 1091 | } | 1069 | } |
| 1092 | }; | 1070 | }; |
| 1071 | |||
| 1093 | self.sync_generic_filters(relay_url, announcement_since) | 1072 | self.sync_generic_filters(relay_url, announcement_since) |
| 1094 | .await; | 1073 | .await; |
| 1095 | 1074 | ||
| @@ -1099,8 +1078,6 @@ impl SyncManager { | |||
| 1099 | 1078 | ||
| 1100 | // Step 4: compute_actions for any NEW items discovered while disconnected | 1079 | // Step 4: compute_actions for any NEW items discovered while disconnected |
| 1101 | self.recompute_new_sync_filters_for_relay(relay_url).await; | 1080 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1102 | |||
| 1103 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); | ||
| 1104 | } | 1081 | } |
| 1105 | 1082 | ||
| 1106 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay | 1083 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay |
| @@ -1405,7 +1382,6 @@ impl SyncManager { | |||
| 1405 | ) { | 1382 | ) { |
| 1406 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; | 1383 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; |
| 1407 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 1384 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |
| 1408 | |||
| 1409 | // Check if event already exists | 1385 | // Check if event already exists |
| 1410 | match database.event_by_id(&event.id).await { | 1386 | match database.event_by_id(&event.id).await { |
| 1411 | Ok(Some(_)) => { | 1387 | Ok(Some(_)) => { |
| @@ -1805,13 +1781,7 @@ impl SyncManager { | |||
| 1805 | } | 1781 | } |
| 1806 | } | 1782 | } |
| 1807 | 1783 | ||
| 1808 | // ========================================================================= | ||
| 1809 | // Sync Primitives (Phase 1 of GRASP-02 refactoring) | ||
| 1810 | // These methods are new primitives that will be used in subsequent phases. | ||
| 1811 | // ========================================================================= | ||
| 1812 | |||
| 1813 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex | 1784 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex |
| 1814 | #[allow(dead_code)] // Will be used in Phase 2+ | ||
| 1815 | /// | 1785 | /// |
| 1816 | /// This method subscribes to filters with `limit: 0` for receiving ongoing events. | 1786 | /// This method subscribes to filters with `limit: 0` for receiving ongoing events. |
| 1817 | /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have | 1787 | /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have |
| @@ -1835,47 +1805,25 @@ impl SyncManager { | |||
| 1835 | let connection = match self.connections.get(relay_url) { | 1805 | let connection = match self.connections.get(relay_url) { |
| 1836 | Some(conn) => conn, | 1806 | Some(conn) => conn, |
| 1837 | None => { | 1807 | None => { |
| 1838 | tracing::warn!( | 1808 | tracing::debug!(relay = %relay_url, "No connection found for live sync"); |
| 1839 | relay = %relay_url, | ||
| 1840 | "No connection found for sync_live" | ||
| 1841 | ); | ||
| 1842 | return vec![]; | 1809 | return vec![]; |
| 1843 | } | 1810 | } |
| 1844 | }; | 1811 | }; |
| 1845 | 1812 | ||
| 1846 | let mut sub_ids = Vec::new(); | 1813 | let mut sub_ids = Vec::new(); |
| 1847 | 1814 | ||
| 1848 | for filter in filters { | 1815 | for filter in filters.iter() { |
| 1849 | // Apply limit: 0 to make this a live subscription | 1816 | // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) |
| 1850 | // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", | ||
| 1851 | // but omitting limit means "no limit" which is what we want for live. | ||
| 1852 | // The filter passed in should already NOT have a limit set. | ||
| 1853 | match connection.subscribe_filter(filter.clone().limit(1)).await { | 1817 | match connection.subscribe_filter(filter.clone().limit(1)).await { |
| 1854 | Ok(sub_id) => { | 1818 | Ok(sub_id) => { |
| 1855 | tracing::trace!( | ||
| 1856 | relay = %relay_url, | ||
| 1857 | sub_id = %sub_id, | ||
| 1858 | "Live subscription created" | ||
| 1859 | ); | ||
| 1860 | sub_ids.push(sub_id); | 1819 | sub_ids.push(sub_id); |
| 1861 | } | 1820 | } |
| 1862 | Err(e) => { | 1821 | Err(e) => { |
| 1863 | tracing::error!( | 1822 | tracing::error!(relay = %relay_url, error = %e, "Failed to create live subscription"); |
| 1864 | relay = %relay_url, | ||
| 1865 | error = %e, | ||
| 1866 | "Failed to create live subscription" | ||
| 1867 | ); | ||
| 1868 | } | 1823 | } |
| 1869 | } | 1824 | } |
| 1870 | } | 1825 | } |
| 1871 | 1826 | ||
| 1872 | tracing::debug!( | ||
| 1873 | relay = %relay_url, | ||
| 1874 | filter_count = filters.len(), | ||
| 1875 | sub_count = sub_ids.len(), | ||
| 1876 | "sync_live completed" | ||
| 1877 | ); | ||
| 1878 | |||
| 1879 | sub_ids | 1827 | sub_ids |
| 1880 | } | 1828 | } |
| 1881 | 1829 | ||
| @@ -2054,13 +2002,16 @@ impl SyncManager { | |||
| 2054 | match result { | 2002 | match result { |
| 2055 | Ok(reconciliation) => { | 2003 | Ok(reconciliation) => { |
| 2056 | let remote_count = reconciliation.remote.len(); | 2004 | let remote_count = reconciliation.remote.len(); |
| 2005 | let local_count = reconciliation.local.len(); | ||
| 2006 | tracing::debug!( | ||
| 2007 | relay = %relay_url, | ||
| 2008 | filter_idx = idx, | ||
| 2009 | remote_count = remote_count, | ||
| 2010 | local_count = local_count, | ||
| 2011 | remote_ids = ?reconciliation.remote, | ||
| 2012 | "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx | ||
| 2013 | ); | ||
| 2057 | if remote_count > 0 { | 2014 | if remote_count > 0 { |
| 2058 | tracing::debug!( | ||
| 2059 | relay = %relay_url, | ||
| 2060 | filter_idx = idx, | ||
| 2061 | remote_count = remote_count, | ||
| 2062 | "Negentropy diff identified missing events" | ||
| 2063 | ); | ||
| 2064 | all_remote_ids.extend(reconciliation.remote.into_iter()); | 2015 | all_remote_ids.extend(reconciliation.remote.into_iter()); |
| 2065 | } | 2016 | } |
| 2066 | } | 2017 | } |
| @@ -2129,27 +2080,20 @@ impl SyncManager { | |||
| 2129 | .collect(); | 2080 | .collect(); |
| 2130 | 2081 | ||
| 2131 | // DEBUG TRACING: Log that we're requesting events by ID | 2082 | // DEBUG TRACING: Log that we're requesting events by ID |
| 2132 | tracing::debug!( | 2083 | tracing::info!( |
| 2133 | relay = %relay_url, | 2084 | relay = %relay_url, |
| 2134 | batch_id = batch_id, | 2085 | batch_id = batch_id, |
| 2135 | total_event_ids = all_remote_ids.len(), | 2086 | total_event_ids = all_remote_ids.len(), |
| 2136 | filter_chunks = ids_filters.len(), | 2087 | filter_chunks = ids_filters.len(), |
| 2137 | event_ids = ?all_remote_ids, | 2088 | event_ids = ?all_remote_ids, |
| 2138 | "Creating subscriptions to fetch missing events by ID (negentropy path)" | 2089 | "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID", |
| 2090 | ids_filters.len(), | ||
| 2091 | all_remote_ids.len() | ||
| 2139 | ); | 2092 | ); |
| 2140 | 2093 | ||
| 2141 | let mut subscription_ids = HashSet::new(); | 2094 | let mut subscription_ids = HashSet::new(); |
| 2142 | for (idx, filter) in ids_filters.iter().enumerate() { | 2095 | for (idx, filter) in ids_filters.iter().enumerate() { |
| 2143 | if let Some(conn) = self.connections.get(relay_url) { | 2096 | if let Some(conn) = self.connections.get(relay_url) { |
| 2144 | // DEBUG TRACING: Log each filter being subscribed | ||
| 2145 | tracing::debug!( | ||
| 2146 | relay = %relay_url, | ||
| 2147 | batch_id = batch_id, | ||
| 2148 | chunk_idx = idx, | ||
| 2149 | filter = ?filter, | ||
| 2150 | "Subscribing to ID filter chunk" | ||
| 2151 | ); | ||
| 2152 | |||
| 2153 | match conn.subscribe_filter(filter.clone()).await { | 2097 | match conn.subscribe_filter(filter.clone()).await { |
| 2154 | Ok(sub_id) => { | 2098 | Ok(sub_id) => { |
| 2155 | subscription_ids.insert(sub_id); | 2099 | subscription_ids.insert(sub_id); |
| @@ -2157,8 +2101,10 @@ impl SyncManager { | |||
| 2157 | Err(e) => { | 2101 | Err(e) => { |
| 2158 | tracing::error!( | 2102 | tracing::error!( |
| 2159 | relay = %relay_url, | 2103 | relay = %relay_url, |
| 2104 | batch_id = batch_id, | ||
| 2105 | chunk_idx = idx, | ||
| 2160 | error = %e, | 2106 | error = %e, |
| 2161 | "Failed to subscribe to filter in historic_sync" | 2107 | "Failed to subscribe to ID filter chunk" |
| 2162 | ); | 2108 | ); |
| 2163 | } | 2109 | } |
| 2164 | } | 2110 | } |