diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 51 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 106 |
2 files changed, 59 insertions, 98 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3c69fb9..0e39aaf 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -25,7 +25,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; | |||
| 25 | pub use relay_connection::{RelayConnection, RelayEvent}; | 25 | pub use relay_connection::{RelayConnection, RelayEvent}; |
| 26 | 26 | ||
| 27 | // Re-export self-subscriber types | 27 | // Re-export self-subscriber types |
| 28 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | 28 | pub use self_subscriber::SelfSubscriber; |
| 29 | 29 | ||
| 30 | // Re-export health tracking types | 30 | // Re-export health tracking types |
| 31 | pub use health::RelayHealthTracker; | 31 | pub use health::RelayHealthTracker; |
| @@ -702,7 +702,7 @@ impl SyncManager { | |||
| 702 | ); | 702 | ); |
| 703 | 703 | ||
| 704 | // 1. Create action channel for self-subscriber -> manager communication | 704 | // 1. Create action channel for self-subscriber -> manager communication |
| 705 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); | 705 | let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100); |
| 706 | 706 | ||
| 707 | // 2. Create disconnect channel for spawned tasks -> manager communication | 707 | // 2. Create disconnect channel for spawned tasks -> manager communication |
| 708 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); | 708 | let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); |
| @@ -760,51 +760,10 @@ impl SyncManager { | |||
| 760 | tokio::select! { | 760 | tokio::select! { |
| 761 | action = action_rx.recv() => { | 761 | action = action_rx.recv() => { |
| 762 | match action { | 762 | match action { |
| 763 | Some(RelayAction::SpawnRelay { relay_url, repos }) => { | 763 | Some(add_filters) => { |
| 764 | // Convert to AddFilters format and use unified handler | 764 | // Process AddFilters action directly |
| 765 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 766 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 767 | |||
| 768 | // Build filters for these repos | ||
| 769 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | ||
| 770 | &repo_ids, | ||
| 771 | &root_events, | ||
| 772 | None, | ||
| 773 | ); | ||
| 774 | |||
| 775 | let action = AddFilters { | ||
| 776 | relay_url, | ||
| 777 | repos: repo_ids, | ||
| 778 | root_events, | ||
| 779 | filters, | ||
| 780 | }; | ||
| 781 | |||
| 782 | // Acquire lock to process action | ||
| 783 | let mut manager = sync_manager.lock().await; | 765 | let mut manager = sync_manager.lock().await; |
| 784 | manager.handle_add_filters(action).await; | 766 | manager.handle_add_filters(add_filters).await; |
| 785 | } | ||
| 786 | Some(RelayAction::AddFilters { relay_url, repos }) => { | ||
| 787 | // Convert to AddFilters format and use unified handler | ||
| 788 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 789 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 790 | |||
| 791 | // Build filters for these repos | ||
| 792 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | ||
| 793 | &repo_ids, | ||
| 794 | &root_events, | ||
| 795 | None, | ||
| 796 | ); | ||
| 797 | |||
| 798 | let action = AddFilters { | ||
| 799 | relay_url, | ||
| 800 | repos: repo_ids, | ||
| 801 | root_events, | ||
| 802 | filters, | ||
| 803 | }; | ||
| 804 | |||
| 805 | // Acquire lock to process action | ||
| 806 | let mut manager = sync_manager.lock().await; | ||
| 807 | manager.handle_add_filters(action).await; | ||
| 808 | } | 767 | } |
| 809 | None => break, | 768 | None => break, |
| 810 | } | 769 | } |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 6f95ad3..8113035 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -15,30 +15,7 @@ use nostr_sdk::prelude::*; | |||
| 15 | use nostr_sdk::Timestamp; | 15 | use nostr_sdk::Timestamp; |
| 16 | use tokio::sync::{broadcast, mpsc}; | 16 | use tokio::sync::{broadcast, mpsc}; |
| 17 | 17 | ||
| 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; | 18 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 19 | |||
| 20 | // ============================================================================= | ||
| 21 | // RelayAction - Actions to send to SyncManager | ||
| 22 | // ============================================================================= | ||
| 23 | |||
| 24 | /// Actions that the SelfSubscriber sends to the SyncManager | ||
| 25 | #[derive(Debug)] | ||
| 26 | pub enum RelayAction { | ||
| 27 | /// Spawn a new relay connection | ||
| 28 | SpawnRelay { | ||
| 29 | /// The relay URL to connect to | ||
| 30 | relay_url: String, | ||
| 31 | /// Repos to sync, mapped to their root event IDs | ||
| 32 | repos: HashMap<String, HashSet<EventId>>, | ||
| 33 | }, | ||
| 34 | /// Add filters to an existing relay connection | ||
| 35 | AddFilters { | ||
| 36 | /// The relay URL to add filters to | ||
| 37 | relay_url: String, | ||
| 38 | /// Repos to sync, mapped to their root event IDs | ||
| 39 | repos: HashMap<String, HashSet<EventId>>, | ||
| 40 | }, | ||
| 41 | } | ||
| 42 | 19 | ||
| 43 | // ============================================================================= | 20 | // ============================================================================= |
| 44 | // PendingUpdates - Accumulator for batching | 21 | // PendingUpdates - Accumulator for batching |
| @@ -100,8 +77,8 @@ pub struct SelfSubscriber { | |||
| 100 | relay_domain: String, | 77 | relay_domain: String, |
| 101 | /// Shared index of repos to sync | 78 | /// Shared index of repos to sync |
| 102 | repo_sync_index: RepoSyncIndex, | 79 | repo_sync_index: RepoSyncIndex, |
| 103 | /// Channel to send actions to SyncManager | 80 | /// Channel to send AddFilters actions to SyncManager |
| 104 | action_tx: mpsc::Sender<RelayAction>, | 81 | action_tx: mpsc::Sender<AddFilters>, |
| 105 | /// Last time we connected - used for since filter on reconnect | 82 | /// Last time we connected - used for since filter on reconnect |
| 106 | last_connected: Option<Timestamp>, | 83 | last_connected: Option<Timestamp>, |
| 107 | } | 84 | } |
| @@ -113,12 +90,12 @@ impl SelfSubscriber { | |||
| 113 | /// * `own_relay_url` - The WebSocket URL of our own relay | 90 | /// * `own_relay_url` - The WebSocket URL of our own relay |
| 114 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 91 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 115 | /// * `repo_sync_index` - Shared index to update with discovered repos | 92 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 116 | /// * `action_tx` - Channel to send RelayActions to the SyncManager | 93 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 117 | pub fn new( | 94 | pub fn new( |
| 118 | own_relay_url: String, | 95 | own_relay_url: String, |
| 119 | relay_domain: String, | 96 | relay_domain: String, |
| 120 | repo_sync_index: RepoSyncIndex, | 97 | repo_sync_index: RepoSyncIndex, |
| 121 | action_tx: mpsc::Sender<RelayAction>, | 98 | action_tx: mpsc::Sender<AddFilters>, |
| 122 | ) -> Self { | 99 | ) -> Self { |
| 123 | Self { | 100 | Self { |
| 124 | own_relay_url, | 101 | own_relay_url, |
| @@ -163,9 +140,11 @@ impl SelfSubscriber { | |||
| 163 | } | 140 | } |
| 164 | } | 141 | } |
| 165 | "clone" if tag_vec.len() >= 2 => { | 142 | "clone" if tag_vec.len() >= 2 => { |
| 166 | // Convert http(s) clone URL to ws(s) relay URL | 143 | // Convert ALL http(s) clone URLs to ws(s) relay URLs |
| 167 | if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { | 144 | for clone_url in tag_vec.iter().skip(1) { |
| 168 | relays.insert(relay_url); | 145 | if let Some(relay_url) = clone_url_to_relay_url(clone_url) { |
| 146 | relays.insert(relay_url); | ||
| 147 | } | ||
| 169 | } | 148 | } |
| 170 | } | 149 | } |
| 171 | _ => {} | 150 | _ => {} |
| @@ -302,10 +281,13 @@ impl SelfSubscriber { | |||
| 302 | // 30617 announcements don't contribute to root_events - those are | 281 | // 30617 announcements don't contribute to root_events - those are |
| 303 | // the 1617/1618/1619/1621 event IDs that get added when we receive | 282 | // the 1617/1618/1619/1621 event IDs that get added when we receive |
| 304 | // root events via handle_root_event. See mod.rs:71 for details. | 283 | // root events via handle_root_event. See mod.rs:71 for details. |
| 305 | pending.add_repo(repo_id, relays, HashSet::new()); | 284 | pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); |
| 306 | tracing::debug!( | 285 | tracing::info!( |
| 307 | event_id = %event.id, | 286 | event_id = %event.id, |
| 308 | "Queued 30617 announcement for batch processing" | 287 | repo_id = %repo_id, |
| 288 | relay_count = relays.len(), | ||
| 289 | relays = ?relays, | ||
| 290 | "[DIAG] Queued 30617 announcement for batch processing" | ||
| 309 | ); | 291 | ); |
| 310 | } | 292 | } |
| 311 | } else { | 293 | } else { |
| @@ -493,32 +475,38 @@ impl SelfSubscriber { | |||
| 493 | let targets = derive_relay_targets(&index); | 475 | let targets = derive_relay_targets(&index); |
| 494 | drop(index); // Release lock before async operations | 476 | drop(index); // Release lock before async operations |
| 495 | 477 | ||
| 496 | // For each relay, send SpawnRelay action | 478 | // For each relay, send AddFilters action directly |
| 497 | // SyncManager will check if relay already exists | 479 | // SyncManager's handle_add_filters auto-spawns connection for unknown relays |
| 498 | for (relay_url, needs) in targets { | 480 | for (relay_url, needs) in targets { |
| 499 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) | 481 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) |
| 500 | if relay_url.contains(&self.relay_domain) { | 482 | if relay_url.contains(&self.relay_domain) { |
| 501 | continue; | 483 | continue; |
| 502 | } | 484 | } |
| 503 | 485 | ||
| 504 | // Convert needs to HashMap<String, HashSet<EventId>> | 486 | // Build filters for these repos |
| 505 | let mut repos = HashMap::new(); | 487 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( |
| 506 | for repo_id in needs.repos { | 488 | &needs.repos, |
| 507 | repos.insert(repo_id, needs.root_events.clone()); | 489 | &needs.root_events, |
| 508 | } | 490 | None, |
| 491 | ); | ||
| 509 | 492 | ||
| 510 | let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; | 493 | let action = AddFilters { |
| 494 | relay_url: relay_url.clone(), | ||
| 495 | repos: needs.repos, | ||
| 496 | root_events: needs.root_events, | ||
| 497 | filters, | ||
| 498 | }; | ||
| 511 | 499 | ||
| 512 | if let Err(e) = self.action_tx.send(action).await { | 500 | if let Err(e) = self.action_tx.send(action).await { |
| 513 | tracing::error!( | 501 | tracing::error!( |
| 514 | relay = %relay_url, | 502 | relay = %relay_url, |
| 515 | error = %e, | 503 | error = %e, |
| 516 | "Failed to send SpawnRelay action" | 504 | "Failed to send AddFilters action" |
| 517 | ); | 505 | ); |
| 518 | } else { | 506 | } else { |
| 519 | tracing::debug!( | 507 | tracing::debug!( |
| 520 | relay = %relay_url, | 508 | relay = %relay_url, |
| 521 | "Sent SpawnRelay action to SyncManager" | 509 | "Sent AddFilters action to SyncManager" |
| 522 | ); | 510 | ); |
| 523 | } | 511 | } |
| 524 | } | 512 | } |
| @@ -531,16 +519,22 @@ impl SelfSubscriber { | |||
| 531 | 519 | ||
| 532 | /// Convert clone URL to relay URL | 520 | /// Convert clone URL to relay URL |
| 533 | /// | 521 | /// |
| 534 | /// Converts http:// to ws:// and https:// to wss:// | 522 | /// Converts http://domain:port/path.git to ws://domain:port |
| 523 | /// Converts https://domain:port/path.git to wss://domain:port | ||
| 524 | /// Strips the path component to get just the relay URL | ||
| 535 | /// Returns None for unsupported URL schemes | 525 | /// Returns None for unsupported URL schemes |
| 536 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | 526 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { |
| 537 | if clone_url.starts_with("http://") { | 527 | let (ws_scheme, rest) = if clone_url.starts_with("http://") { |
| 538 | Some(clone_url.replacen("http://", "ws://", 1)) | 528 | ("ws://", clone_url.strip_prefix("http://")?) |
| 539 | } else if clone_url.starts_with("https://") { | 529 | } else if clone_url.starts_with("https://") { |
| 540 | Some(clone_url.replacen("https://", "wss://", 1)) | 530 | ("wss://", clone_url.strip_prefix("https://")?) |
| 541 | } else { | 531 | } else { |
| 542 | None | 532 | return None; |
| 543 | } | 533 | }; |
| 534 | |||
| 535 | // Extract just the host:port part (everything before the first /) | ||
| 536 | let host_port = rest.split('/').next()?; | ||
| 537 | Some(format!("{}{}", ws_scheme, host_port)) | ||
| 544 | } | 538 | } |
| 545 | 539 | ||
| 546 | #[cfg(test)] | 540 | #[cfg(test)] |
| @@ -551,7 +545,7 @@ mod tests { | |||
| 551 | fn test_clone_url_to_relay_url_https() { | 545 | fn test_clone_url_to_relay_url_https() { |
| 552 | assert_eq!( | 546 | assert_eq!( |
| 553 | clone_url_to_relay_url("https://example.com/repo.git"), | 547 | clone_url_to_relay_url("https://example.com/repo.git"), |
| 554 | Some("wss://example.com/repo.git".to_string()) | 548 | Some("wss://example.com".to_string()) |
| 555 | ); | 549 | ); |
| 556 | } | 550 | } |
| 557 | 551 | ||
| @@ -559,7 +553,15 @@ mod tests { | |||
| 559 | fn test_clone_url_to_relay_url_http() { | 553 | fn test_clone_url_to_relay_url_http() { |
| 560 | assert_eq!( | 554 | assert_eq!( |
| 561 | clone_url_to_relay_url("http://localhost:3000/repo.git"), | 555 | clone_url_to_relay_url("http://localhost:3000/repo.git"), |
| 562 | Some("ws://localhost:3000/repo.git".to_string()) | 556 | Some("ws://localhost:3000".to_string()) |
| 557 | ); | ||
| 558 | } | ||
| 559 | |||
| 560 | #[test] | ||
| 561 | fn test_clone_url_to_relay_url_with_port() { | ||
| 562 | assert_eq!( | ||
| 563 | clone_url_to_relay_url("http://127.0.0.1:41463/test-repo.git"), | ||
| 564 | Some("ws://127.0.0.1:41463".to_string()) | ||
| 563 | ); | 565 | ); |
| 564 | } | 566 | } |
| 565 | 567 | ||