upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs106
1 files changed, 54 insertions, 52 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 6f95ad3..8113035 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -15,30 +15,7 @@ use nostr_sdk::prelude::*;
15use nostr_sdk::Timestamp; 15use nostr_sdk::Timestamp;
16use tokio::sync::{broadcast, mpsc}; 16use tokio::sync::{broadcast, mpsc};
17 17
18use super::{RepoSyncIndex, RepoSyncNeeds}; 18use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
19
20// =============================================================================
21// RelayAction - Actions to send to SyncManager
22// =============================================================================
23
24/// Actions that the SelfSubscriber sends to the SyncManager
25#[derive(Debug)]
26pub enum RelayAction {
27 /// Spawn a new relay connection
28 SpawnRelay {
29 /// The relay URL to connect to
30 relay_url: String,
31 /// Repos to sync, mapped to their root event IDs
32 repos: HashMap<String, HashSet<EventId>>,
33 },
34 /// Add filters to an existing relay connection
35 AddFilters {
36 /// The relay URL to add filters to
37 relay_url: String,
38 /// Repos to sync, mapped to their root event IDs
39 repos: HashMap<String, HashSet<EventId>>,
40 },
41}
42 19
43// ============================================================================= 20// =============================================================================
44// PendingUpdates - Accumulator for batching 21// PendingUpdates - Accumulator for batching
@@ -100,8 +77,8 @@ pub struct SelfSubscriber {
100 relay_domain: String, 77 relay_domain: String,
101 /// Shared index of repos to sync 78 /// Shared index of repos to sync
102 repo_sync_index: RepoSyncIndex, 79 repo_sync_index: RepoSyncIndex,
103 /// Channel to send actions to SyncManager 80 /// Channel to send AddFilters actions to SyncManager
104 action_tx: mpsc::Sender<RelayAction>, 81 action_tx: mpsc::Sender<AddFilters>,
105 /// Last time we connected - used for since filter on reconnect 82 /// Last time we connected - used for since filter on reconnect
106 last_connected: Option<Timestamp>, 83 last_connected: Option<Timestamp>,
107} 84}
@@ -113,12 +90,12 @@ impl SelfSubscriber {
113 /// * `own_relay_url` - The WebSocket URL of our own relay 90 /// * `own_relay_url` - The WebSocket URL of our own relay
114 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 91 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
115 /// * `repo_sync_index` - Shared index to update with discovered repos 92 /// * `repo_sync_index` - Shared index to update with discovered repos
116 /// * `action_tx` - Channel to send RelayActions to the SyncManager 93 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
117 pub fn new( 94 pub fn new(
118 own_relay_url: String, 95 own_relay_url: String,
119 relay_domain: String, 96 relay_domain: String,
120 repo_sync_index: RepoSyncIndex, 97 repo_sync_index: RepoSyncIndex,
121 action_tx: mpsc::Sender<RelayAction>, 98 action_tx: mpsc::Sender<AddFilters>,
122 ) -> Self { 99 ) -> Self {
123 Self { 100 Self {
124 own_relay_url, 101 own_relay_url,
@@ -163,9 +140,11 @@ impl SelfSubscriber {
163 } 140 }
164 } 141 }
165 "clone" if tag_vec.len() >= 2 => { 142 "clone" if tag_vec.len() >= 2 => {
166 // Convert http(s) clone URL to ws(s) relay URL 143 // Convert ALL http(s) clone URLs to ws(s) relay URLs
167 if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { 144 for clone_url in tag_vec.iter().skip(1) {
168 relays.insert(relay_url); 145 if let Some(relay_url) = clone_url_to_relay_url(clone_url) {
146 relays.insert(relay_url);
147 }
169 } 148 }
170 } 149 }
171 _ => {} 150 _ => {}
@@ -302,10 +281,13 @@ impl SelfSubscriber {
302 // 30617 announcements don't contribute to root_events - those are 281 // 30617 announcements don't contribute to root_events - those are
303 // the 1617/1618/1619/1621 event IDs that get added when we receive 282 // the 1617/1618/1619/1621 event IDs that get added when we receive
304 // root events via handle_root_event. See mod.rs:71 for details. 283 // root events via handle_root_event. See mod.rs:71 for details.
305 pending.add_repo(repo_id, relays, HashSet::new()); 284 pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new());
306 tracing::debug!( 285 tracing::info!(
307 event_id = %event.id, 286 event_id = %event.id,
308 "Queued 30617 announcement for batch processing" 287 repo_id = %repo_id,
288 relay_count = relays.len(),
289 relays = ?relays,
290 "[DIAG] Queued 30617 announcement for batch processing"
309 ); 291 );
310 } 292 }
311 } else { 293 } else {
@@ -493,32 +475,38 @@ impl SelfSubscriber {
493 let targets = derive_relay_targets(&index); 475 let targets = derive_relay_targets(&index);
494 drop(index); // Release lock before async operations 476 drop(index); // Release lock before async operations
495 477
496 // For each relay, send SpawnRelay action 478 // For each relay, send AddFilters action directly
497 // SyncManager will check if relay already exists 479 // SyncManager's handle_add_filters auto-spawns connection for unknown relays
498 for (relay_url, needs) in targets { 480 for (relay_url, needs) in targets {
499 // Skip our own relay URL (we're subscribed to ourselves via self-subscription) 481 // Skip our own relay URL (we're subscribed to ourselves via self-subscription)
500 if relay_url.contains(&self.relay_domain) { 482 if relay_url.contains(&self.relay_domain) {
501 continue; 483 continue;
502 } 484 }
503 485
504 // Convert needs to HashMap<String, HashSet<EventId>> 486 // Build filters for these repos
505 let mut repos = HashMap::new(); 487 let filters = crate::sync::filters::build_layer2_and_layer3_filters(
506 for repo_id in needs.repos { 488 &needs.repos,
507 repos.insert(repo_id, needs.root_events.clone()); 489 &needs.root_events,
508 } 490 None,
491 );
509 492
510 let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; 493 let action = AddFilters {
494 relay_url: relay_url.clone(),
495 repos: needs.repos,
496 root_events: needs.root_events,
497 filters,
498 };
511 499
512 if let Err(e) = self.action_tx.send(action).await { 500 if let Err(e) = self.action_tx.send(action).await {
513 tracing::error!( 501 tracing::error!(
514 relay = %relay_url, 502 relay = %relay_url,
515 error = %e, 503 error = %e,
516 "Failed to send SpawnRelay action" 504 "Failed to send AddFilters action"
517 ); 505 );
518 } else { 506 } else {
519 tracing::debug!( 507 tracing::debug!(
520 relay = %relay_url, 508 relay = %relay_url,
521 "Sent SpawnRelay action to SyncManager" 509 "Sent AddFilters action to SyncManager"
522 ); 510 );
523 } 511 }
524 } 512 }
@@ -531,16 +519,22 @@ impl SelfSubscriber {
531 519
532/// Convert clone URL to relay URL 520/// Convert clone URL to relay URL
533/// 521///
534/// Converts http:// to ws:// and https:// to wss:// 522/// Converts http://domain:port/path.git to ws://domain:port
523/// Converts https://domain:port/path.git to wss://domain:port
524/// Strips the path component to get just the relay URL
535/// Returns None for unsupported URL schemes 525/// Returns None for unsupported URL schemes
536fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { 526fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
537 if clone_url.starts_with("http://") { 527 let (ws_scheme, rest) = if clone_url.starts_with("http://") {
538 Some(clone_url.replacen("http://", "ws://", 1)) 528 ("ws://", clone_url.strip_prefix("http://")?)
539 } else if clone_url.starts_with("https://") { 529 } else if clone_url.starts_with("https://") {
540 Some(clone_url.replacen("https://", "wss://", 1)) 530 ("wss://", clone_url.strip_prefix("https://")?)
541 } else { 531 } else {
542 None 532 return None;
543 } 533 };
534
535 // Extract just the host:port part (everything before the first /)
536 let host_port = rest.split('/').next()?;
537 Some(format!("{}{}", ws_scheme, host_port))
544} 538}
545 539
546#[cfg(test)] 540#[cfg(test)]
@@ -551,7 +545,7 @@ mod tests {
551 fn test_clone_url_to_relay_url_https() { 545 fn test_clone_url_to_relay_url_https() {
552 assert_eq!( 546 assert_eq!(
553 clone_url_to_relay_url("https://example.com/repo.git"), 547 clone_url_to_relay_url("https://example.com/repo.git"),
554 Some("wss://example.com/repo.git".to_string()) 548 Some("wss://example.com".to_string())
555 ); 549 );
556 } 550 }
557 551
@@ -559,7 +553,15 @@ mod tests {
559 fn test_clone_url_to_relay_url_http() { 553 fn test_clone_url_to_relay_url_http() {
560 assert_eq!( 554 assert_eq!(
561 clone_url_to_relay_url("http://localhost:3000/repo.git"), 555 clone_url_to_relay_url("http://localhost:3000/repo.git"),
562 Some("ws://localhost:3000/repo.git".to_string()) 556 Some("ws://localhost:3000".to_string())
557 );
558 }
559
560 #[test]
561 fn test_clone_url_to_relay_url_with_port() {
562 assert_eq!(
563 clone_url_to_relay_url("http://127.0.0.1:41463/test-repo.git"),
564 Some("ws://127.0.0.1:41463".to_string())
563 ); 565 );
564 } 566 }
565 567