upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 13:56:45 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 13:56:45 +0000
commitc9c1cdff2b2c836cd18e3f1fd6f46b0bdb0d6f7e (patch)
treed0186747692a4ad80b8d28e1d0af6e086bbccf41
parentb1aefcadf4fbe2ed2b71248208d4f6662b4bee98 (diff)
refactor: remove redundant RelayAction enum (SIMPLIFY-1)
-rw-r--r--src/sync/mod.rs51
-rw-r--r--src/sync/self_subscriber.rs106
2 files changed, 59 insertions, 98 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 3c69fb9..0e39aaf 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -25,7 +25,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds};
25pub use relay_connection::{RelayConnection, RelayEvent}; 25pub use relay_connection::{RelayConnection, RelayEvent};
26 26
27// Re-export self-subscriber types 27// Re-export self-subscriber types
28pub use self_subscriber::{RelayAction, SelfSubscriber}; 28pub use self_subscriber::SelfSubscriber;
29 29
30// Re-export health tracking types 30// Re-export health tracking types
31pub use health::RelayHealthTracker; 31pub use health::RelayHealthTracker;
@@ -702,7 +702,7 @@ impl SyncManager {
702 ); 702 );
703 703
704 // 1. Create action channel for self-subscriber -> manager communication 704 // 1. Create action channel for self-subscriber -> manager communication
705 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); 705 let (action_tx, mut action_rx) = mpsc::channel::<AddFilters>(100);
706 706
707 // 2. Create disconnect channel for spawned tasks -> manager communication 707 // 2. Create disconnect channel for spawned tasks -> manager communication
708 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100); 708 let (disconnect_tx, mut disconnect_rx) = mpsc::channel::<DisconnectNotification>(100);
@@ -760,51 +760,10 @@ impl SyncManager {
760 tokio::select! { 760 tokio::select! {
761 action = action_rx.recv() => { 761 action = action_rx.recv() => {
762 match action { 762 match action {
763 Some(RelayAction::SpawnRelay { relay_url, repos }) => { 763 Some(add_filters) => {
764 // Convert to AddFilters format and use unified handler 764 // Process AddFilters action directly
765 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
766 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
767
768 // Build filters for these repos
769 let filters = crate::sync::filters::build_layer2_and_layer3_filters(
770 &repo_ids,
771 &root_events,
772 None,
773 );
774
775 let action = AddFilters {
776 relay_url,
777 repos: repo_ids,
778 root_events,
779 filters,
780 };
781
782 // Acquire lock to process action
783 let mut manager = sync_manager.lock().await; 765 let mut manager = sync_manager.lock().await;
784 manager.handle_add_filters(action).await; 766 manager.handle_add_filters(add_filters).await;
785 }
786 Some(RelayAction::AddFilters { relay_url, repos }) => {
787 // Convert to AddFilters format and use unified handler
788 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
789 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
790
791 // Build filters for these repos
792 let filters = crate::sync::filters::build_layer2_and_layer3_filters(
793 &repo_ids,
794 &root_events,
795 None,
796 );
797
798 let action = AddFilters {
799 relay_url,
800 repos: repo_ids,
801 root_events,
802 filters,
803 };
804
805 // Acquire lock to process action
806 let mut manager = sync_manager.lock().await;
807 manager.handle_add_filters(action).await;
808 } 767 }
809 None => break, 768 None => break,
810 } 769 }
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 6f95ad3..8113035 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -15,30 +15,7 @@ use nostr_sdk::prelude::*;
15use nostr_sdk::Timestamp; 15use nostr_sdk::Timestamp;
16use tokio::sync::{broadcast, mpsc}; 16use tokio::sync::{broadcast, mpsc};
17 17
18use super::{RepoSyncIndex, RepoSyncNeeds}; 18use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
19
20// =============================================================================
21// RelayAction - Actions to send to SyncManager
22// =============================================================================
23
24/// Actions that the SelfSubscriber sends to the SyncManager
25#[derive(Debug)]
26pub enum RelayAction {
27 /// Spawn a new relay connection
28 SpawnRelay {
29 /// The relay URL to connect to
30 relay_url: String,
31 /// Repos to sync, mapped to their root event IDs
32 repos: HashMap<String, HashSet<EventId>>,
33 },
34 /// Add filters to an existing relay connection
35 AddFilters {
36 /// The relay URL to add filters to
37 relay_url: String,
38 /// Repos to sync, mapped to their root event IDs
39 repos: HashMap<String, HashSet<EventId>>,
40 },
41}
42 19
43// ============================================================================= 20// =============================================================================
44// PendingUpdates - Accumulator for batching 21// PendingUpdates - Accumulator for batching
@@ -100,8 +77,8 @@ pub struct SelfSubscriber {
100 relay_domain: String, 77 relay_domain: String,
101 /// Shared index of repos to sync 78 /// Shared index of repos to sync
102 repo_sync_index: RepoSyncIndex, 79 repo_sync_index: RepoSyncIndex,
103 /// Channel to send actions to SyncManager 80 /// Channel to send AddFilters actions to SyncManager
104 action_tx: mpsc::Sender<RelayAction>, 81 action_tx: mpsc::Sender<AddFilters>,
105 /// Last time we connected - used for since filter on reconnect 82 /// Last time we connected - used for since filter on reconnect
106 last_connected: Option<Timestamp>, 83 last_connected: Option<Timestamp>,
107} 84}
@@ -113,12 +90,12 @@ impl SelfSubscriber {
113 /// * `own_relay_url` - The WebSocket URL of our own relay 90 /// * `own_relay_url` - The WebSocket URL of our own relay
114 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 91 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
115 /// * `repo_sync_index` - Shared index to update with discovered repos 92 /// * `repo_sync_index` - Shared index to update with discovered repos
116 /// * `action_tx` - Channel to send RelayActions to the SyncManager 93 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
117 pub fn new( 94 pub fn new(
118 own_relay_url: String, 95 own_relay_url: String,
119 relay_domain: String, 96 relay_domain: String,
120 repo_sync_index: RepoSyncIndex, 97 repo_sync_index: RepoSyncIndex,
121 action_tx: mpsc::Sender<RelayAction>, 98 action_tx: mpsc::Sender<AddFilters>,
122 ) -> Self { 99 ) -> Self {
123 Self { 100 Self {
124 own_relay_url, 101 own_relay_url,
@@ -163,9 +140,11 @@ impl SelfSubscriber {
163 } 140 }
164 } 141 }
165 "clone" if tag_vec.len() >= 2 => { 142 "clone" if tag_vec.len() >= 2 => {
166 // Convert http(s) clone URL to ws(s) relay URL 143 // Convert ALL http(s) clone URLs to ws(s) relay URLs
167 if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { 144 for clone_url in tag_vec.iter().skip(1) {
168 relays.insert(relay_url); 145 if let Some(relay_url) = clone_url_to_relay_url(clone_url) {
146 relays.insert(relay_url);
147 }
169 } 148 }
170 } 149 }
171 _ => {} 150 _ => {}
@@ -302,10 +281,13 @@ impl SelfSubscriber {
302 // 30617 announcements don't contribute to root_events - those are 281 // 30617 announcements don't contribute to root_events - those are
303 // the 1617/1618/1619/1621 event IDs that get added when we receive 282 // the 1617/1618/1619/1621 event IDs that get added when we receive
304 // root events via handle_root_event. See mod.rs:71 for details. 283 // root events via handle_root_event. See mod.rs:71 for details.
305 pending.add_repo(repo_id, relays, HashSet::new()); 284 pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new());
306 tracing::debug!( 285 tracing::info!(
307 event_id = %event.id, 286 event_id = %event.id,
308 "Queued 30617 announcement for batch processing" 287 repo_id = %repo_id,
288 relay_count = relays.len(),
289 relays = ?relays,
290 "[DIAG] Queued 30617 announcement for batch processing"
309 ); 291 );
310 } 292 }
311 } else { 293 } else {
@@ -493,32 +475,38 @@ impl SelfSubscriber {
493 let targets = derive_relay_targets(&index); 475 let targets = derive_relay_targets(&index);
494 drop(index); // Release lock before async operations 476 drop(index); // Release lock before async operations
495 477
496 // For each relay, send SpawnRelay action 478 // For each relay, send AddFilters action directly
497 // SyncManager will check if relay already exists 479 // SyncManager's handle_add_filters auto-spawns connection for unknown relays
498 for (relay_url, needs) in targets { 480 for (relay_url, needs) in targets {
499 // Skip our own relay URL (we're subscribed to ourselves via self-subscription) 481 // Skip our own relay URL (we're subscribed to ourselves via self-subscription)
500 if relay_url.contains(&self.relay_domain) { 482 if relay_url.contains(&self.relay_domain) {
501 continue; 483 continue;
502 } 484 }
503 485
504 // Convert needs to HashMap<String, HashSet<EventId>> 486 // Build filters for these repos
505 let mut repos = HashMap::new(); 487 let filters = crate::sync::filters::build_layer2_and_layer3_filters(
506 for repo_id in needs.repos { 488 &needs.repos,
507 repos.insert(repo_id, needs.root_events.clone()); 489 &needs.root_events,
508 } 490 None,
491 );
509 492
510 let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; 493 let action = AddFilters {
494 relay_url: relay_url.clone(),
495 repos: needs.repos,
496 root_events: needs.root_events,
497 filters,
498 };
511 499
512 if let Err(e) = self.action_tx.send(action).await { 500 if let Err(e) = self.action_tx.send(action).await {
513 tracing::error!( 501 tracing::error!(
514 relay = %relay_url, 502 relay = %relay_url,
515 error = %e, 503 error = %e,
516 "Failed to send SpawnRelay action" 504 "Failed to send AddFilters action"
517 ); 505 );
518 } else { 506 } else {
519 tracing::debug!( 507 tracing::debug!(
520 relay = %relay_url, 508 relay = %relay_url,
521 "Sent SpawnRelay action to SyncManager" 509 "Sent AddFilters action to SyncManager"
522 ); 510 );
523 } 511 }
524 } 512 }
@@ -531,16 +519,22 @@ impl SelfSubscriber {
531 519
532/// Convert clone URL to relay URL 520/// Convert clone URL to relay URL
533/// 521///
534/// Converts http:// to ws:// and https:// to wss:// 522/// Converts http://domain:port/path.git to ws://domain:port
523/// Converts https://domain:port/path.git to wss://domain:port
524/// Strips the path component to get just the relay URL
535/// Returns None for unsupported URL schemes 525/// Returns None for unsupported URL schemes
536fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { 526fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
537 if clone_url.starts_with("http://") { 527 let (ws_scheme, rest) = if clone_url.starts_with("http://") {
538 Some(clone_url.replacen("http://", "ws://", 1)) 528 ("ws://", clone_url.strip_prefix("http://")?)
539 } else if clone_url.starts_with("https://") { 529 } else if clone_url.starts_with("https://") {
540 Some(clone_url.replacen("https://", "wss://", 1)) 530 ("wss://", clone_url.strip_prefix("https://")?)
541 } else { 531 } else {
542 None 532 return None;
543 } 533 };
534
535 // Extract just the host:port part (everything before the first /)
536 let host_port = rest.split('/').next()?;
537 Some(format!("{}{}", ws_scheme, host_port))
544} 538}
545 539
546#[cfg(test)] 540#[cfg(test)]
@@ -551,7 +545,7 @@ mod tests {
551 fn test_clone_url_to_relay_url_https() { 545 fn test_clone_url_to_relay_url_https() {
552 assert_eq!( 546 assert_eq!(
553 clone_url_to_relay_url("https://example.com/repo.git"), 547 clone_url_to_relay_url("https://example.com/repo.git"),
554 Some("wss://example.com/repo.git".to_string()) 548 Some("wss://example.com".to_string())
555 ); 549 );
556 } 550 }
557 551
@@ -559,7 +553,15 @@ mod tests {
559 fn test_clone_url_to_relay_url_http() { 553 fn test_clone_url_to_relay_url_http() {
560 assert_eq!( 554 assert_eq!(
561 clone_url_to_relay_url("http://localhost:3000/repo.git"), 555 clone_url_to_relay_url("http://localhost:3000/repo.git"),
562 Some("ws://localhost:3000/repo.git".to_string()) 556 Some("ws://localhost:3000".to_string())
557 );
558 }
559
560 #[test]
561 fn test_clone_url_to_relay_url_with_port() {
562 assert_eq!(
563 clone_url_to_relay_url("http://127.0.0.1:41463/test-repo.git"),
564 Some("ws://127.0.0.1:41463".to_string())
563 ); 565 );
564 } 566 }
565 567