upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-10 01:34:11 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-10 01:34:11 +0000
commit3f19ab476799071d11e5f61074b60e31511f68a2 (patch)
tree265a7f048b75d5b12f37d58e05f1db8cb35152d8 /src/sync
parentc9b3b3bd8a04de139bcb0d0b83bf819c367ee8c8 (diff)
fix: implement negentropy fallback to REQ+EOSE when negentropy fails
When negentropy sync fails (one or more filters fail during diff), the code previously left a pending batch and returned early, preventing any sync from happening. This caused the "No sync targets found" issue. Changes: - Track negentropy success with a boolean flag - On negentropy failure: clean up pending batch and fall through to REQ+EOSE - Log the fallback at info level for visibility - Restructure control flow so REQ+EOSE path executes after negentropy failure This ensures sync always completes using traditional REQ+EOSE when NIP-77 negentropy is unavailable or fails.
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs178
1 files changed, 103 insertions, 75 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 8b51fac..d8c2d4f 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -3146,6 +3146,9 @@ impl SyncManager {
3146 // Generate batch ID 3146 // Generate batch ID
3147 let batch_id = self.next_batch_id(); 3147 let batch_id = self.next_batch_id();
3148 3148
3149 // Track whether negentropy succeeded (for fallback logic)
3150 let mut negentropy_succeeded = false;
3151
3149 if use_negentropy && !filters_with_since.is_empty() { 3152 if use_negentropy && !filters_with_since.is_empty() {
3150 // NIP-77 negentropy path 3153 // NIP-77 negentropy path
3151 tracing::debug!( 3154 tracing::debug!(
@@ -3238,106 +3241,131 @@ impl SyncManager {
3238 3241
3239 // Require ALL filters to succeed to confirm the batch 3242 // Require ALL filters to succeed to confirm the batch
3240 if failed_count > 0 { 3243 if failed_count > 0 {
3241 // Leave pending batch so it doesnt appear as synced. we can try again later. 3244 // Remove failed negentropy batch and fall back to REQ+EOSE
3242 tracing::warn!( 3245 {
3243 relay = %relay_url,
3244 batch_id = batch_id,
3245 failed_count = failed_count,
3246 total_filters = filters_with_since.len(),
3247 "historic_sync (negentropy) failed - not all filters succeeded"
3248 );
3249 return None;
3250 } else if all_remote_ids.is_empty() {
3251 // Remove batch from pending and confirm it (no items to download)
3252 let completed_batch = {
3253 let mut pending = self.pending_sync_index.write().await; 3246 let mut pending = self.pending_sync_index.write().await;
3254 if let Some(batches) = pending.get_mut(relay_url) { 3247 if let Some(batches) = pending.get_mut(relay_url) {
3255 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); 3248 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id);
3256 if let Some(idx) = batch_idx { 3249 if let Some(idx) = batch_idx {
3257 let batch = batches.remove(idx); 3250 batches.remove(idx);
3258 if batches.is_empty() { 3251 if batches.is_empty() {
3259 pending.remove(relay_url); 3252 pending.remove(relay_url);
3260 } 3253 }
3261 Some(batch)
3262 } else {
3263 None
3264 } 3254 }
3265 } else {
3266 None
3267 } 3255 }
3268 };
3269
3270 if let Some(batch) = completed_batch {
3271 self.confirm_batch(relay_url, batch).await;
3272 } 3256 }
3273 3257
3274 tracing::info!( 3258 tracing::info!(
3275 relay = %relay_url, 3259 relay = %relay_url,
3276 batch_id = batch_id, 3260 batch_id = batch_id,
3277 total_received = 0, 3261 failed_count = failed_count,
3278 "historic_sync (negentropy) completed - already up-to-date" 3262 total_filters = filters_with_since.len(),
3263 "historic_sync (negentropy) failed - falling back to REQ+EOSE"
3279 ); 3264 );
3280 3265
3281 // Batch already confirmed, nothing more to do 3266 // Fall through to REQ+EOSE path below
3282 return Some(batch_id); 3267 } else {
3283 } 3268 // Negentropy succeeded - mark success and process results
3269 negentropy_succeeded = true;
3284 3270
3285 // launch subscriptions to fetch missing events by id 3271 if all_remote_ids.is_empty() {
3286 let ids_filters: Vec<_> = all_remote_ids 3272 // Remove batch from pending and confirm it (no items to download)
3287 .chunks(300) 3273 let completed_batch = {
3288 .map(|c| Filter::new().ids(c.iter().copied())) 3274 let mut pending = self.pending_sync_index.write().await;
3289 .collect(); 3275 if let Some(batches) = pending.get_mut(relay_url) {
3276 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id);
3277 if let Some(idx) = batch_idx {
3278 let batch = batches.remove(idx);
3279 if batches.is_empty() {
3280 pending.remove(relay_url);
3281 }
3282 Some(batch)
3283 } else {
3284 None
3285 }
3286 } else {
3287 None
3288 }
3289 };
3290 3290
3291 // DEBUG TRACING: Log that we're requesting events by ID 3291 if let Some(batch) = completed_batch {
3292 tracing::info!( 3292 self.confirm_batch(relay_url, batch).await;
3293 relay = %relay_url, 3293 }
3294 batch_id = batch_id,
3295 total_event_ids = all_remote_ids.len(),
3296 filter_chunks = ids_filters.len(),
3297 event_ids = ?all_remote_ids,
3298 "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID",
3299 ids_filters.len(),
3300 all_remote_ids.len()
3301 );
3302 3294
3303 let mut subscription_ids = HashSet::new(); 3295 tracing::info!(
3304 for (idx, filter) in ids_filters.iter().enumerate() { 3296 relay = %relay_url,
3305 if let Some(conn) = self.connections.get(relay_url) { 3297 batch_id = batch_id,
3306 match conn.subscribe_filter(filter.clone(), true).await { 3298 total_received = 0,
3307 Ok(sub_id) => { 3299 "historic_sync (negentropy) completed - already up-to-date"
3308 subscription_ids.insert(sub_id); 3300 );
3309 } 3301
3310 Err(e) => { 3302 // Batch already confirmed, nothing more to do
3311 tracing::error!( 3303 return Some(batch_id);
3312 relay = %relay_url, 3304 }
3313 batch_id = batch_id, 3305
3314 chunk_idx = idx, 3306 // launch subscriptions to fetch missing events by id
3315 error = %e, 3307 let ids_filters: Vec<_> = all_remote_ids
3316 "Failed to subscribe to ID filter chunk" 3308 .chunks(300)
3317 ); 3309 .map(|c| Filter::new().ids(c.iter().copied()))
3310 .collect();
3311
3312 // DEBUG TRACING: Log that we're requesting events by ID
3313 tracing::info!(
3314 relay = %relay_url,
3315 batch_id = batch_id,
3316 total_event_ids = all_remote_ids.len(),
3317 filter_chunks = ids_filters.len(),
3318 event_ids = ?all_remote_ids,
3319 "[DIAG TRACE] ✓ Creating {} subscription(s) to fetch {} missing event(s) by ID",
3320 ids_filters.len(),
3321 all_remote_ids.len()
3322 );
3323
3324 let mut subscription_ids = HashSet::new();
3325 for (idx, filter) in ids_filters.iter().enumerate() {
3326 if let Some(conn) = self.connections.get(relay_url) {
3327 match conn.subscribe_filter(filter.clone(), true).await {
3328 Ok(sub_id) => {
3329 subscription_ids.insert(sub_id);
3330 }
3331 Err(e) => {
3332 tracing::error!(
3333 relay = %relay_url,
3334 batch_id = batch_id,
3335 chunk_idx = idx,
3336 error = %e,
3337 "Failed to subscribe to ID filter chunk"
3338 );
3339 }
3318 } 3340 }
3319 } 3341 }
3320 } 3342 }
3321 } 3343 {
3322 { 3344 let mut pending = self.pending_sync_index.write().await;
3323 let mut pending = self.pending_sync_index.write().await; 3345 if let Some(relay_batches) = pending.get_mut(relay_url) {
3324 if let Some(relay_batches) = pending.get_mut(relay_url) { 3346 if let Some(batch) =
3325 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { 3347 relay_batches.iter_mut().find(|b| b.batch_id == batch_id)
3326 batch.outstanding_subs.extend(subscription_ids.clone()); 3348 {
3327 // Store requested event IDs for validation after EOSE 3349 batch.outstanding_subs.extend(subscription_ids.clone());
3328 batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect()); 3350 // Store requested event IDs for validation after EOSE
3329 batch.received_event_ids = Some(HashSet::new()); 3351 batch.requested_event_ids =
3352 Some(all_remote_ids.iter().cloned().collect());
3353 batch.received_event_ids = Some(HashSet::new());
3354 }
3330 } 3355 }
3331 } 3356 }
3357 tracing::debug!(
3358 relay = %relay_url,
3359 batch_id = batch_id,
3360 subscription_ids = subscription_ids.len(),
3361 events = all_remote_ids.len(),
3362 "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE"
3363 );
3332 } 3364 }
3333 tracing::debug!( 3365 }
3334 relay = %relay_url, 3366
3335 batch_id = batch_id, 3367 // Use REQ+EOSE if negentropy was not attempted or failed
3336 subscription_ids = subscription_ids.len(), 3368 if !negentropy_succeeded {
3337 events = all_remote_ids.len(),
3338 "historic_sync (Negentropy) created subscriptions to fetch missing events by id, awaiting EOSE"
3339 );
3340 } else {
3341 // Traditional REQ+EOSE path 3369 // Traditional REQ+EOSE path
3342 tracing::debug!( 3370 tracing::debug!(
3343 relay = %relay_url, 3371 relay = %relay_url,