upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs47
1 files changed, 16 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6adfc55..e66611c 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1107,7 +1107,10 @@ impl SyncManager {
1107 /// 1107 ///
1108 /// Uses the confirmed repos and root_events from RelayState to build filters. 1108 /// Uses the confirmed repos and root_events from RelayState to build filters.
1109 /// If since is provided, applies it to all filters for incremental sync. 1109 /// If since is provided, applies it to all filters for incremental sync.
1110 async fn rebuild_layer2_and_layer3(&self, relay_url: &str, since: Option<Timestamp>) { 1110 ///
1111 /// CRITICAL: This method now creates a PendingBatch to track subscriptions,
1112 /// ensuring EOSE handling works correctly for live sync scenarios.
1113 async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) {
1111 use crate::sync::filters::build_layer2_and_layer3_filters; 1114 use crate::sync::filters::build_layer2_and_layer3_filters;
1112 1115
1113 // Get confirmed state from relay_sync_index 1116 // Get confirmed state from relay_sync_index
@@ -1137,36 +1140,15 @@ impl SyncManager {
1137 // Build Layer 2 and Layer 3 filters 1140 // Build Layer 2 and Layer 3 filters
1138 let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); 1141 let filters = build_layer2_and_layer3_filters(&repos, &root_events, since);
1139 1142
1140 // DEBUG TRACING: Log detailed filter information 1143 if filters.is_empty() {
1141 tracing::debug!( 1144 tracing::debug!(
1142 relay = %relay_url,
1143 filter_count = filters.len(),
1144 repos_count = repos.len(),
1145 root_events_count = root_events.len(),
1146 repos = ?repos,
1147 root_events = ?root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
1148 filters = ?filters,
1149 since = ?since,
1150 "Rebuilding Layer 2/3 filters"
1151 );
1152
1153 // Subscribe to filters on the relay connection
1154 if let Some(connection) = self.connections.get(relay_url) {
1155 for filter in filters {
1156 if let Err(e) = connection.subscribe_filter(filter).await {
1157 tracing::error!(
1158 relay = %relay_url,
1159 error = %e,
1160 "Failed to subscribe to Layer 2/3 filter during rebuild"
1161 );
1162 }
1163 }
1164 } else {
1165 tracing::warn!(
1166 relay = %relay_url, 1145 relay = %relay_url,
1167 "No active connection found for Layer 2/3 rebuild" 1146 "No filters generated for Layer 2/3 rebuild"
1168 ); 1147 );
1148 return;
1169 } 1149 }
1150 // TODO do we add since instead of limit to live sync or do a historic sync of filters?
1151 self.sync_live(relay_url, &filters).await;
1170 } 1152 }
1171 1153
1172 /// Register a relay for managed connection/reconnection 1154 /// Register a relay for managed connection/reconnection
@@ -1950,7 +1932,6 @@ impl SyncManager {
1950 } 1932 }
1951 1933
1952 /// Sync historical events and track in PendingSyncIndex 1934 /// Sync historical events and track in PendingSyncIndex
1953 #[allow(dead_code)] // Will be used in Phase 3+
1954 /// 1935 ///
1955 /// This method handles historical synchronization for a set of filters, 1936 /// This method handles historical synchronization for a set of filters,
1956 /// creating a PendingBatch to track completion. It dispatches to either 1937 /// creating a PendingBatch to track completion. It dispatches to either
@@ -2017,8 +1998,8 @@ impl SyncManager {
2017 1998
2018 // Check if we should use negentropy 1999 // Check if we should use negentropy
2019 // TODO once we have setup our new tests we will re-enable this and fix our implementation 2000 // TODO once we have setup our new tests we will re-enable this and fix our implementation
2020 let use_negentropy = false; 2001 let use_negentropy =
2021 // !self.config.sync_disable_negentropy && connection.supports_negentropy().await; 2002 !self.config.sync_disable_negentropy && connection.supports_negentropy().await;
2022 2003
2023 // Generate batch ID 2004 // Generate batch ID
2024 let batch_id = self.next_batch_id(); 2005 let batch_id = self.next_batch_id();
@@ -2136,7 +2117,11 @@ impl SyncManager {
2136 total_received = 0, 2117 total_received = 0,
2137 "historic_sync (negentropy) completed - already up-to-date" 2118 "historic_sync (negentropy) completed - already up-to-date"
2138 ); 2119 );
2120
2121 // Batch already confirmed, nothing more to do
2122 return Some(batch_id);
2139 } 2123 }
2124
2140 // launch subscriptions to fetch missing events by id 2125 // launch subscriptions to fetch missing events by id
2141 let ids_filters: Vec<_> = all_remote_ids 2126 let ids_filters: Vec<_> = all_remote_ids
2142 .chunks(300) 2127 .chunks(300)