diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 47 |
1 files changed, 16 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6adfc55..e66611c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1107,7 +1107,10 @@ impl SyncManager { | |||
| 1107 | /// | 1107 | /// |
| 1108 | /// Uses the confirmed repos and root_events from RelayState to build filters. | 1108 | /// Uses the confirmed repos and root_events from RelayState to build filters. |
| 1109 | /// If since is provided, applies it to all filters for incremental sync. | 1109 | /// If since is provided, applies it to all filters for incremental sync. |
| 1110 | async fn rebuild_layer2_and_layer3(&self, relay_url: &str, since: Option<Timestamp>) { | 1110 | /// |
| 1111 | /// CRITICAL: This method now creates a PendingBatch to track subscriptions, | ||
| 1112 | /// ensuring EOSE handling works correctly for live sync scenarios. | ||
| 1113 | async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) { | ||
| 1111 | use crate::sync::filters::build_layer2_and_layer3_filters; | 1114 | use crate::sync::filters::build_layer2_and_layer3_filters; |
| 1112 | 1115 | ||
| 1113 | // Get confirmed state from relay_sync_index | 1116 | // Get confirmed state from relay_sync_index |
| @@ -1137,36 +1140,15 @@ impl SyncManager { | |||
| 1137 | // Build Layer 2 and Layer 3 filters | 1140 | // Build Layer 2 and Layer 3 filters |
| 1138 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); | 1141 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); |
| 1139 | 1142 | ||
| 1140 | // DEBUG TRACING: Log detailed filter information | 1143 | if filters.is_empty() { |
| 1141 | tracing::debug!( | 1144 | tracing::debug!( |
| 1142 | relay = %relay_url, | ||
| 1143 | filter_count = filters.len(), | ||
| 1144 | repos_count = repos.len(), | ||
| 1145 | root_events_count = root_events.len(), | ||
| 1146 | repos = ?repos, | ||
| 1147 | root_events = ?root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 1148 | filters = ?filters, | ||
| 1149 | since = ?since, | ||
| 1150 | "Rebuilding Layer 2/3 filters" | ||
| 1151 | ); | ||
| 1152 | |||
| 1153 | // Subscribe to filters on the relay connection | ||
| 1154 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1155 | for filter in filters { | ||
| 1156 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 1157 | tracing::error!( | ||
| 1158 | relay = %relay_url, | ||
| 1159 | error = %e, | ||
| 1160 | "Failed to subscribe to Layer 2/3 filter during rebuild" | ||
| 1161 | ); | ||
| 1162 | } | ||
| 1163 | } | ||
| 1164 | } else { | ||
| 1165 | tracing::warn!( | ||
| 1166 | relay = %relay_url, | 1145 | relay = %relay_url, |
| 1167 | "No active connection found for Layer 2/3 rebuild" | 1146 | "No filters generated for Layer 2/3 rebuild" |
| 1168 | ); | 1147 | ); |
| 1148 | return; | ||
| 1169 | } | 1149 | } |
| 1150 | // TODO do we add since instead of limit to live sync or do a historic sync of filters? | ||
| 1151 | self.sync_live(relay_url, &filters).await; | ||
| 1170 | } | 1152 | } |
| 1171 | 1153 | ||
| 1172 | /// Register a relay for managed connection/reconnection | 1154 | /// Register a relay for managed connection/reconnection |
| @@ -1950,7 +1932,6 @@ impl SyncManager { | |||
| 1950 | } | 1932 | } |
| 1951 | 1933 | ||
| 1952 | /// Sync historical events and track in PendingSyncIndex | 1934 | /// Sync historical events and track in PendingSyncIndex |
| 1953 | #[allow(dead_code)] // Will be used in Phase 3+ | ||
| 1954 | /// | 1935 | /// |
| 1955 | /// This method handles historical synchronization for a set of filters, | 1936 | /// This method handles historical synchronization for a set of filters, |
| 1956 | /// creating a PendingBatch to track completion. It dispatches to either | 1937 | /// creating a PendingBatch to track completion. It dispatches to either |
| @@ -2017,8 +1998,8 @@ impl SyncManager { | |||
| 2017 | 1998 | ||
| 2018 | // Check if we should use negentropy | 1999 | // Check if we should use negentropy |
| 2019 | // TODO once we have setup our new tests we will re-enable this and fix our implementation | 2000 | // TODO once we have setup our new tests we will re-enable this and fix our implementation |
| 2020 | let use_negentropy = false; | 2001 | let use_negentropy = |
| 2021 | // !self.config.sync_disable_negentropy && connection.supports_negentropy().await; | 2002 | !self.config.sync_disable_negentropy && connection.supports_negentropy().await; |
| 2022 | 2003 | ||
| 2023 | // Generate batch ID | 2004 | // Generate batch ID |
| 2024 | let batch_id = self.next_batch_id(); | 2005 | let batch_id = self.next_batch_id(); |
| @@ -2136,7 +2117,11 @@ impl SyncManager { | |||
| 2136 | total_received = 0, | 2117 | total_received = 0, |
| 2137 | "historic_sync (negentropy) completed - already up-to-date" | 2118 | "historic_sync (negentropy) completed - already up-to-date" |
| 2138 | ); | 2119 | ); |
| 2120 | |||
| 2121 | // Batch already confirmed, nothing more to do | ||
| 2122 | return Some(batch_id); | ||
| 2139 | } | 2123 | } |
| 2124 | |||
| 2140 | // launch subscriptions to fetch missing events by id | 2125 | // launch subscriptions to fetch missing events by id |
| 2141 | let ids_filters: Vec<_> = all_remote_ids | 2126 | let ids_filters: Vec<_> = all_remote_ids |
| 2142 | .chunks(300) | 2127 | .chunks(300) |