diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 178 |
1 files changed, 103 insertions, 75 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8b51fac..d8c2d4f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -3146,6 +3146,9 @@ impl SyncManager { | |||
| 3146 | // Generate batch ID | 3146 | // Generate batch ID |
| 3147 | let batch_id = self.next_batch_id(); | 3147 | let batch_id = self.next_batch_id(); |
| 3148 | 3148 | ||
| 3149 | // Track whether negentropy succeeded (for fallback logic) | ||
| 3150 | let mut negentropy_succeeded = false; | ||
| 3151 | |||
| 3149 | if use_negentropy && !filters_with_since.is_empty() { | 3152 | if use_negentropy && !filters_with_since.is_empty() { |
| 3150 | // NIP-77 negentropy path | 3153 | // NIP-77 negentropy path |
| 3151 | tracing::debug!( | 3154 | tracing::debug!( |
| @@ -3238,106 +3241,131 @@ impl SyncManager { | |||
| 3238 | 3241 | ||
| 3239 | // Require ALL filters to succeed to confirm the batch | 3242 | // Require ALL filters to succeed to confirm the batch |
| 3240 | if failed_count > 0 { | 3243 | if failed_count > 0 { |
| 3241 | // Leave pending batch so it doesnt appear as synced. we can try again later. | 3244 | // Remove failed negentropy batch and fall back to REQ+EOSE |
| 3242 | tracing::warn!( | 3245 | { |
| 3243 | relay = %relay_url, | ||
| 3244 | batch_id = batch_id, | ||
| 3245 | failed_count = failed_count, | ||
| 3246 | total_filters = filters_with_since.len(), | ||
| 3247 | "historic_sync (negentropy) failed - not all filters succeeded" | ||
| 3248 | ); | ||
| 3249 | return None; | ||
| 3250 | } else if all_remote_ids.is_empty() { | ||
| 3251 | // Remove batch from pending and confirm it (no items to download) | ||
| 3252 | let completed_batch = { | ||
| 3253 | let mut pending = self.pending_sync_index.write().await; | 3246 | let mut pending = self.pending_sync_index.write().await; |
| 3254 | if let Some(batches) = pending.get_mut(relay_url) { | 3247 | if let Some(batches) = pending.get_mut(relay_url) { |
| 3255 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); | 3248 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); |
| 3256 | if let Some(idx) = batch_idx { | 3249 | if let Some(idx) = batch_idx { |
| 3257 | let batch = batches.remove(idx); | 3250 | batches.remove(idx); |
| 3258 | if batches.is_empty() { | 3251 | if batches.is_empty() { |
| 3259 | pending.remove(relay_url); | 3252 | pending.remove(relay_url); |
| 3260 | } | 3253 | } |
| 3261 | Some(batch) | ||
| 3262 | } else { | ||
| 3263 | None | ||
| 3264 | } | 3254 | } |
| 3265 | } else { | ||
| 3266 | None | ||
| 3267 | } | 3255 | } |
| 3268 | }; | ||
| 3269 | |||
| 3270 | if let Some(batch) = completed_batch { | ||
| 3271 | self.confirm_batch(relay_url, batch).await; | ||
| 3272 | } | 3256 | } |
| 3273 | 3257 | ||
| 3274 | tracing::info!( | 3258 | tracing::info!( |
| 3275 | relay = %relay_url, | 3259 | relay = %relay_url, |
| 3276 | batch_id = batch_id, | 3260 | batch_id = batch_id, |
| 3277 | total_received = 0, | 3261 | failed_count = failed_count, |
| 3278 | "historic_sync (negentropy) completed - already up-to-date" | 3262 | total_filters = filters_with_since.len(), |
| 3263 | "historic_sync (negentropy) failed - falling back to REQ+EOSE" | ||
| 3279 | ); | 3264 | ); |
| 3280 | 3265 | ||
| 3281 | // Batch already confirmed, nothing more to do | 3266 | // Fall through to REQ+EOSE path below |
| 3282 | return Some(batch_id); | 3267 | } else { |
| 3283 | } | 3268 | // Negentropy succeeded - mark success and process results |
| 3269 | negentropy_succeeded = true; | ||
| 3284 | 3270 | ||
| 3285 | // launch subscriptions to fetch missing events by id | 3271 | if all_remote_ids.is_empty() { |
| 3286 | let ids_filters: Vec<_> = all_remote_ids | 3272 | // Remove batch from pending and confirm it (no items to download) |
| 3287 | .chunks(300) | 3273 | let completed_batch = { |
| 3288 | .map(|c| Filter::new().ids(c.iter().copied())) | 3274 | let mut pending = self.pending_sync_index.write().await; |
| 3289 | .collect(); | 3275 | if let Some(batches) = pending.get_mut(relay_url) { |
| 3276 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); | ||
| 3277 | if let Some(idx) = batch_idx { | ||
| 3278 | let batch = batches.remove(idx); | ||
| 3279 | if batches.is_empty() { | ||
| 3280 | pending.remove(relay_url); | ||
| 3281 | } | ||
| 3282 | Some(batch) | ||
| 3283 | } else { | ||
| 3284 | None | ||
| 3285 | } | ||
| 3286 | } else { | ||
| 3287 | None | ||
| 3288 | } | ||
| 3289 | }; | ||
| 3290 | 3290 | ||
| 3291 | // DEBUG TRACING: Log that we're requesting events by ID | 3291 | if let Some(batch) = completed_batch { |
| 3292 | tracing::info!( | 3292 | self.confirm_batch(relay_url, batch).await; |
| 3293 | relay = %relay_url, | 3293 | } |
| 3294 | batch_id = batch_id, | ||
| 3295 | total_event_ids = all_remote_ids.len(), | ||
| 3296 | filter_chunks = ids_filters.len(), | ||
| 3297 | event_ids = ?all_remote_ids, | ||
| 3298 | "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID", | ||
| 3299 | ids_filters.len(), | ||
| 3300 | all_remote_ids.len() | ||
| 3301 | ); | ||
| 3302 | 3294 | ||
| 3303 | let mut subscription_ids = HashSet::new(); | 3295 | tracing::info!( |
| 3304 | for (idx, filter) in ids_filters.iter().enumerate() { | 3296 | relay = %relay_url, |
| 3305 | if let Some(conn) = self.connections.get(relay_url) { | 3297 | batch_id = batch_id, |
| 3306 | match conn.subscribe_filter(filter.clone(), true).await { | 3298 | total_received = 0, |
| 3307 | Ok(sub_id) => { | 3299 | "historic_sync (negentropy) completed - already up-to-date" |
| 3308 | subscription_ids.insert(sub_id); | 3300 | ); |
| 3309 | } | 3301 | |
| 3310 | Err(e) => { | 3302 | // Batch already confirmed, nothing more to do |
| 3311 | tracing::error!( | 3303 | return Some(batch_id); |
| 3312 | relay = %relay_url, | 3304 | } |
| 3313 | batch_id = batch_id, | 3305 | |
| 3314 | chunk_idx = idx, | 3306 | // launch subscriptions to fetch missing events by id |
| 3315 | error = %e, | 3307 | let ids_filters: Vec<_> = all_remote_ids |
| 3316 | "Failed to subscribe to ID filter chunk" | 3308 | .chunks(300) |
| 3317 | ); | 3309 | .map(|c| Filter::new().ids(c.iter().copied())) |
| 3310 | .collect(); | ||
| 3311 | |||
| 3312 | // DEBUG TRACING: Log that we're requesting events by ID | ||
| 3313 | tracing::info!( | ||
| 3314 | relay = %relay_url, | ||
| 3315 | batch_id = batch_id, | ||
| 3316 | total_event_ids = all_remote_ids.len(), | ||
| 3317 | filter_chunks = ids_filters.len(), | ||
| 3318 | event_ids = ?all_remote_ids, | ||
| 3319 | "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID", | ||
| 3320 | ids_filters.len(), | ||
| 3321 | all_remote_ids.len() | ||
| 3322 | ); | ||
| 3323 | |||
| 3324 | let mut subscription_ids = HashSet::new(); | ||
| 3325 | for (idx, filter) in ids_filters.iter().enumerate() { | ||
| 3326 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 3327 | match conn.subscribe_filter(filter.clone(), true).await { | ||
| 3328 | Ok(sub_id) => { | ||
| 3329 | subscription_ids.insert(sub_id); | ||
| 3330 | } | ||
| 3331 | Err(e) => { | ||
| 3332 | tracing::error!( | ||
| 3333 | relay = %relay_url, | ||
| 3334 | batch_id = batch_id, | ||
| 3335 | chunk_idx = idx, | ||
| 3336 | error = %e, | ||
| 3337 | "Failed to subscribe to ID filter chunk" | ||
| 3338 | ); | ||
| 3339 | } | ||
| 3318 | } | 3340 | } |
| 3319 | } | 3341 | } |
| 3320 | } | 3342 | } |
| 3321 | } | 3343 | { |
| 3322 | { | 3344 | let mut pending = self.pending_sync_index.write().await; |
| 3323 | let mut pending = self.pending_sync_index.write().await; | 3345 | if let Some(relay_batches) = pending.get_mut(relay_url) { |
| 3324 | if let Some(relay_batches) = pending.get_mut(relay_url) { | 3346 | if let Some(batch) = |
| 3325 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { | 3347 | relay_batches.iter_mut().find(|b| b.batch_id == batch_id) |
| 3326 | batch.outstanding_subs.extend(subscription_ids.clone()); | 3348 | { |
| 3327 | // Store requested event IDs for validation after EOSE | 3349 | batch.outstanding_subs.extend(subscription_ids.clone()); |
| 3328 | batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect()); | 3350 | // Store requested event IDs for validation after EOSE |
| 3329 | batch.received_event_ids = Some(HashSet::new()); | 3351 | batch.requested_event_ids = |
| 3352 | Some(all_remote_ids.iter().cloned().collect()); | ||
| 3353 | batch.received_event_ids = Some(HashSet::new()); | ||
| 3354 | } | ||
| 3330 | } | 3355 | } |
| 3331 | } | 3356 | } |
| 3357 | tracing::debug!( | ||
| 3358 | relay = %relay_url, | ||
| 3359 | batch_id = batch_id, | ||
| 3360 | subscription_ids = subscription_ids.len(), | ||
| 3361 | events = all_remote_ids.len(), | ||
| 3362 | "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE" | ||
| 3363 | ); | ||
| 3332 | } | 3364 | } |
| 3333 | tracing::debug!( | 3365 | } |
| 3334 | relay = %relay_url, | 3366 | |
| 3335 | batch_id = batch_id, | 3367 | // Use REQ+EOSE if negentropy was not attempted or failed |
| 3336 | subscription_ids = subscription_ids.len(), | 3368 | if !negentropy_succeeded { |
| 3337 | events = all_remote_ids.len(), | ||
| 3338 | "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE" | ||
| 3339 | ); | ||
| 3340 | } else { | ||
| 3341 | // Traditional REQ+EOSE path | 3369 | // Traditional REQ+EOSE path |
| 3342 | tracing::debug!( | 3370 | tracing::debug!( |
| 3343 | relay = %relay_url, | 3371 | relay = %relay_url, |