diff options
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 51 |
1 files changed, 5 insertions, 46 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3c69fb9..0e39aaf 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -25,7 +25,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; | |||
| 25 | pub use relay_connection::{RelayConnection, RelayEvent}; | 25 | pub use relay_connection::{RelayConnection, RelayEvent}; |
| 26 | 26 | ||
| 27 | // Re-export self-subscriber types | 27 | // Re-export self-subscriber types |
| 28 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | 28 | pub use self_subscriber::SelfSubscriber; |
| 29 | 29 | ||
| 30 | // Re-export health tracking types | 30 | // Re-export health tracking types |
| 31 | pub use health::RelayHealthTracker; | 31 | pub use health::RelayHealthTracker; |
| @@ -702,7 +702,7 @@ impl SyncManager { | |||
| 702 | ); | 702 | ); |
| 703 | 703 | ||
| 704 | // 1. Create action channel for self-subscriber -> manager communication | 704 | // 1. Create action channel for self-subscriber -> manager communication |
| 705 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); | 705 | let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); |
| 706 | 706 | ||
| 707 | // 2. Create disconnect channel for spawned tasks -> manager communication | 707 | // 2. Create disconnect channel for spawned tasks -> manager communication |
| 708 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); | 708 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); |
| @@ -760,51 +760,10 @@ impl SyncManager { | |||
| 760 | tokio::select! { | 760 | tokio::select! { |
| 761 | action = action_rx.recv() => { | 761 | action = action_rx.recv() => { |
| 762 | match action { | 762 | match action { |
| 763 | Some(RelayAction::SpawnRelay { relay_url, repos }) => { | 763 | Some(add_filters) => { |
| 764 | // Convert to AddFilters format and use unified handler | 764 | // Process AddFilters action directly |
| 765 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 766 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 767 | |||
| 768 | // Build filters for these repos | ||
| 769 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | ||
| 770 | &repo_ids, | ||
| 771 | &root_events, | ||
| 772 | None, | ||
| 773 | ); | ||
| 774 | |||
| 775 | let action = AddFilters { | ||
| 776 | relay_url, | ||
| 777 | repos: repo_ids, | ||
| 778 | root_events, | ||
| 779 | filters, | ||
| 780 | }; | ||
| 781 | |||
| 782 | // Acquire lock to process action | ||
| 783 | let mut manager = sync_manager.lock().await; | 765 | let mut manager = sync_manager.lock().await; |
| 784 | manager.handle_add_filters(action).await; | 766 | manager.handle_add_filters(add_filters).await; |
| 785 | } | ||
| 786 | Some(RelayAction::AddFilters { relay_url, repos }) => { | ||
| 787 | // Convert to AddFilters format and use unified handler | ||
| 788 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 789 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 790 | |||
| 791 | // Build filters for these repos | ||
| 792 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | ||
| 793 | &repo_ids, | ||
| 794 | &root_events, | ||
| 795 | None, | ||
| 796 | ); | ||
| 797 | |||
| 798 | let action = AddFilters { | ||
| 799 | relay_url, | ||
| 800 | repos: repo_ids, | ||
| 801 | root_events, | ||
| 802 | filters, | ||
| 803 | }; | ||
| 804 | |||
| 805 | // Acquire lock to process action | ||
| 806 | let mut manager = sync_manager.lock().await; | ||
| 807 | manager.handle_add_filters(action).await; | ||
| 808 | } | 767 | } |
| 809 | None => break, | 768 | None => break, |
| 810 | } | 769 | } |