diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 493 |
1 files changed, 275 insertions, 218 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8e0b8e1..f704ca7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -375,6 +375,12 @@ pub struct SyncManager { | |||
| 375 | health_tracker: Arc<RelayHealthTracker>, | 375 | health_tracker: Arc<RelayHealthTracker>, |
| 376 | /// Counter for generating unique batch IDs | 376 | /// Counter for generating unique batch IDs |
| 377 | next_batch_id: u64, | 377 | next_batch_id: u64, |
| 378 | /// Channel for disconnect notifications (set during run) | ||
| 379 | disconnect_tx: Option<tokio::sync::mpsc::Sender<DisconnectNotification>>, | ||
| 380 | /// Channel for EOSE notifications (set during run) | ||
| 381 | eose_tx: Option<tokio::sync::mpsc::Sender<EoseNotification>>, | ||
| 382 | /// Channel for connect notifications (set during run) | ||
| 383 | connect_tx: Option<tokio::sync::mpsc::Sender<ConnectNotification>>, | ||
| 378 | } | 384 | } |
| 379 | 385 | ||
| 380 | impl SyncManager { | 386 | impl SyncManager { |
| @@ -405,6 +411,9 @@ impl SyncManager { | |||
| 405 | connections: HashMap::new(), | 411 | connections: HashMap::new(), |
| 406 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 412 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 407 | next_batch_id: 0, | 413 | next_batch_id: 0, |
| 414 | disconnect_tx: None, | ||
| 415 | eose_tx: None, | ||
| 416 | connect_tx: None, | ||
| 408 | } | 417 | } |
| 409 | } | 418 | } |
| 410 | 419 | ||
| @@ -548,15 +557,14 @@ impl SyncManager { | |||
| 548 | ); | 557 | ); |
| 549 | tokio::spawn(async move { self_subscriber.run().await }); | 558 | tokio::spawn(async move { self_subscriber.run().await }); |
| 550 | 559 | ||
| 560 | // 5b. Store channel senders for use by handlers | ||
| 561 | self.disconnect_tx = Some(disconnect_tx.clone()); | ||
| 562 | self.eose_tx = Some(eose_tx.clone()); | ||
| 563 | self.connect_tx = Some(connect_tx.clone()); | ||
| 564 | |||
| 551 | // 6. Connect to bootstrap relay if configured | 565 | // 6. Connect to bootstrap relay if configured |
| 552 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | 566 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { |
| 553 | self.spawn_relay_connection( | 567 | self.spawn_relay_connection(bootstrap_url.clone()).await; |
| 554 | bootstrap_url.clone(), | ||
| 555 | disconnect_tx.clone(), | ||
| 556 | eose_tx.clone(), | ||
| 557 | connect_tx.clone(), | ||
| 558 | ) | ||
| 559 | .await; | ||
| 560 | } | 568 | } |
| 561 | 569 | ||
| 562 | // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 570 | // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -565,37 +573,46 @@ impl SyncManager { | |||
| 565 | action = action_rx.recv() => { | 573 | action = action_rx.recv() => { |
| 566 | match action { | 574 | match action { |
| 567 | Some(RelayAction::SpawnRelay { relay_url, repos }) => { | 575 | Some(RelayAction::SpawnRelay { relay_url, repos }) => { |
| 568 | // Check if relay already exists | 576 | // Convert to AddFilters format and use unified handler |
| 569 | let relay_index = self.relay_sync_index.read().await; | 577 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); |
| 570 | let exists = relay_index.contains_key(&relay_url); | 578 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); |
| 571 | drop(relay_index); | 579 | |
| 572 | 580 | // Build filters for these repos | |
| 573 | if !exists { | 581 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( |
| 574 | tracing::info!(relay = %relay_url, "Spawning new relay connection"); | 582 | &repo_ids, |
| 575 | self.spawn_relay_with_layer2( | 583 | &root_events, |
| 576 | relay_url, | 584 | None, |
| 577 | repos, | 585 | ); |
| 578 | disconnect_tx.clone(), | 586 | |
| 579 | eose_tx.clone(), | 587 | let action = AddFilters { |
| 580 | connect_tx.clone(), | 588 | relay_url, |
| 581 | ).await; | 589 | repos: repo_ids, |
| 582 | } else { | 590 | root_events, |
| 583 | tracing::debug!( | 591 | filters, |
| 584 | relay = %relay_url, | 592 | }; |
| 585 | "Relay already exists, considering AddFilters" | 593 | |
| 586 | ); | 594 | self.handle_add_filters(action).await; |
| 587 | // For MVP, we don't handle AddFilters - just log | ||
| 588 | // Full implementation would call subscribe_filters on existing connection | ||
| 589 | } | ||
| 590 | } | 595 | } |
| 591 | Some(RelayAction::AddFilters { relay_url, repos }) => { | 596 | Some(RelayAction::AddFilters { relay_url, repos }) => { |
| 592 | tracing::debug!( | 597 | // Convert to AddFilters format and use unified handler |
| 593 | relay = %relay_url, | 598 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); |
| 594 | repo_count = repos.len(), | 599 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); |
| 595 | "AddFilters action (MVP: not implemented)" | 600 | |
| 601 | // Build filters for these repos | ||
| 602 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | ||
| 603 | &repo_ids, | ||
| 604 | &root_events, | ||
| 605 | None, | ||
| 596 | ); | 606 | ); |
| 597 | // For MVP, not implemented - full version would add Layer 2 filters | 607 | |
| 598 | // to existing relay connection | 608 | let action = AddFilters { |
| 609 | relay_url, | ||
| 610 | repos: repo_ids, | ||
| 611 | root_events, | ||
| 612 | filters, | ||
| 613 | }; | ||
| 614 | |||
| 615 | self.handle_add_filters(action).await; | ||
| 599 | } | 616 | } |
| 600 | None => break, | 617 | None => break, |
| 601 | } | 618 | } |
| @@ -637,6 +654,130 @@ impl SyncManager { | |||
| 637 | } | 654 | } |
| 638 | } | 655 | } |
| 639 | 656 | ||
| 657 | /// Handle AddFilters action - subscribe to filters on a relay | ||
| 658 | /// | ||
| 659 | /// This method handles all filter additions: | ||
| 660 | /// - For new relays: creates entry with Connecting status, spawns connection | ||
| 661 | /// - For existing connected relays: subscribes to filters, creates PendingBatch | ||
| 662 | /// - For disconnected/connecting relays: returns (will be handled on connection) | ||
| 663 | async fn handle_add_filters(&mut self, action: AddFilters) { | ||
| 664 | // Step 1: Check if relay exists in relay_sync_index | ||
| 665 | let connection_status = { | ||
| 666 | let index = self.relay_sync_index.read().await; | ||
| 667 | index.get(&action.relay_url).map(|s| s.connection_status) | ||
| 668 | }; | ||
| 669 | |||
| 670 | match connection_status { | ||
| 671 | None => { | ||
| 672 | // New relay - create entry with Connecting status | ||
| 673 | { | ||
| 674 | let mut index = self.relay_sync_index.write().await; | ||
| 675 | let new_state = RelayState { | ||
| 676 | connection_status: ConnectionStatus::Connecting, | ||
| 677 | is_bootstrap: false, // Only bootstrap relays set this to true | ||
| 678 | last_connected: None, | ||
| 679 | disconnected_at: None, | ||
| 680 | repos: HashSet::new(), | ||
| 681 | root_events: HashSet::new(), | ||
| 682 | }; | ||
| 683 | index.insert(action.relay_url.clone(), new_state); | ||
| 684 | } | ||
| 685 | |||
| 686 | tracing::info!( | ||
| 687 | relay = %action.relay_url, | ||
| 688 | repos = action.repos.len(), | ||
| 689 | "Spawning connection for new relay" | ||
| 690 | ); | ||
| 691 | |||
| 692 | // Spawn connection for new relay | ||
| 693 | self.spawn_relay_connection(action.relay_url.clone()).await; | ||
| 694 | // Connection will trigger handle_connect_or_reconnect which will process items | ||
| 695 | return; | ||
| 696 | } | ||
| 697 | Some(ConnectionStatus::Disconnected) | Some(ConnectionStatus::Connecting) => { | ||
| 698 | // Will be handled when connection succeeds | ||
| 699 | tracing::debug!( | ||
| 700 | relay = %action.relay_url, | ||
| 701 | status = ?connection_status, | ||
| 702 | "Relay not connected, action will be processed on connection" | ||
| 703 | ); | ||
| 704 | return; | ||
| 705 | } | ||
| 706 | Some(ConnectionStatus::Connected) => { | ||
| 707 | // Continue to subscribe | ||
| 708 | } | ||
| 709 | } | ||
| 710 | |||
| 711 | // Step 2: Check if consolidation is needed (Phase 6 will implement maybe_consolidate) | ||
| 712 | // self.maybe_consolidate(&action.relay_url, action.filters.len()); | ||
| 713 | |||
| 714 | // Step 3: Get connection and subscribe to all filters | ||
| 715 | let connection = match self.connections.get(&action.relay_url) { | ||
| 716 | Some(conn) => conn, | ||
| 717 | None => { | ||
| 718 | tracing::warn!( | ||
| 719 | relay = %action.relay_url, | ||
| 720 | "No connection for relay, cannot subscribe" | ||
| 721 | ); | ||
| 722 | return; | ||
| 723 | } | ||
| 724 | }; | ||
| 725 | |||
| 726 | // Subscribe to each filter and collect subscription IDs | ||
| 727 | let mut subscription_ids = Vec::new(); | ||
| 728 | for filter in &action.filters { | ||
| 729 | match connection.subscribe_filter(filter.clone()).await { | ||
| 730 | Ok(sub_id) => { | ||
| 731 | subscription_ids.push(sub_id); | ||
| 732 | } | ||
| 733 | Err(e) => { | ||
| 734 | tracing::error!( | ||
| 735 | relay = %action.relay_url, | ||
| 736 | error = %e, | ||
| 737 | "Failed to subscribe to filter" | ||
| 738 | ); | ||
| 739 | } | ||
| 740 | } | ||
| 741 | } | ||
| 742 | |||
| 743 | if subscription_ids.is_empty() && !action.filters.is_empty() { | ||
| 744 | tracing::warn!( | ||
| 745 | relay = %action.relay_url, | ||
| 746 | "All filter subscriptions failed, not creating batch" | ||
| 747 | ); | ||
| 748 | return; | ||
| 749 | } | ||
| 750 | |||
| 751 | // Step 4: Create PendingBatch | ||
| 752 | let batch_id = self.next_batch_id(); | ||
| 753 | let batch = PendingBatch { | ||
| 754 | batch_id, | ||
| 755 | items: PendingItems { | ||
| 756 | repos: action.repos.clone(), | ||
| 757 | root_events: action.root_events.clone(), | ||
| 758 | }, | ||
| 759 | outstanding_subs: subscription_ids.into_iter().collect(), | ||
| 760 | }; | ||
| 761 | |||
| 762 | // Step 5: Add to pending_sync_index | ||
| 763 | { | ||
| 764 | let mut pending = self.pending_sync_index.write().await; | ||
| 765 | pending | ||
| 766 | .entry(action.relay_url.clone()) | ||
| 767 | .or_insert_with(Vec::new) | ||
| 768 | .push(batch); | ||
| 769 | } | ||
| 770 | |||
| 771 | tracing::debug!( | ||
| 772 | relay = %action.relay_url, | ||
| 773 | batch_id = batch_id, | ||
| 774 | repos = action.repos.len(), | ||
| 775 | root_events = action.root_events.len(), | ||
| 776 | filters = action.filters.len(), | ||
| 777 | "Created pending batch for filter subscriptions" | ||
| 778 | ); | ||
| 779 | } | ||
| 780 | |||
| 640 | /// Handle a connection success (called when a relay connects or reconnects) | 781 | /// Handle a connection success (called when a relay connects or reconnects) |
| 641 | /// | 782 | /// |
| 642 | /// This method implements smart reconnection logic: | 783 | /// This method implements smart reconnection logic: |
| @@ -798,59 +939,57 @@ impl SyncManager { | |||
| 798 | /// Recompute sync actions for a specific relay | 939 | /// Recompute sync actions for a specific relay |
| 799 | /// | 940 | /// |
| 800 | /// Uses derive_relay_targets and compute_actions to find new items | 941 | /// Uses derive_relay_targets and compute_actions to find new items |
| 801 | /// that need to be synced. For Phase 4, this just logs the actions; | 942 | /// that need to be synced. Processes AddFilters actions for new items. |
| 802 | /// full handling will be implemented in Phase 5. | 943 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { |
| 803 | async fn recompute_actions_for_relay(&self, relay_url: &str) { | ||
| 804 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | 944 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; |
| 805 | 945 | ||
| 806 | // Get current state from indexes | 946 | // Get current state from indexes (need to collect to avoid holding locks) |
| 807 | let repo_index = self.repo_sync_index.read().await; | 947 | let all_targets = { |
| 808 | let pending_index = self.pending_sync_index.read().await; | 948 | let repo_index = self.repo_sync_index.read().await; |
| 809 | let relay_index = self.relay_sync_index.read().await; | 949 | derive_relay_targets(&repo_index) |
| 810 | 950 | }; | |
| 811 | // Derive per-relay targets from repo index | ||
| 812 | let all_targets = derive_relay_targets(&repo_index); | ||
| 813 | 951 | ||
| 814 | // Filter to only targets for this specific relay | 952 | // Filter to only targets for this specific relay |
| 815 | let relay_target = all_targets.get(relay_url); | 953 | let relay_target = match all_targets.get(relay_url) { |
| 954 | Some(target) => target.clone(), | ||
| 955 | None => { | ||
| 956 | tracing::debug!( | ||
| 957 | relay = %relay_url, | ||
| 958 | "No sync targets found for relay" | ||
| 959 | ); | ||
| 960 | return; | ||
| 961 | } | ||
| 962 | }; | ||
| 963 | |||
| 964 | // Build single-relay targets map for compute_actions | ||
| 965 | let mut single_relay_targets = std::collections::HashMap::new(); | ||
| 966 | single_relay_targets.insert(relay_url.to_string(), relay_target); | ||
| 967 | |||
| 968 | // Compute actions for new items | ||
| 969 | let actions = { | ||
| 970 | let pending_index = self.pending_sync_index.read().await; | ||
| 971 | let relay_index = self.relay_sync_index.read().await; | ||
| 972 | compute_actions(&single_relay_targets, &pending_index, &relay_index) | ||
| 973 | }; | ||
| 816 | 974 | ||
| 817 | if relay_target.is_none() { | 975 | if actions.is_empty() { |
| 818 | tracing::debug!( | 976 | tracing::debug!( |
| 819 | relay = %relay_url, | 977 | relay = %relay_url, |
| 820 | "No sync targets found for relay" | 978 | "No new items to sync for relay" |
| 821 | ); | 979 | ); |
| 822 | return; | 980 | return; |
| 823 | } | 981 | } |
| 824 | 982 | ||
| 825 | // Build single-relay targets map for compute_actions | 983 | // Process each action |
| 826 | let mut single_relay_targets = std::collections::HashMap::new(); | 984 | for action in actions { |
| 827 | if let Some(target) = relay_target { | ||
| 828 | single_relay_targets.insert(relay_url.to_string(), target.clone()); | ||
| 829 | } | ||
| 830 | |||
| 831 | // Compute actions for new items | ||
| 832 | let actions = compute_actions( | ||
| 833 | &single_relay_targets, | ||
| 834 | &pending_index, | ||
| 835 | &relay_index, | ||
| 836 | ); | ||
| 837 | |||
| 838 | // Log the actions (Phase 5 will process them) | ||
| 839 | for action in &actions { | ||
| 840 | tracing::info!( | 985 | tracing::info!( |
| 841 | relay = %action.relay_url, | 986 | relay = %action.relay_url, |
| 842 | new_repos = action.repos.len(), | 987 | new_repos = action.repos.len(), |
| 843 | new_root_events = action.root_events.len(), | 988 | new_root_events = action.root_events.len(), |
| 844 | filters = action.filters.len(), | 989 | filters = action.filters.len(), |
| 845 | "Discovered new items to sync (Phase 5 will process)" | 990 | "Processing AddFilters for new items" |
| 846 | ); | ||
| 847 | } | ||
| 848 | |||
| 849 | if actions.is_empty() { | ||
| 850 | tracing::debug!( | ||
| 851 | relay = %relay_url, | ||
| 852 | "No new items to sync for relay" | ||
| 853 | ); | 991 | ); |
| 992 | self.handle_add_filters(action).await; | ||
| 854 | } | 993 | } |
| 855 | } | 994 | } |
| 856 | 995 | ||
| @@ -912,179 +1051,81 @@ impl SyncManager { | |||
| 912 | ); | 1051 | ); |
| 913 | } | 1052 | } |
| 914 | 1053 | ||
| 915 | /// Spawn relay connection with Layer 2 filters for specific repos | 1054 | /// Spawn a relay connection and start its event loop |
| 916 | /// | 1055 | /// |
| 917 | /// Used when discovering relays from announcements. Connects to the relay, | 1056 | /// Creates a new RelayConnection, connects to Layer 1, stores the connection, |
| 918 | /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the | 1057 | /// and spawns event processing tasks. Uses stored channel senders for notifications. |
| 919 | /// specific repos we want to sync. | 1058 | async fn spawn_relay_connection(&mut self, relay_url: String) { |
| 920 | async fn spawn_relay_with_layer2( | ||
| 921 | &self, | ||
| 922 | relay_url: String, | ||
| 923 | repos: HashMap<String, HashSet<EventId>>, | ||
| 924 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, | ||
| 925 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, | ||
| 926 | connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>, | ||
| 927 | ) { | ||
| 928 | use crate::sync::filters::build_layer2_and_layer3_filters; | ||
| 929 | use tokio::sync::mpsc; | 1059 | use tokio::sync::mpsc; |
| 930 | 1060 | ||
| 931 | let database = Arc::clone(&self.database); | 1061 | // Get channel senders (must exist during run) |
| 932 | let write_policy = self.write_policy.clone(); | 1062 | let disconnect_tx = match &self.disconnect_tx { |
| 933 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | 1063 | Some(tx) => tx.clone(), |
| 934 | 1064 | None => { | |
| 935 | // Create relay connection | ||
| 936 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 937 | |||
| 938 | // Connect and subscribe to Layer 1 (announcements) | ||
| 939 | if let Err(e) = connection.connect_and_subscribe(None).await { | ||
| 940 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | ||
| 941 | return; | ||
| 942 | } | ||
| 943 | |||
| 944 | // Mark as connected in relay sync index | ||
| 945 | { | ||
| 946 | let mut index = relay_sync_index.write().await; | ||
| 947 | index.insert( | ||
| 948 | relay_url.clone(), | ||
| 949 | RelayState { | ||
| 950 | repos: repos.keys().cloned().collect(), | ||
| 951 | root_events: repos.values().flatten().cloned().collect(), | ||
| 952 | is_bootstrap: false, | ||
| 953 | connection_status: ConnectionStatus::Connected, | ||
| 954 | last_connected: Some(Timestamp::now()), | ||
| 955 | disconnected_at: None, | ||
| 956 | }, | ||
| 957 | ); | ||
| 958 | } | ||
| 959 | |||
| 960 | // Notify SyncManager of successful connection | ||
| 961 | let _ = connect_tx | ||
| 962 | .send(ConnectNotification { | ||
| 963 | relay_url: relay_url.clone(), | ||
| 964 | }) | ||
| 965 | .await; | ||
| 966 | |||
| 967 | // Subscribe to Layer 2+3 filters for the repos | ||
| 968 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 969 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 970 | let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None); | ||
| 971 | |||
| 972 | for filter in filters { | ||
| 973 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 974 | tracing::error!( | 1065 | tracing::error!( |
| 975 | relay = %relay_url, | 1066 | relay = %relay_url, |
| 976 | error = %e, | 1067 | "Cannot spawn connection - channels not initialized" |
| 977 | "Failed to subscribe to Layer 2 filter" | ||
| 978 | ); | 1068 | ); |
| 1069 | return; | ||
| 979 | } | 1070 | } |
| 980 | } | 1071 | }; |
| 981 | 1072 | let eose_tx = match &self.eose_tx { | |
| 982 | tracing::info!( | 1073 | Some(tx) => tx.clone(), |
| 983 | relay = %relay_url, | 1074 | None => { |
| 984 | repo_count = repos.len(), | 1075 | tracing::error!( |
| 985 | "Connected to discovered relay with Layer 2+3 filters" | 1076 | relay = %relay_url, |
| 986 | ); | 1077 | "Cannot spawn connection - channels not initialized" |
| 987 | 1078 | ); | |
| 988 | // Create event channel | 1079 | return; |
| 989 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 990 | |||
| 991 | // Spawn event loop | ||
| 992 | tokio::spawn(async move { | ||
| 993 | connection.run_event_loop(event_tx).await; | ||
| 994 | }); | ||
| 995 | |||
| 996 | // Spawn event processor | ||
| 997 | let relay_url_clone = relay_url.clone(); | ||
| 998 | tokio::spawn(async move { | ||
| 999 | while let Some(relay_event) = event_rx.recv().await { | ||
| 1000 | match relay_event { | ||
| 1001 | RelayEvent::Event(event) => { | ||
| 1002 | Self::process_event_static( | ||
| 1003 | &event, | ||
| 1004 | &relay_url_clone, | ||
| 1005 | &database, | ||
| 1006 | &write_policy, | ||
| 1007 | ) | ||
| 1008 | .await; | ||
| 1009 | } | ||
| 1010 | RelayEvent::EndOfStoredEvents(sub_id) => { | ||
| 1011 | tracing::debug!( | ||
| 1012 | relay = %relay_url_clone, | ||
| 1013 | sub_id = %sub_id, | ||
| 1014 | "EOSE received, notifying SyncManager" | ||
| 1015 | ); | ||
| 1016 | // Notify SyncManager of EOSE | ||
| 1017 | let _ = eose_tx | ||
| 1018 | .send(EoseNotification { | ||
| 1019 | relay_url: relay_url_clone.clone(), | ||
| 1020 | sub_id, | ||
| 1021 | }) | ||
| 1022 | .await; | ||
| 1023 | } | ||
| 1024 | RelayEvent::Closed(reason) => { | ||
| 1025 | tracing::info!(relay = %relay_url_clone, reason = %reason, "Relay connection closed"); | ||
| 1026 | // Notify SyncManager of disconnect | ||
| 1027 | let _ = disconnect_tx | ||
| 1028 | .send(DisconnectNotification { | ||
| 1029 | relay_url: relay_url_clone.clone(), | ||
| 1030 | }) | ||
| 1031 | .await; | ||
| 1032 | break; | ||
| 1033 | } | ||
| 1034 | RelayEvent::Shutdown => { | ||
| 1035 | tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); | ||
| 1036 | // Notify SyncManager of disconnect | ||
| 1037 | let _ = disconnect_tx | ||
| 1038 | .send(DisconnectNotification { | ||
| 1039 | relay_url: relay_url_clone.clone(), | ||
| 1040 | }) | ||
| 1041 | .await; | ||
| 1042 | break; | ||
| 1043 | } | ||
| 1044 | } | ||
| 1045 | } | 1080 | } |
| 1046 | }); | 1081 | }; |
| 1047 | } | 1082 | let connect_tx = match &self.connect_tx { |
| 1048 | 1083 | Some(tx) => tx.clone(), | |
| 1049 | /// Spawn a relay connection and start its event loop | 1084 | None => { |
| 1050 | async fn spawn_relay_connection( | 1085 | tracing::error!( |
| 1051 | &self, | 1086 | relay = %relay_url, |
| 1052 | relay_url: String, | 1087 | "Cannot spawn connection - channels not initialized" |
| 1053 | disconnect_tx: tokio::sync::mpsc::Sender<DisconnectNotification>, | 1088 | ); |
| 1054 | eose_tx: tokio::sync::mpsc::Sender<EoseNotification>, | 1089 | return; |
| 1055 | connect_tx: tokio::sync::mpsc::Sender<ConnectNotification>, | 1090 | } |
| 1056 | ) { | 1091 | }; |
| 1057 | use tokio::sync::mpsc; | ||
| 1058 | 1092 | ||
| 1059 | let database = Arc::clone(&self.database); | 1093 | let database = Arc::clone(&self.database); |
| 1060 | let write_policy = self.write_policy.clone(); | 1094 | let write_policy = self.write_policy.clone(); |
| 1061 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | 1095 | let relay_sync_index = Arc::clone(&self.relay_sync_index); |
| 1062 | 1096 | ||
| 1097 | // Check if this is a bootstrap relay | ||
| 1098 | let is_bootstrap = self.bootstrap_relay_url.as_ref() == Some(&relay_url); | ||
| 1099 | |||
| 1063 | // Create relay connection | 1100 | // Create relay connection |
| 1064 | let connection = RelayConnection::new(relay_url.clone()); | 1101 | let connection = RelayConnection::new(relay_url.clone()); |
| 1065 | 1102 | ||
| 1066 | // Connect and subscribe to Layer 1 | 1103 | // Connect and subscribe to Layer 1 |
| 1067 | if let Err(e) = connection.connect_and_subscribe(None).await { | 1104 | if let Err(e) = connection.connect_and_subscribe(None).await { |
| 1068 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | 1105 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); |
| 1106 | // Update state to disconnected on failure | ||
| 1107 | { | ||
| 1108 | let mut index = relay_sync_index.write().await; | ||
| 1109 | if let Some(state) = index.get_mut(&relay_url) { | ||
| 1110 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 1111 | } | ||
| 1112 | } | ||
| 1069 | return; | 1113 | return; |
| 1070 | } | 1114 | } |
| 1071 | 1115 | ||
| 1072 | // Mark as connected in relay sync index | 1116 | // Mark as connected in relay sync index |
| 1073 | { | 1117 | { |
| 1074 | let mut index = relay_sync_index.write().await; | 1118 | let mut index = relay_sync_index.write().await; |
| 1075 | index.insert( | 1119 | let state = index.entry(relay_url.clone()).or_default(); |
| 1076 | relay_url.clone(), | 1120 | state.connection_status = ConnectionStatus::Connected; |
| 1077 | RelayState { | 1121 | state.is_bootstrap = is_bootstrap; |
| 1078 | repos: HashSet::new(), | 1122 | state.last_connected = Some(Timestamp::now()); |
| 1079 | root_events: HashSet::new(), | 1123 | state.disconnected_at = None; |
| 1080 | is_bootstrap: true, | ||
| 1081 | connection_status: ConnectionStatus::Connected, | ||
| 1082 | last_connected: Some(Timestamp::now()), | ||
| 1083 | disconnected_at: None, | ||
| 1084 | }, | ||
| 1085 | ); | ||
| 1086 | } | 1124 | } |
| 1087 | 1125 | ||
| 1126 | // Store connection for later use (for subscribing to filters) | ||
| 1127 | self.connections.insert(relay_url.clone(), connection); | ||
| 1128 | |||
| 1088 | // Notify SyncManager of successful connection | 1129 | // Notify SyncManager of successful connection |
| 1089 | let _ = connect_tx | 1130 | let _ = connect_tx |
| 1090 | .send(ConnectNotification { | 1131 | .send(ConnectNotification { |
| @@ -1092,6 +1133,16 @@ impl SyncManager { | |||
| 1092 | }) | 1133 | }) |
| 1093 | .await; | 1134 | .await; |
| 1094 | 1135 | ||
| 1136 | // Get the connection back for the event loop | ||
| 1137 | // We need to take it out because run_event_loop consumes self | ||
| 1138 | let connection = match self.connections.remove(&relay_url) { | ||
| 1139 | Some(conn) => conn, | ||
| 1140 | None => { | ||
| 1141 | tracing::error!(relay = %relay_url, "Connection disappeared after insert"); | ||
| 1142 | return; | ||
| 1143 | } | ||
| 1144 | }; | ||
| 1145 | |||
| 1095 | // Create event channel | 1146 | // Create event channel |
| 1096 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | 1147 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); |
| 1097 | 1148 | ||
| @@ -1155,6 +1206,12 @@ impl SyncManager { | |||
| 1155 | } | 1206 | } |
| 1156 | } | 1207 | } |
| 1157 | }); | 1208 | }); |
| 1209 | |||
| 1210 | tracing::info!( | ||
| 1211 | relay = %relay_url, | ||
| 1212 | is_bootstrap = is_bootstrap, | ||
| 1213 | "Spawned relay connection" | ||
| 1214 | ); | ||
| 1158 | } | 1215 | } |
| 1159 | 1216 | ||
| 1160 | /// Process a single event from a relay (static version for spawned tasks) | 1217 | /// Process a single event from a relay (static version for spawned tasks) |