upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:44:05 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:44:05 +0000
commit3b65f541b4a3891824c61148d159c1b311e086e8 (patch)
tree1224f8d1ace1c6c9e67b6796b478690291a2519e /src/sync/mod.rs
parent4f171fe53f24b54718a717a77b447175177e29a5 (diff)
sync: implement PendingBatch EOSE confirmation flow
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs179
1 files changed, 169 insertions, 10 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6e50eba..449e4ec 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -327,6 +327,15 @@ pub struct DisconnectNotification {
327 pub relay_url: String, 327 pub relay_url: String,
328} 328}
329 329
330/// Notification from spawned tasks about EOSE (End Of Stored Events)
331#[derive(Debug)]
332pub struct EoseNotification {
333 /// The relay URL that sent EOSE
334 pub relay_url: String,
335 /// The subscription ID that completed
336 pub sub_id: SubscriptionId,
337}
338
330/// Manages proactive synchronization with external relays 339/// Manages proactive synchronization with external relays
331/// 340///
332/// The SyncManager runs as a background task, subscribing to repository 341/// The SyncManager runs as a background task, subscribing to repository
@@ -389,6 +398,109 @@ impl SyncManager {
389 } 398 }
390 } 399 }
391 400
401 /// Generate a unique batch ID
402 ///
403 /// Increments the internal counter and returns the new value.
404 /// Used for tracking pending batches and debugging/logging.
405 fn next_batch_id(&mut self) -> u64 {
406 self.next_batch_id += 1;
407 self.next_batch_id
408 }
409
410 /// Handle EOSE (End Of Stored Events) for a subscription
411 ///
412 /// This method:
413 /// - Finds the PendingBatch containing this subscription ID
414 /// - Removes the subscription from outstanding_subs
415 /// - When all subscriptions complete (outstanding_subs empty):
416 /// - Moves repos from pending to confirmed in RelayState
417 /// - Moves root_events from pending to confirmed
418 /// - Removes the batch from pending_sync_index
419 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
420 // 1. Find and update the pending batch
421 let mut pending = self.pending_sync_index.write().await;
422
423 let Some(batches) = pending.get_mut(relay_url) else {
424 tracing::warn!(
425 relay = %relay_url,
426 sub_id = %sub_id,
427 "EOSE received for unknown relay"
428 );
429 return;
430 };
431
432 // Find the batch containing this subscription
433 let batch_index = batches.iter().position(|b| b.outstanding_subs.contains(&sub_id));
434
435 let Some(batch_idx) = batch_index else {
436 tracing::warn!(
437 relay = %relay_url,
438 sub_id = %sub_id,
439 "EOSE received for unknown subscription"
440 );
441 return;
442 };
443
444 // Remove the subscription from outstanding_subs
445 let batch = &mut batches[batch_idx];
446 batch.outstanding_subs.remove(&sub_id);
447
448 tracing::debug!(
449 relay = %relay_url,
450 sub_id = %sub_id,
451 batch_id = batch.batch_id,
452 remaining_subs = batch.outstanding_subs.len(),
453 "EOSE processed for subscription"
454 );
455
456 // Check if batch is complete
457 if !batch.outstanding_subs.is_empty() {
458 return;
459 }
460
461 // 2. Batch complete - extract items and remove batch
462 let completed_batch = batches.remove(batch_idx);
463 let batch_id = completed_batch.batch_id;
464 let repos_count = completed_batch.items.repos.len();
465 let events_count = completed_batch.items.root_events.len();
466
467 // Clean up empty relay entry
468 if batches.is_empty() {
469 pending.remove(relay_url);
470 }
471
472 // Drop the pending lock before acquiring relay_sync_index lock
473 drop(pending);
474
475 // 3. Move items to confirmed state in RelayState
476 {
477 let mut relay_index = self.relay_sync_index.write().await;
478
479 if let Some(state) = relay_index.get_mut(relay_url) {
480 // Move repos to confirmed
481 state.repos.extend(completed_batch.items.repos);
482 // Move root_events to confirmed
483 state.root_events.extend(completed_batch.items.root_events);
484
485 tracing::info!(
486 relay = %relay_url,
487 batch_id = batch_id,
488 repos_confirmed = repos_count,
489 root_events_confirmed = events_count,
490 total_repos = state.repos.len(),
491 total_root_events = state.root_events.len(),
492 "Batch confirmed - items moved from pending to confirmed"
493 );
494 } else {
495 tracing::warn!(
496 relay = %relay_url,
497 batch_id = batch_id,
498 "Batch completed but no RelayState found for relay"
499 );
500 }
501 }
502 }
503
392 /// Run the sync manager 504 /// Run the sync manager
393 /// 505 ///
394 /// Coordinates all sync components: 506 /// Coordinates all sync components:
@@ -411,7 +523,10 @@ impl SyncManager {
411 // 2. Create disconnect channel for spawned tasks -> manager communication 523 // 2. Create disconnect channel for spawned tasks -> manager communication
412 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); 524 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100);
413 525
414 // 3. Spawn self-subscriber 526 // 3. Create EOSE channel for spawned tasks -> manager communication
527 let (eose_tx, mut eose_rx) = mpsc::channel::<EoseNotification>(100);
528
529 // 4. Spawn self-subscriber
415 let self_subscriber = SelfSubscriber::new( 530 let self_subscriber = SelfSubscriber::new(
416 format!("ws://{}", self.service_domain), 531 format!("ws://{}", self.service_domain),
417 self.service_domain.clone(), 532 self.service_domain.clone(),
@@ -420,13 +535,17 @@ impl SyncManager {
420 ); 535 );
421 tokio::spawn(async move { self_subscriber.run().await }); 536 tokio::spawn(async move { self_subscriber.run().await });
422 537
423 // 4. Connect to bootstrap relay if configured 538 // 5. Connect to bootstrap relay if configured
424 if let Some(ref bootstrap_url) = self.bootstrap_relay_url { 539 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
425 self.spawn_relay_connection(bootstrap_url.clone(), disconnect_tx.clone()) 540 self.spawn_relay_connection(
426 .await; 541 bootstrap_url.clone(),
542 disconnect_tx.clone(),
543 eose_tx.clone(),
544 )
545 .await;
427 } 546 }
428 547
429 // 5. Main loop - handle actions from self-subscriber and disconnect notifications 548 // 6. Main loop - handle actions from self-subscriber, disconnect, and EOSE notifications
430 loop { 549 loop {
431 tokio::select! { 550 tokio::select! {
432 action = action_rx.recv() => { 551 action = action_rx.recv() => {
@@ -439,7 +558,12 @@ impl SyncManager {
439 558
440 if !exists { 559 if !exists {
441 tracing::info!(relay = %relay_url, "Spawning new relay connection"); 560 tracing::info!(relay = %relay_url, "Spawning new relay connection");
442 self.spawn_relay_with_layer2(relay_url, repos, disconnect_tx.clone()).await; 561 self.spawn_relay_with_layer2(
562 relay_url,
563 repos,
564 disconnect_tx.clone(),
565 eose_tx.clone(),
566 ).await;
443 } else { 567 } else {
444 tracing::debug!( 568 tracing::debug!(
445 relay = %relay_url, 569 relay = %relay_url,
@@ -472,6 +596,17 @@ impl SyncManager {
472 } 596 }
473 } 597 }
474 } 598 }
599 eose = eose_rx.recv() => {
600 match eose {
601 Some(notification) => {
602 self.handle_eose(&notification.relay_url, notification.sub_id).await;
603 }
604 None => {
605 // All EOSE senders dropped - unlikely but handle gracefully
606 tracing::debug!("EOSE channel closed");
607 }
608 }
609 }
475 } 610 }
476 } 611 }
477 } 612 }
@@ -544,6 +679,7 @@ impl SyncManager {
544 relay_url: String, 679 relay_url: String,
545 repos: HashMap<String, HashSet<EventId>>, 680 repos: HashMap<String, HashSet<EventId>>,
546 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, 681 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
682 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>,
547 ) { 683 ) {
548 use crate::sync::filters::build_layer2_and_layer3_filters; 684 use crate::sync::filters::build_layer2_and_layer3_filters;
549 use tokio::sync::mpsc; 685 use tokio::sync::mpsc;
@@ -620,8 +756,19 @@ impl SyncManager {
620 ) 756 )
621 .await; 757 .await;
622 } 758 }
623 RelayEvent::EndOfStoredEvents(_) => { 759 RelayEvent::EndOfStoredEvents(sub_id) => {
624 tracing::debug!(relay = %relay_url_clone, "EOSE received"); 760 tracing::debug!(
761 relay = %relay_url_clone,
762 sub_id = %sub_id,
763 "EOSE received, notifying SyncManager"
764 );
765 // Notify SyncManager of EOSE
766 let _ = eose_tx
767 .send(EoseNotification {
768 relay_url: relay_url_clone.clone(),
769 sub_id,
770 })
771 .await;
625 } 772 }
626 RelayEvent::Closed(reason) => { 773 RelayEvent::Closed(reason) => {
627 tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed"); 774 tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed");
@@ -653,6 +800,7 @@ impl SyncManager {
653 &self, 800 &self,
654 relay_url: String, 801 relay_url: String,
655 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, 802 disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>,
803 eose_tx: tokio::sync::mpsc::Sender<EoseNotification>,
656 ) { 804 ) {
657 use tokio::sync::mpsc; 805 use tokio::sync::mpsc;
658 806
@@ -707,8 +855,19 @@ impl SyncManager {
707 ) 855 )
708 .await; 856 .await;
709 } 857 }
710 RelayEvent::EndOfStoredEvents(_) => { 858 RelayEvent::EndOfStoredEvents(sub_id) => {
711 tracing::debug!("EOSE from {}", relay_url_clone); 859 tracing::debug!(
860 relay = %relay_url_clone,
861 sub_id = %sub_id,
862 "EOSE received, notifying SyncManager"
863 );
864 // Notify SyncManager of EOSE
865 let _ = eose_tx
866 .send(EoseNotification {
867 relay_url: relay_url_clone.clone(),
868 sub_id,
869 })
870 .await;
712 } 871 }
713 RelayEvent::Closed(reason) => { 872 RelayEvent::Closed(reason) => {
714 tracing::info!( 873 tracing::info!(