diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:49:29 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:49:29 +0000 |
| commit | 4d21292f7fae6cf50608af0e15051ba2369472c7 (patch) | |
| tree | a27be4fc33cc5a5801abbad570cd16d17871e396 /src/sync/mod.rs | |
| parent | 3b65f541b4a3891824c61148d159c1b311e086e8 (diff) | |
sync: implement unified connect/reconnect with since filters
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 265 |
1 files changed, 262 insertions, 3 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 449e4ec..8e0b8e1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -336,6 +336,16 @@ pub struct EoseNotification { | |||
| 336 | pub sub_id: SubscriptionId, | 336 | pub sub_id: SubscriptionId, |
| 337 | } | 337 | } |
| 338 | 338 | ||
| 339 | /// Notification from spawned tasks about successful connection | ||
| 340 | #[derive(Debug)] | ||
| 341 | pub struct ConnectNotification { | ||
| 342 | /// The relay URL that connected | ||
| 343 | pub relay_url: String, | ||
| 344 | } | ||
| 345 | |||
| 346 | /// Quick reconnect window in seconds (15 minutes) | ||
| 347 | const QUICK_RECONNECT_WINDOW_SECS: u64 = 15 * 60; | ||
| 348 | |||
| 339 | /// Manages proactive synchronization with external relays | 349 | /// Manages proactive synchronization with external relays |
| 340 | /// | 350 | /// |
| 341 | /// The SyncManager runs as a background task, subscribing to repository | 351 | /// The SyncManager runs as a background task, subscribing to repository |
| @@ -526,7 +536,10 @@ impl SyncManager { | |||
| 526 | // 3. Create EOSE channel for spawned tasks -> manager communication | 536 | // 3. Create EOSE channel for spawned tasks -> manager communication |
| 527 | let (eose_tx, mut eose_rx) = mpsc::channel::<EoseNotification>(100); | 537 | let (eose_tx, mut eose_rx) = mpsc::channel::<EoseNotification>(100); |
| 528 | 538 | ||
| 529 | // 4. Spawn self-subscriber | 539 | // 4. Create connect channel for spawned tasks -> manager communication |
| 540 | let (connect_tx, mut connect_rx) = mpsc::channel::<ConnectNotification>(100); | ||
| 541 | |||
| 542 | // 5. Spawn self-subscriber | ||
| 530 | let self_subscriber = SelfSubscriber::new( | 543 | let self_subscriber = SelfSubscriber::new( |
| 531 | format!("ws://{}", self.service_domain), | 544 | format!("ws://{}", self.service_domain), |
| 532 | self.service_domain.clone(), | 545 | self.service_domain.clone(), |
| @@ -535,17 +548,18 @@ impl SyncManager { | |||
| 535 | ); | 548 | ); |
| 536 | tokio::spawn(async move { self_subscriber.run().await }); | 549 | tokio::spawn(async move { self_subscriber.run().await }); |
| 537 | 550 | ||
| 538 | // 5. Connect to bootstrap relay if configured | 551 | // 6. Connect to bootstrap relay if configured |
| 539 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | 552 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { |
| 540 | self.spawn_relay_connection( | 553 | self.spawn_relay_connection( |
| 541 | bootstrap_url.clone(), | 554 | bootstrap_url.clone(), |
| 542 | disconnect_tx.clone(), | 555 | disconnect_tx.clone(), |
| 543 | eose_tx.clone(), | 556 | eose_tx.clone(), |
| 557 | connect_tx.clone(), | ||
| 544 | ) | 558 | ) |
| 545 | .await; | 559 | .await; |
| 546 | } | 560 | } |
| 547 | 561 | ||
| 548 | // 6. Main loop - handle actions from self-subscriber, disconnect, and EOSE notifications | 562 | // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| 549 | loop { | 563 | loop { |
| 550 | tokio::select! { | 564 | tokio::select! { |
| 551 | action = action_rx.recv() => { | 565 | action = action_rx.recv() => { |
| @@ -563,6 +577,7 @@ impl SyncManager { | |||
| 563 | repos, | 577 | repos, |
| 564 | disconnect_tx.clone(), | 578 | disconnect_tx.clone(), |
| 565 | eose_tx.clone(), | 579 | eose_tx.clone(), |
| 580 | connect_tx.clone(), | ||
| 566 | ).await; | 581 | ).await; |
| 567 | } else { | 582 | } else { |
| 568 | tracing::debug!( | 583 | tracing::debug!( |
| @@ -607,7 +622,235 @@ impl SyncManager { | |||
| 607 | } | 622 | } |
| 608 | } | 623 | } |
| 609 | } | 624 | } |
| 625 | connect = connect_rx.recv() => { | ||
| 626 | match connect { | ||
| 627 | Some(notification) => { | ||
| 628 | self.handle_connect_or_reconnect(¬ification.relay_url).await; | ||
| 629 | } | ||
| 630 | None => { | ||
| 631 | // All connect senders dropped - unlikely but handle gracefully | ||
| 632 | tracing::debug!("Connect channel closed"); | ||
| 633 | } | ||
| 634 | } | ||
| 635 | } | ||
| 636 | } | ||
| 637 | } | ||
| 638 | } | ||
| 639 | |||
| 640 | /// Handle a connection success (called when a relay connects or reconnects) | ||
| 641 | /// | ||
| 642 | /// This method implements smart reconnection logic: | ||
| 643 | /// - Fresh sync if never connected or >15 min since last connection | ||
| 644 | /// - Quick reconnect with since filter if <15 min since last connection | ||
| 645 | /// | ||
| 646 | /// For fresh sync: | ||
| 647 | /// - Clears any stale state | ||
| 648 | /// - Subscribes to Layer 1 without since filter | ||
| 649 | /// - Recomputes actions for new items | ||
| 650 | /// | ||
| 651 | /// For quick reconnect: | ||
| 652 | /// - Preserves existing state | ||
| 653 | /// - Subscribes to Layer 1 with since filter | ||
| 654 | /// - Rebuilds Layer 2 and Layer 3 with since filter | ||
| 655 | /// - Recomputes actions for new items | ||
| 656 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | ||
| 657 | let now = Timestamp::now(); | ||
| 658 | |||
| 659 | // Get the relay state to determine reconnect type | ||
| 660 | let (is_fresh_sync, last_connected, is_bootstrap) = { | ||
| 661 | let index = self.relay_sync_index.read().await; | ||
| 662 | if let Some(state) = index.get(relay_url) { | ||
| 663 | let last_conn = state.last_connected; | ||
| 664 | let is_fresh = match last_conn { | ||
| 665 | None => true, // Never connected before | ||
| 666 | Some(last) => { | ||
| 667 | let elapsed = now.as_secs().saturating_sub(last.as_secs()); | ||
| 668 | elapsed > QUICK_RECONNECT_WINDOW_SECS // Stale if > 15 min | ||
| 669 | } | ||
| 670 | }; | ||
| 671 | (is_fresh, last_conn, state.is_bootstrap) | ||
| 672 | } else { | ||
| 673 | (true, None, false) // No state found, treat as fresh | ||
| 674 | } | ||
| 675 | }; | ||
| 676 | |||
| 677 | // If stale reconnect, clear state | ||
| 678 | if is_fresh_sync && last_connected.is_some() { | ||
| 679 | let mut index = self.relay_sync_index.write().await; | ||
| 680 | if let Some(state) = index.get_mut(relay_url) { | ||
| 681 | state.clear_sync_state(); | ||
| 682 | tracing::info!( | ||
| 683 | relay = %relay_url, | ||
| 684 | "Cleared stale sync state (was disconnected > 15 min)" | ||
| 685 | ); | ||
| 686 | } | ||
| 687 | } | ||
| 688 | |||
| 689 | // Update connection state | ||
| 690 | { | ||
| 691 | let mut index = self.relay_sync_index.write().await; | ||
| 692 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 693 | state.connection_status = ConnectionStatus::Connected; | ||
| 694 | state.last_connected = Some(now); | ||
| 695 | state.disconnected_at = None; | ||
| 696 | } | ||
| 697 | |||
| 698 | // Record success in health tracker | ||
| 699 | self.health_tracker.record_success(relay_url); | ||
| 700 | |||
| 701 | // Subscribe based on reconnect type | ||
| 702 | if is_fresh_sync { | ||
| 703 | tracing::info!( | ||
| 704 | relay = %relay_url, | ||
| 705 | is_bootstrap = is_bootstrap, | ||
| 706 | "Fresh sync - subscribing to Layer 1 without since filter" | ||
| 707 | ); | ||
| 708 | // Fresh sync: Layer 1 without since | ||
| 709 | // Layer 1 subscription is handled by the connection establishment | ||
| 710 | // Just recompute actions for new items | ||
| 711 | self.recompute_actions_for_relay(relay_url).await; | ||
| 712 | } else { | ||
| 713 | // Quick reconnect: use since filter | ||
| 714 | let since_ts = Timestamp::from( | ||
| 715 | last_connected | ||
| 716 | .unwrap() | ||
| 717 | .as_secs() | ||
| 718 | .saturating_sub(QUICK_RECONNECT_WINDOW_SECS), | ||
| 719 | ); | ||
| 720 | |||
| 721 | tracing::info!( | ||
| 722 | relay = %relay_url, | ||
| 723 | since = %since_ts, | ||
| 724 | "Quick reconnect - using since filter for incremental sync" | ||
| 725 | ); | ||
| 726 | |||
| 727 | // Rebuild Layer 2 and Layer 3 with since filter | ||
| 728 | self.rebuild_layer2_and_layer3(relay_url, Some(since_ts)) | ||
| 729 | .await; | ||
| 730 | |||
| 731 | // Recompute actions for any new items discovered while disconnected | ||
| 732 | self.recompute_actions_for_relay(relay_url).await; | ||
| 733 | } | ||
| 734 | } | ||
| 735 | |||
| 736 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay | ||
| 737 | /// | ||
| 738 | /// Uses the confirmed repos and root_events from RelayState to build filters. | ||
| 739 | /// If since is provided, applies it to all filters for incremental sync. | ||
| 740 | async fn rebuild_layer2_and_layer3(&self, relay_url: &str, since: Option<Timestamp>) { | ||
| 741 | use crate::sync::filters::build_layer2_and_layer3_filters; | ||
| 742 | |||
| 743 | // Get confirmed state from relay_sync_index | ||
| 744 | let (repos, root_events) = { | ||
| 745 | let index = self.relay_sync_index.read().await; | ||
| 746 | match index.get(relay_url) { | ||
| 747 | Some(state) => (state.repos.clone(), state.root_events.clone()), | ||
| 748 | None => { | ||
| 749 | tracing::warn!( | ||
| 750 | relay = %relay_url, | ||
| 751 | "No RelayState found for rebuild_layer2_and_layer3" | ||
| 752 | ); | ||
| 753 | return; | ||
| 754 | } | ||
| 755 | } | ||
| 756 | }; | ||
| 757 | |||
| 758 | // Nothing to rebuild if no confirmed items | ||
| 759 | if repos.is_empty() && root_events.is_empty() { | ||
| 760 | tracing::debug!( | ||
| 761 | relay = %relay_url, | ||
| 762 | "No confirmed items to rebuild Layer 2/3 for" | ||
| 763 | ); | ||
| 764 | return; | ||
| 765 | } | ||
| 766 | |||
| 767 | // Build Layer 2 and Layer 3 filters | ||
| 768 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); | ||
| 769 | |||
| 770 | tracing::debug!( | ||
| 771 | relay = %relay_url, | ||
| 772 | filter_count = filters.len(), | ||
| 773 | repos_count = repos.len(), | ||
| 774 | root_events_count = root_events.len(), | ||
| 775 | since = ?since, | ||
| 776 | "Rebuilding Layer 2/3 filters" | ||
| 777 | ); | ||
| 778 | |||
| 779 | // Subscribe to filters on the relay connection | ||
| 780 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 781 | for filter in filters { | ||
| 782 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 783 | tracing::error!( | ||
| 784 | relay = %relay_url, | ||
| 785 | error = %e, | ||
| 786 | "Failed to subscribe to Layer 2/3 filter during rebuild" | ||
| 787 | ); | ||
| 788 | } | ||
| 610 | } | 789 | } |
| 790 | } else { | ||
| 791 | tracing::warn!( | ||
| 792 | relay = %relay_url, | ||
| 793 | "No active connection found for Layer 2/3 rebuild" | ||
| 794 | ); | ||
| 795 | } | ||
| 796 | } | ||
| 797 | |||
| 798 | /// Recompute sync actions for a specific relay | ||
| 799 | /// | ||
| 800 | /// Uses derive_relay_targets and compute_actions to find new items | ||
| 801 | /// that need to be synced. For Phase 4, this just logs the actions; | ||
| 802 | /// full handling will be implemented in Phase 5. | ||
| 803 | async fn recompute_actions_for_relay(&self, relay_url: &str) { | ||
| 804 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | ||
| 805 | |||
| 806 | // Get current state from indexes | ||
| 807 | let repo_index = self.repo_sync_index.read().await; | ||
| 808 | let pending_index = self.pending_sync_index.read().await; | ||
| 809 | let relay_index = self.relay_sync_index.read().await; | ||
| 810 | |||
| 811 | // Derive per-relay targets from repo index | ||
| 812 | let all_targets = derive_relay_targets(&repo_index); | ||
| 813 | |||
| 814 | // Filter to only targets for this specific relay | ||
| 815 | let relay_target = all_targets.get(relay_url); | ||
| 816 | |||
| 817 | if relay_target.is_none() { | ||
| 818 | tracing::debug!( | ||
| 819 | relay = %relay_url, | ||
| 820 | "No sync targets found for relay" | ||
| 821 | ); | ||
| 822 | return; | ||
| 823 | } | ||
| 824 | |||
| 825 | // Build single-relay targets map for compute_actions | ||
| 826 | let mut single_relay_targets = std::collections::HashMap::new(); | ||
| 827 | if let Some(target) = relay_target { | ||
| 828 | single_relay_targets.insert(relay_url.to_string(), target.clone()); | ||
| 829 | } | ||
| 830 | |||
| 831 | // Compute actions for new items | ||
| 832 | let actions = compute_actions( | ||
| 833 | &single_relay_targets, | ||
| 834 | &pending_index, | ||
| 835 | &relay_index, | ||
| 836 | ); | ||
| 837 | |||
| 838 | // Log the actions (Phase 5 will process them) | ||
| 839 | for action in &actions { | ||
| 840 | tracing::info!( | ||
| 841 | relay = %action.relay_url, | ||
| 842 | new_repos = action.repos.len(), | ||
| 843 | new_root_events = action.root_events.len(), | ||
| 844 | filters = action.filters.len(), | ||
| 845 | "Discovered new items to sync (Phase 5 will process)" | ||
| 846 | ); | ||
| 847 | } | ||
| 848 | |||
| 849 | if actions.is_empty() { | ||
| 850 | tracing::debug!( | ||
| 851 | relay = %relay_url, | ||
| 852 | "No new items to sync for relay" | ||
| 853 | ); | ||
| 611 | } | 854 | } |
| 612 | } | 855 | } |
| 613 | 856 | ||
| @@ -680,6 +923,7 @@ impl SyncManager { | |||
| 680 | repos: HashMap<String, HashSet<EventId>>, | 923 | repos: HashMap<String, HashSet<EventId>>, |
| 681 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, | 924 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, |
| 682 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, | 925 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, |
| 926 | connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>, | ||
| 683 | ) { | 927 | ) { |
| 684 | use crate::sync::filters::build_layer2_and_layer3_filters; | 928 | use crate::sync::filters::build_layer2_and_layer3_filters; |
| 685 | use tokio::sync::mpsc; | 929 | use tokio::sync::mpsc; |
| @@ -713,6 +957,13 @@ impl SyncManager { | |||
| 713 | ); | 957 | ); |
| 714 | } | 958 | } |
| 715 | 959 | ||
| 960 | // Notify SyncManager of successful connection | ||
| 961 | let _ = connect_tx | ||
| 962 | .send(ConnectNotification { | ||
| 963 | relay_url: relay_url.clone(), | ||
| 964 | }) | ||
| 965 | .await; | ||
| 966 | |||
| 716 | // Subscribe to Layer 2+3 filters for the repos | 967 | // Subscribe to Layer 2+3 filters for the repos |
| 717 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | 968 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); |
| 718 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | 969 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); |
| @@ -801,6 +1052,7 @@ impl SyncManager { | |||
| 801 | relay_url: String, | 1052 | relay_url: String, |
| 802 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, | 1053 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, |
| 803 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, | 1054 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, |
| 1055 | connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>, | ||
| 804 | ) { | 1056 | ) { |
| 805 | use tokio::sync::mpsc; | 1057 | use tokio::sync::mpsc; |
| 806 | 1058 | ||
| @@ -833,6 +1085,13 @@ impl SyncManager { | |||
| 833 | ); | 1085 | ); |
| 834 | } | 1086 | } |
| 835 | 1087 | ||
| 1088 | // Notify SyncManager of successful connection | ||
| 1089 | let _ = connect_tx | ||
| 1090 | .send(ConnectNotification { | ||
| 1091 | relay_url: relay_url.clone(), | ||
| 1092 | }) | ||
| 1093 | .await; | ||
| 1094 | |||
| 836 | // Create event channel | 1095 | // Create event channel |
| 837 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | 1096 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); |
| 838 | 1097 | ||