diff options
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 106 |
1 files changed, 54 insertions, 52 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 6f95ad3..8113035 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -15,30 +15,7 @@ use nostr_sdk::prelude::*; | |||
| 15 | use nostr_sdk::Timestamp; | 15 | use nostr_sdk::Timestamp; |
| 16 | use tokio::sync::{broadcast, mpsc}; | 16 | use tokio::sync::{broadcast, mpsc}; |
| 17 | 17 | ||
| 18 | use super::{RepoSyncIndex, RepoSyncNeeds}; | 18 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 19 | |||
| 20 | // ============================================================================= | ||
| 21 | // RelayAction - Actions to send to SyncManager | ||
| 22 | // ============================================================================= | ||
| 23 | |||
| 24 | /// Actions that the SelfSubscriber sends to the SyncManager | ||
| 25 | #[derive(Debug)] | ||
| 26 | pub enum RelayAction { | ||
| 27 | /// Spawn a new relay connection | ||
| 28 | SpawnRelay { | ||
| 29 | /// The relay URL to connect to | ||
| 30 | relay_url: String, | ||
| 31 | /// Repos to sync, mapped to their root event IDs | ||
| 32 | repos: HashMap<String, HashSet<EventId>>, | ||
| 33 | }, | ||
| 34 | /// Add filters to an existing relay connection | ||
| 35 | AddFilters { | ||
| 36 | /// The relay URL to add filters to | ||
| 37 | relay_url: String, | ||
| 38 | /// Repos to sync, mapped to their root event IDs | ||
| 39 | repos: HashMap<String, HashSet<EventId>>, | ||
| 40 | }, | ||
| 41 | } | ||
| 42 | 19 | ||
| 43 | // ============================================================================= | 20 | // ============================================================================= |
| 44 | // PendingUpdates - Accumulator for batching | 21 | // PendingUpdates - Accumulator for batching |
| @@ -100,8 +77,8 @@ pub struct SelfSubscriber { | |||
| 100 | relay_domain: String, | 77 | relay_domain: String, |
| 101 | /// Shared index of repos to sync | 78 | /// Shared index of repos to sync |
| 102 | repo_sync_index: RepoSyncIndex, | 79 | repo_sync_index: RepoSyncIndex, |
| 103 | /// Channel to send actions to SyncManager | 80 | /// Channel to send AddFilters actions to SyncManager |
| 104 | action_tx: mpsc::Sender<RelayAction>, | 81 | action_tx: mpsc::Sender<AddFilters>, |
| 105 | /// Last time we connected - used for since filter on reconnect | 82 | /// Last time we connected - used for since filter on reconnect |
| 106 | last_connected: Option<Timestamp>, | 83 | last_connected: Option<Timestamp>, |
| 107 | } | 84 | } |
| @@ -113,12 +90,12 @@ impl SelfSubscriber { | |||
| 113 | /// * `own_relay_url` - The WebSocket URL of our own relay | 90 | /// * `own_relay_url` - The WebSocket URL of our own relay |
| 114 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 91 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 115 | /// * `repo_sync_index` - Shared index to update with discovered repos | 92 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 116 | /// * `action_tx` - Channel to send RelayActions to the SyncManager | 93 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 117 | pub fn new( | 94 | pub fn new( |
| 118 | own_relay_url: String, | 95 | own_relay_url: String, |
| 119 | relay_domain: String, | 96 | relay_domain: String, |
| 120 | repo_sync_index: RepoSyncIndex, | 97 | repo_sync_index: RepoSyncIndex, |
| 121 | action_tx: mpsc::Sender<RelayAction>, | 98 | action_tx: mpsc::Sender<AddFilters>, |
| 122 | ) -> Self { | 99 | ) -> Self { |
| 123 | Self { | 100 | Self { |
| 124 | own_relay_url, | 101 | own_relay_url, |
| @@ -163,9 +140,11 @@ impl SelfSubscriber { | |||
| 163 | } | 140 | } |
| 164 | } | 141 | } |
| 165 | "clone" if tag_vec.len() >= 2 => { | 142 | "clone" if tag_vec.len() >= 2 => { |
| 166 | // Convert http(s) clone URL to ws(s) relay URL | 143 | // Convert ALL http(s) clone URLs to ws(s) relay URLs |
| 167 | if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { | 144 | for clone_url in tag_vec.iter().skip(1) { |
| 168 | relays.insert(relay_url); | 145 | if let Some(relay_url) = clone_url_to_relay_url(clone_url) { |
| 146 | relays.insert(relay_url); | ||
| 147 | } | ||
| 169 | } | 148 | } |
| 170 | } | 149 | } |
| 171 | _ => {} | 150 | _ => {} |
| @@ -302,10 +281,13 @@ impl SelfSubscriber { | |||
| 302 | // 30617 announcements don't contribute to root_events - those are | 281 | // 30617 announcements don't contribute to root_events - those are |
| 303 | // the 1617/1618/1619/1621 event IDs that get added when we receive | 282 | // the 1617/1618/1619/1621 event IDs that get added when we receive |
| 304 | // root events via handle_root_event. See mod.rs:71 for details. | 283 | // root events via handle_root_event. See mod.rs:71 for details. |
| 305 | pending.add_repo(repo_id, relays, HashSet::new()); | 284 | pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); |
| 306 | tracing::debug!( | 285 | tracing::info!( |
| 307 | event_id = %event.id, | 286 | event_id = %event.id, |
| 308 | "Queued 30617 announcement for batch processing" | 287 | repo_id = %repo_id, |
| 288 | relay_count = relays.len(), | ||
| 289 | relays = ?relays, | ||
| 290 | "[DIAG] Queued 30617 announcement for batch processing" | ||
| 309 | ); | 291 | ); |
| 310 | } | 292 | } |
| 311 | } else { | 293 | } else { |
| @@ -493,32 +475,38 @@ impl SelfSubscriber { | |||
| 493 | let targets = derive_relay_targets(&index); | 475 | let targets = derive_relay_targets(&index); |
| 494 | drop(index); // Release lock before async operations | 476 | drop(index); // Release lock before async operations |
| 495 | 477 | ||
| 496 | // For each relay, send SpawnRelay action | 478 | // For each relay, send AddFilters action directly |
| 497 | // SyncManager will check if relay already exists | 479 | // SyncManager's handle_add_filters auto-spawns connection for unknown relays |
| 498 | for (relay_url, needs) in targets { | 480 | for (relay_url, needs) in targets { |
| 499 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) | 481 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) |
| 500 | if relay_url.contains(&self.relay_domain) { | 482 | if relay_url.contains(&self.relay_domain) { |
| 501 | continue; | 483 | continue; |
| 502 | } | 484 | } |
| 503 | 485 | ||
| 504 | // Convert needs to HashMap<String, HashSet<EventId>> | 486 | // Build filters for these repos |
| 505 | let mut repos = HashMap::new(); | 487 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( |
| 506 | for repo_id in needs.repos { | 488 | &needs.repos, |
| 507 | repos.insert(repo_id, needs.root_events.clone()); | 489 | &needs.root_events, |
| 508 | } | 490 | None, |
| 491 | ); | ||
| 509 | 492 | ||
| 510 | let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; | 493 | let action = AddFilters { |
| 494 | relay_url: relay_url.clone(), | ||
| 495 | repos: needs.repos, | ||
| 496 | root_events: needs.root_events, | ||
| 497 | filters, | ||
| 498 | }; | ||
| 511 | 499 | ||
| 512 | if let Err(e) = self.action_tx.send(action).await { | 500 | if let Err(e) = self.action_tx.send(action).await { |
| 513 | tracing::error!( | 501 | tracing::error!( |
| 514 | relay = %relay_url, | 502 | relay = %relay_url, |
| 515 | error = %e, | 503 | error = %e, |
| 516 | "Failed to send SpawnRelay action" | 504 | "Failed to send AddFilters action" |
| 517 | ); | 505 | ); |
| 518 | } else { | 506 | } else { |
| 519 | tracing::debug!( | 507 | tracing::debug!( |
| 520 | relay = %relay_url, | 508 | relay = %relay_url, |
| 521 | "Sent SpawnRelay action to SyncManager" | 509 | "Sent AddFilters action to SyncManager" |
| 522 | ); | 510 | ); |
| 523 | } | 511 | } |
| 524 | } | 512 | } |
| @@ -531,16 +519,22 @@ impl SelfSubscriber { | |||
| 531 | 519 | ||
| 532 | /// Convert clone URL to relay URL | 520 | /// Convert clone URL to relay URL |
| 533 | /// | 521 | /// |
| 534 | /// Converts http:// to ws:// and https:// to wss:// | 522 | /// Converts http://domain:port/path.git to ws://domain:port |
| 523 | /// Converts https://domain:port/path.git to wss://domain:port | ||
| 524 | /// Strips the path component to get just the relay URL | ||
| 535 | /// Returns None for unsupported URL schemes | 525 | /// Returns None for unsupported URL schemes |
| 536 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | 526 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { |
| 537 | if clone_url.starts_with("http://") { | 527 | let (ws_scheme, rest) = if clone_url.starts_with("http://") { |
| 538 | Some(clone_url.replacen("http://", "ws://", 1)) | 528 | ("ws://", clone_url.strip_prefix("http://")?) |
| 539 | } else if clone_url.starts_with("https://") { | 529 | } else if clone_url.starts_with("https://") { |
| 540 | Some(clone_url.replacen("https://", "wss://", 1)) | 530 | ("wss://", clone_url.strip_prefix("https://")?) |
| 541 | } else { | 531 | } else { |
| 542 | None | 532 | return None; |
| 543 | } | 533 | }; |
| 534 | |||
| 535 | // Extract just the host:port part (everything before the first /) | ||
| 536 | let host_port = rest.split('/').next()?; | ||
| 537 | Some(format!("{}{}", ws_scheme, host_port)) | ||
| 544 | } | 538 | } |
| 545 | 539 | ||
| 546 | #[cfg(test)] | 540 | #[cfg(test)] |
| @@ -551,7 +545,7 @@ mod tests { | |||
| 551 | fn test_clone_url_to_relay_url_https() { | 545 | fn test_clone_url_to_relay_url_https() { |
| 552 | assert_eq!( | 546 | assert_eq!( |
| 553 | clone_url_to_relay_url("https://example.com/repo.git"), | 547 | clone_url_to_relay_url("https://example.com/repo.git"), |
| 554 | Some("wss://example.com/repo.git".to_string()) | 548 | Some("wss://example.com".to_string()) |
| 555 | ); | 549 | ); |
| 556 | } | 550 | } |
| 557 | 551 | ||
| @@ -559,7 +553,15 @@ mod tests { | |||
| 559 | fn test_clone_url_to_relay_url_http() { | 553 | fn test_clone_url_to_relay_url_http() { |
| 560 | assert_eq!( | 554 | assert_eq!( |
| 561 | clone_url_to_relay_url("http://localhost:3000/repo.git"), | 555 | clone_url_to_relay_url("http://localhost:3000/repo.git"), |
| 562 | Some("ws://localhost:3000/repo.git".to_string()) | 556 | Some("ws://localhost:3000".to_string()) |
| 557 | ); | ||
| 558 | } | ||
| 559 | |||
| 560 | #[test] | ||
| 561 | fn test_clone_url_to_relay_url_with_port() { | ||
| 562 | assert_eq!( | ||
| 563 | clone_url_to_relay_url("http://127.0.0.1:41463/test-repo.git"), | ||
| 564 | Some("ws://127.0.0.1:41463".to_string()) | ||
| 563 | ); | 565 | ); |
| 564 | } | 566 | } |
| 565 | 567 | ||