diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:44:05 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:44:05 +0000 |
| commit | 3b65f541b4a3891824c61148d159c1b311e086e8 (patch) | |
| tree | 1224f8d1ace1c6c9e67b6796b478690291a2519e /src/sync/mod.rs | |
| parent | 4f171fe53f24b54718a717a77b447175177e29a5 (diff) | |
sync: implement PendingBatch EOSE confirmation flow
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 179 |
1 files changed, 169 insertions, 10 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6e50eba..449e4ec 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -327,6 +327,15 @@ pub struct DisconnectNotification { | |||
| 327 | pub relay_url: String, | 327 | pub relay_url: String, |
| 328 | } | 328 | } |
| 329 | 329 | ||
| 330 | /// Notification from spawned tasks about EOSE (End Of Stored Events) | ||
| 331 | #[derive(Debug)] | ||
| 332 | pub struct EoseNotification { | ||
| 333 | /// The relay URL that sent EOSE | ||
| 334 | pub relay_url: String, | ||
| 335 | /// The subscription ID that completed | ||
| 336 | pub sub_id: SubscriptionId, | ||
| 337 | } | ||
| 338 | |||
| 330 | /// Manages proactive synchronization with external relays | 339 | /// Manages proactive synchronization with external relays |
| 331 | /// | 340 | /// |
| 332 | /// The SyncManager runs as a background task, subscribing to repository | 341 | /// The SyncManager runs as a background task, subscribing to repository |
| @@ -389,6 +398,109 @@ impl SyncManager { | |||
| 389 | } | 398 | } |
| 390 | } | 399 | } |
| 391 | 400 | ||
| 401 | /// Generate a unique batch ID | ||
| 402 | /// | ||
| 403 | /// Increments the internal counter and returns the new value. | ||
| 404 | /// Used for tracking pending batches and debugging/logging. | ||
| 405 | fn next_batch_id(&mut self) -> u64 { | ||
| 406 | self.next_batch_id += 1; | ||
| 407 | self.next_batch_id | ||
| 408 | } | ||
| 409 | |||
| 410 | /// Handle EOSE (End Of Stored Events) for a subscription | ||
| 411 | /// | ||
| 412 | /// This method: | ||
| 413 | /// - Finds the PendingBatch containing this subscription ID | ||
| 414 | /// - Removes the subscription from outstanding_subs | ||
| 415 | /// - When all subscriptions complete (outstanding_subs empty): | ||
| 416 | /// - Moves repos from pending to confirmed in RelayState | ||
| 417 | /// - Moves root_events from pending to confirmed | ||
| 418 | /// - Removes the batch from pending_sync_index | ||
| 419 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { | ||
| 420 | // 1. Find and update the pending batch | ||
| 421 | let mut pending = self.pending_sync_index.write().await; | ||
| 422 | |||
| 423 | let Some(batches) = pending.get_mut(relay_url) else { | ||
| 424 | tracing::warn!( | ||
| 425 | relay = %relay_url, | ||
| 426 | sub_id = %sub_id, | ||
| 427 | "EOSE received for unknown relay" | ||
| 428 | ); | ||
| 429 | return; | ||
| 430 | }; | ||
| 431 | |||
| 432 | // Find the batch containing this subscription | ||
| 433 | let batch_index = batches.iter().position(|b| b.outstanding_subs.contains(&sub_id)); | ||
| 434 | |||
| 435 | let Some(batch_idx) = batch_index else { | ||
| 436 | tracing::warn!( | ||
| 437 | relay = %relay_url, | ||
| 438 | sub_id = %sub_id, | ||
| 439 | "EOSE received for unknown subscription" | ||
| 440 | ); | ||
| 441 | return; | ||
| 442 | }; | ||
| 443 | |||
| 444 | // Remove the subscription from outstanding_subs | ||
| 445 | let batch = &mut batches[batch_idx]; | ||
| 446 | batch.outstanding_subs.remove(&sub_id); | ||
| 447 | |||
| 448 | tracing::debug!( | ||
| 449 | relay = %relay_url, | ||
| 450 | sub_id = %sub_id, | ||
| 451 | batch_id = batch.batch_id, | ||
| 452 | remaining_subs = batch.outstanding_subs.len(), | ||
| 453 | "EOSE processed for subscription" | ||
| 454 | ); | ||
| 455 | |||
| 456 | // Check if batch is complete | ||
| 457 | if !batch.outstanding_subs.is_empty() { | ||
| 458 | return; | ||
| 459 | } | ||
| 460 | |||
| 461 | // 2. Batch complete - extract items and remove batch | ||
| 462 | let completed_batch = batches.remove(batch_idx); | ||
| 463 | let batch_id = completed_batch.batch_id; | ||
| 464 | let repos_count = completed_batch.items.repos.len(); | ||
| 465 | let events_count = completed_batch.items.root_events.len(); | ||
| 466 | |||
| 467 | // Clean up empty relay entry | ||
| 468 | if batches.is_empty() { | ||
| 469 | pending.remove(relay_url); | ||
| 470 | } | ||
| 471 | |||
| 472 | // Drop the pending lock before acquiring relay_sync_index lock | ||
| 473 | drop(pending); | ||
| 474 | |||
| 475 | // 3. Move items to confirmed state in RelayState | ||
| 476 | { | ||
| 477 | let mut relay_index = self.relay_sync_index.write().await; | ||
| 478 | |||
| 479 | if let Some(state) = relay_index.get_mut(relay_url) { | ||
| 480 | // Move repos to confirmed | ||
| 481 | state.repos.extend(completed_batch.items.repos); | ||
| 482 | // Move root_events to confirmed | ||
| 483 | state.root_events.extend(completed_batch.items.root_events); | ||
| 484 | |||
| 485 | tracing::info!( | ||
| 486 | relay = %relay_url, | ||
| 487 | batch_id = batch_id, | ||
| 488 | repos_confirmed = repos_count, | ||
| 489 | root_events_confirmed = events_count, | ||
| 490 | total_repos = state.repos.len(), | ||
| 491 | total_root_events = state.root_events.len(), | ||
| 492 | "Batch confirmed - items moved from pending to confirmed" | ||
| 493 | ); | ||
| 494 | } else { | ||
| 495 | tracing::warn!( | ||
| 496 | relay = %relay_url, | ||
| 497 | batch_id = batch_id, | ||
| 498 | "Batch completed but no RelayState found for relay" | ||
| 499 | ); | ||
| 500 | } | ||
| 501 | } | ||
| 502 | } | ||
| 503 | |||
| 392 | /// Run the sync manager | 504 | /// Run the sync manager |
| 393 | /// | 505 | /// |
| 394 | /// Coordinates all sync components: | 506 | /// Coordinates all sync components: |
| @@ -411,7 +523,10 @@ impl SyncManager { | |||
| 411 | // 2. Create disconnect channel for spawned tasks -> manager communication | 523 | // 2. Create disconnect channel for spawned tasks -> manager communication |
| 412 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); | 524 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); |
| 413 | 525 | ||
| 414 | // 3. Spawn self-subscriber | 526 | // 3. Create EOSE channel for spawned tasks -> manager communication |
| 527 | let (eose_tx, mut eose_rx) = mpsc::channel::<EoseNotification>(100); | ||
| 528 | |||
| 529 | // 4. Spawn self-subscriber | ||
| 415 | let self_subscriber = SelfSubscriber::new( | 530 | let self_subscriber = SelfSubscriber::new( |
| 416 | format!("ws://{}", self.service_domain), | 531 | format!("ws://{}", self.service_domain), |
| 417 | self.service_domain.clone(), | 532 | self.service_domain.clone(), |
| @@ -420,13 +535,17 @@ impl SyncManager { | |||
| 420 | ); | 535 | ); |
| 421 | tokio::spawn(async move { self_subscriber.run().await }); | 536 | tokio::spawn(async move { self_subscriber.run().await }); |
| 422 | 537 | ||
| 423 | // 4. Connect to bootstrap relay if configured | 538 | // 5. Connect to bootstrap relay if configured |
| 424 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | 539 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { |
| 425 | self.spawn_relay_connection(bootstrap_url.clone(), disconnect_tx.clone()) | 540 | self.spawn_relay_connection( |
| 426 | .await; | 541 | bootstrap_url.clone(), |
| 542 | disconnect_tx.clone(), | ||
| 543 | eose_tx.clone(), | ||
| 544 | ) | ||
| 545 | .await; | ||
| 427 | } | 546 | } |
| 428 | 547 | ||
| 429 | // 5. Main loop - handle actions from self-subscriber and disconnect notifications | 548 | // 6. Main loop - handle actions from self-subscriber, disconnect, and EOSE notifications |
| 430 | loop { | 549 | loop { |
| 431 | tokio::select! { | 550 | tokio::select! { |
| 432 | action = action_rx.recv() => { | 551 | action = action_rx.recv() => { |
| @@ -439,7 +558,12 @@ impl SyncManager { | |||
| 439 | 558 | ||
| 440 | if !exists { | 559 | if !exists { |
| 441 | tracing::info!(relay = %relay_url, "Spawning new relay connection"); | 560 | tracing::info!(relay = %relay_url, "Spawning new relay connection"); |
| 442 | self.spawn_relay_with_layer2(relay_url, repos, disconnect_tx.clone()).await; | 561 | self.spawn_relay_with_layer2( |
| 562 | relay_url, | ||
| 563 | repos, | ||
| 564 | disconnect_tx.clone(), | ||
| 565 | eose_tx.clone(), | ||
| 566 | ).await; | ||
| 443 | } else { | 567 | } else { |
| 444 | tracing::debug!( | 568 | tracing::debug!( |
| 445 | relay = %relay_url, | 569 | relay = %relay_url, |
| @@ -472,6 +596,17 @@ impl SyncManager { | |||
| 472 | } | 596 | } |
| 473 | } | 597 | } |
| 474 | } | 598 | } |
| 599 | eose = eose_rx.recv() => { | ||
| 600 | match eose { | ||
| 601 | Some(notification) => { | ||
| 602 | self.handle_eose(¬ification.relay_url, notification.sub_id).await; | ||
| 603 | } | ||
| 604 | None => { | ||
| 605 | // All EOSE senders dropped - unlikely but handle gracefully | ||
| 606 | tracing::debug!("EOSE channel closed"); | ||
| 607 | } | ||
| 608 | } | ||
| 609 | } | ||
| 475 | } | 610 | } |
| 476 | } | 611 | } |
| 477 | } | 612 | } |
| @@ -544,6 +679,7 @@ impl SyncManager { | |||
| 544 | relay_url: String, | 679 | relay_url: String, |
| 545 | repos: HashMap<String, HashSet<EventId>>, | 680 | repos: HashMap<String, HashSet<EventId>>, |
| 546 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, | 681 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, |
| 682 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, | ||
| 547 | ) { | 683 | ) { |
| 548 | use crate::sync::filters::build_layer2_and_layer3_filters; | 684 | use crate::sync::filters::build_layer2_and_layer3_filters; |
| 549 | use tokio::sync::mpsc; | 685 | use tokio::sync::mpsc; |
| @@ -620,8 +756,19 @@ impl SyncManager { | |||
| 620 | ) | 756 | ) |
| 621 | .await; | 757 | .await; |
| 622 | } | 758 | } |
| 623 | RelayEvent::EndOfStoredEvents(_) => { | 759 | RelayEvent::EndOfStoredEvents(sub_id) => { |
| 624 | tracing::debug!(relay = %relay_url_clone, "EOSE received"); | 760 | tracing::debug!( |
| 761 | relay = %relay_url_clone, | ||
| 762 | sub_id = %sub_id, | ||
| 763 | "EOSE received, notifying SyncManager" | ||
| 764 | ); | ||
| 765 | // Notify SyncManager of EOSE | ||
| 766 | let _ = eose_tx | ||
| 767 | .send(EoseNotification { | ||
| 768 | relay_url: relay_url_clone.clone(), | ||
| 769 | sub_id, | ||
| 770 | }) | ||
| 771 | .await; | ||
| 625 | } | 772 | } |
| 626 | RelayEvent::Closed(reason) => { | 773 | RelayEvent::Closed(reason) => { |
| 627 | tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed"); | 774 | tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed"); |
| @@ -653,6 +800,7 @@ impl SyncManager { | |||
| 653 | &self, | 800 | &self, |
| 654 | relay_url: String, | 801 | relay_url: String, |
| 655 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, | 802 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, |
| 803 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, | ||
| 656 | ) { | 804 | ) { |
| 657 | use tokio::sync::mpsc; | 805 | use tokio::sync::mpsc; |
| 658 | 806 | ||
| @@ -707,8 +855,19 @@ impl SyncManager { | |||
| 707 | ) | 855 | ) |
| 708 | .await; | 856 | .await; |
| 709 | } | 857 | } |
| 710 | RelayEvent::EndOfStoredEvents(_) => { | 858 | RelayEvent::EndOfStoredEvents(sub_id) => { |
| 711 | tracing::debug!("EOSE from {}", relay_url_clone); | 859 | tracing::debug!( |
| 860 | relay = %relay_url_clone, | ||
| 861 | sub_id = %sub_id, | ||
| 862 | "EOSE received, notifying SyncManager" | ||
| 863 | ); | ||
| 864 | // Notify SyncManager of EOSE | ||
| 865 | let _ = eose_tx | ||
| 866 | .send(EoseNotification { | ||
| 867 | relay_url: relay_url_clone.clone(), | ||
| 868 | sub_id, | ||
| 869 | }) | ||
| 870 | .await; | ||
| 712 | } | 871 | } |
| 713 | RelayEvent::Closed(reason) => { | 872 | RelayEvent::Closed(reason) => { |
| 714 | tracing::info!( | 873 | tracing::info!( |