From c9c1cdff2b2c836cd18e3f1fd6f46b0bdb0d6f7e Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 13:56:45 +0000 Subject: refactor: remove redundant RelayAction enum (SIMPLIFY-1) --- src/sync/mod.rs | 51 +++------------------ src/sync/self_subscriber.rs | 106 ++++++++++++++++++++++---------------------- 2 files changed, 59 insertions(+), 98 deletions(-) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3c69fb9..0e39aaf 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -25,7 +25,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; pub use relay_connection::{RelayConnection, RelayEvent}; // Re-export self-subscriber types -pub use self_subscriber::{RelayAction, SelfSubscriber}; +pub use self_subscriber::SelfSubscriber; // Re-export health tracking types pub use health::RelayHealthTracker; @@ -702,7 +702,7 @@ impl SyncManager { ); // 1. Create action channel for self-subscriber -> manager communication - let (action_tx, mut action_rx) = mpsc::channel::(100); + let (action_tx, mut action_rx) = mpsc::channel::(100); // 2. Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -760,51 +760,10 @@ impl SyncManager { tokio::select! { action = action_rx.recv() => { match action { - Some(RelayAction::SpawnRelay { relay_url, repos }) => { - // Convert to AddFilters format and use unified handler - let root_events: HashSet = repos.values().flatten().cloned().collect(); - let repo_ids: HashSet = repos.keys().cloned().collect(); - - // Build filters for these repos - let filters = crate::sync::filters::build_layer2_and_layer3_filters( - &repo_ids, - &root_events, - None, - ); - - let action = AddFilters { - relay_url, - repos: repo_ids, - root_events, - filters, - }; - - // Acquire lock to process action + Some(add_filters) => { + // Process AddFilters action directly let mut manager = sync_manager.lock().await; - manager.handle_add_filters(action).await; - } - Some(RelayAction::AddFilters { relay_url, repos }) => { - // Convert to AddFilters format and use unified handler - let root_events: HashSet = repos.values().flatten().cloned().collect(); - let repo_ids: HashSet = repos.keys().cloned().collect(); - - // Build filters for these repos - let filters = crate::sync::filters::build_layer2_and_layer3_filters( - &repo_ids, - &root_events, - None, - ); - - let action = AddFilters { - relay_url, - repos: repo_ids, - root_events, - filters, - }; - - // Acquire lock to process action - let mut manager = sync_manager.lock().await; - manager.handle_add_filters(action).await; + manager.handle_add_filters(add_filters).await; } None => break, } diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 6f95ad3..8113035 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -15,30 +15,7 @@ use nostr_sdk::prelude::*; use nostr_sdk::Timestamp; use tokio::sync::{broadcast, mpsc}; -use super::{RepoSyncIndex, RepoSyncNeeds}; - -// ============================================================================= -// RelayAction - Actions to send to SyncManager -// ============================================================================= - -/// Actions that the SelfSubscriber sends to the SyncManager -#[derive(Debug)] -pub enum RelayAction { - /// Spawn a new relay connection - SpawnRelay { - /// The relay URL to connect to - relay_url: String, - /// Repos to sync, mapped to their root event IDs - repos: HashMap>, - }, - /// Add filters to an existing relay connection - AddFilters { - /// The relay URL to add filters to - relay_url: String, - /// Repos to sync, mapped to their root event IDs - repos: HashMap>, - }, -} +use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; // ============================================================================= // PendingUpdates - Accumulator for batching @@ -100,8 +77,8 @@ pub struct SelfSubscriber { relay_domain: String, /// Shared index of repos to sync repo_sync_index: RepoSyncIndex, - /// Channel to send actions to SyncManager - action_tx: mpsc::Sender, + /// Channel to send AddFilters actions to SyncManager + action_tx: mpsc::Sender, /// Last time we connected - used for since filter on reconnect last_connected: Option, } @@ -113,12 +90,12 @@ impl SelfSubscriber { /// * `own_relay_url` - The WebSocket URL of our own relay /// * `relay_domain` - Our service domain (used for filtering relevant repos) /// * `repo_sync_index` - Shared index to update with discovered repos - /// * `action_tx` - Channel to send RelayActions to the SyncManager + /// * `action_tx` - Channel to send AddFilters actions to the SyncManager pub fn new( own_relay_url: String, relay_domain: String, repo_sync_index: RepoSyncIndex, - action_tx: mpsc::Sender, + action_tx: mpsc::Sender, ) -> Self { Self { own_relay_url, @@ -163,9 +140,11 @@ impl SelfSubscriber { } } "clone" if tag_vec.len() >= 2 => { - // Convert http(s) clone URL to ws(s) relay URL - if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { - relays.insert(relay_url); + // Convert ALL http(s) clone URLs to ws(s) relay URLs + for clone_url in tag_vec.iter().skip(1) { + if let Some(relay_url) = clone_url_to_relay_url(clone_url) { + relays.insert(relay_url); + } } } _ => {} @@ -302,10 +281,13 @@ impl SelfSubscriber { // 30617 announcements don't contribute to root_events - those are // the 1617/1618/1619/1621 event IDs that get added when we receive // root events via handle_root_event. See mod.rs:71 for details. - pending.add_repo(repo_id, relays, HashSet::new()); - tracing::debug!( + pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); + tracing::info!( event_id = %event.id, - "Queued 30617 announcement for batch processing" + repo_id = %repo_id, + relay_count = relays.len(), + relays = ?relays, + "[DIAG] Queued 30617 announcement for batch processing" ); } } else { @@ -493,32 +475,38 @@ impl SelfSubscriber { let targets = derive_relay_targets(&index); drop(index); // Release lock before async operations - // For each relay, send SpawnRelay action - // SyncManager will check if relay already exists + // For each relay, send AddFilters action directly + // SyncManager's handle_add_filters auto-spawns connection for unknown relays for (relay_url, needs) in targets { // Skip our own relay URL (we're subscribed to ourselves via self-subscription) if relay_url.contains(&self.relay_domain) { continue; } - // Convert needs to HashMap> - let mut repos = HashMap::new(); - for repo_id in needs.repos { - repos.insert(repo_id, needs.root_events.clone()); - } + // Build filters for these repos + let filters = crate::sync::filters::build_layer2_and_layer3_filters( + &needs.repos, + &needs.root_events, + None, + ); - let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; + let action = AddFilters { + relay_url: relay_url.clone(), + repos: needs.repos, + root_events: needs.root_events, + filters, + }; if let Err(e) = self.action_tx.send(action).await { tracing::error!( relay = %relay_url, error = %e, - "Failed to send SpawnRelay action" + "Failed to send AddFilters action" ); } else { tracing::debug!( relay = %relay_url, - "Sent SpawnRelay action to SyncManager" + "Sent AddFilters action to SyncManager" ); } } @@ -531,16 +519,22 @@ impl SelfSubscriber { /// Convert clone URL to relay URL /// -/// Converts http:// to ws:// and https:// to wss:// +/// Converts http://domain:port/path.git to ws://domain:port +/// Converts https://domain:port/path.git to wss://domain:port +/// Strips the path component to get just the relay URL /// Returns None for unsupported URL schemes fn clone_url_to_relay_url(clone_url: &str) -> Option { - if clone_url.starts_with("http://") { - Some(clone_url.replacen("http://", "ws://", 1)) + let (ws_scheme, rest) = if clone_url.starts_with("http://") { + ("ws://", clone_url.strip_prefix("http://")?) } else if clone_url.starts_with("https://") { - Some(clone_url.replacen("https://", "wss://", 1)) + ("wss://", clone_url.strip_prefix("https://")?) } else { - None - } + return None; + }; + + // Extract just the host:port part (everything before the first /) + let host_port = rest.split('/').next()?; + Some(format!("{}{}", ws_scheme, host_port)) } #[cfg(test)] @@ -551,7 +545,7 @@ mod tests { fn test_clone_url_to_relay_url_https() { assert_eq!( clone_url_to_relay_url("https://example.com/repo.git"), - Some("wss://example.com/repo.git".to_string()) + Some("wss://example.com".to_string()) ); } @@ -559,7 +553,15 @@ mod tests { fn test_clone_url_to_relay_url_http() { assert_eq!( clone_url_to_relay_url("http://localhost:3000/repo.git"), - Some("ws://localhost:3000/repo.git".to_string()) + Some("ws://localhost:3000".to_string()) + ); + } + + #[test] + fn test_clone_url_to_relay_url_with_port() { + assert_eq!( + clone_url_to_relay_url("http://127.0.0.1:41463/test-repo.git"), + Some("ws://127.0.0.1:41463".to_string()) ); } -- cgit v1.2.3