upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/self_subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/self_subscriber.rs')
-rw-r--r--src/sync/self_subscriber.rs42
1 files changed, 26 insertions, 16 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index f83b081..e29e45b 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -49,7 +49,12 @@ impl PendingUpdates {
49 } 49 }
50 50
51 /// Add or update a repo with its relays and root events 51 /// Add or update a repo with its relays and root events
52 fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) { 52 fn add_repo(
53 &mut self,
54 repo_id: String,
55 relays: HashSet<String>,
56 root_events: HashSet<EventId>,
57 ) {
53 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { 58 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
54 relays: HashSet::new(), 59 relays: HashSet::new(),
55 root_events: HashSet::new(), 60 root_events: HashSet::new(),
@@ -251,9 +256,9 @@ impl SelfSubscriber {
251 /// 256 ///
252 /// Returns true if any extracted relay URL contains our domain 257 /// Returns true if any extracted relay URL contains our domain
253 fn lists_our_relay(&self, event: &Event) -> bool { 258 fn lists_our_relay(&self, event: &Event) -> bool {
254 Self::extract_relay_urls(event).iter().any(|url| { 259 Self::extract_relay_urls(event)
255 url.contains(&self.relay_domain) || url == &self.own_relay_url 260 .iter()
256 }) 261 .any(|url| url.contains(&self.relay_domain) || url == &self.own_relay_url)
257 } 262 }
258 263
259 /// Main run loop 264 /// Main run loop
@@ -413,21 +418,21 @@ impl SelfSubscriber {
413 if let Some(repo_sync) = index.get_mut(&repo_ref) { 418 if let Some(repo_sync) = index.get_mut(&repo_ref) {
414 // Add event.id to root_events set in the index (immediate availability) 419 // Add event.id to root_events set in the index (immediate availability)
415 repo_sync.root_events.insert(event.id); 420 repo_sync.root_events.insert(event.id);
416 421
417 // Clone the relays before releasing the lock - Layer 3 filters need to be 422 // Clone the relays before releasing the lock - Layer 3 filters need to be
418 // sent to the same relays as Layer 2 filters for this repo 423 // sent to the same relays as Layer 2 filters for this repo
419 let relays = repo_sync.relays.clone(); 424 let relays = repo_sync.relays.clone();
420 425
421 // Release lock before modifying pending 426 // Release lock before modifying pending
422 drop(index); 427 drop(index);
423 428
424 // Also add root event to pending - this ensures batch processing runs 429 // Also add root event to pending - this ensures batch processing runs
425 // and creates Layer 3 filters for events referencing this root event. 430 // and creates Layer 3 filters for events referencing this root event.
426 // CRITICAL: Include relays so derive_relay_targets knows where to send filters! 431 // CRITICAL: Include relays so derive_relay_targets knows where to send filters!
427 let mut root_events = HashSet::new(); 432 let mut root_events = HashSet::new();
428 root_events.insert(event.id); 433 root_events.insert(event.id);
429 pending.add_repo(repo_ref.clone(), relays.clone(), root_events); 434 pending.add_repo(repo_ref.clone(), relays.clone(), root_events);
430 435
431 tracing::debug!( 436 tracing::debug!(
432 event_id = %event.id, 437 event_id = %event.id,
433 repo_ref = %repo_ref, 438 repo_ref = %repo_ref,
@@ -475,10 +480,12 @@ impl SelfSubscriber {
475 480
476 for (repo_id, needs) in updates { 481 for (repo_id, needs) in updates {
477 // Merge with existing entry or insert new 482 // Merge with existing entry or insert new
478 let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { 483 let entry = index
479 relays: HashSet::new(), 484 .entry(repo_id.clone())
480 root_events: HashSet::new(), 485 .or_insert_with(|| RepoSyncNeeds {
481 }); 486 relays: HashSet::new(),
487 root_events: HashSet::new(),
488 });
482 entry.relays.extend(needs.relays); 489 entry.relays.extend(needs.relays);
483 entry.root_events.extend(needs.root_events); 490 entry.root_events.extend(needs.root_events);
484 491
@@ -556,7 +563,7 @@ fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
556 } else { 563 } else {
557 return None; 564 return None;
558 }; 565 };
559 566
560 // Extract just the host:port part (everything before the first /) 567 // Extract just the host:port part (everything before the first /)
561 let host_port = rest.split('/').next()?; 568 let host_port = rest.split('/').next()?;
562 Some(format!("{}{}", ws_scheme, host_port)) 569 Some(format!("{}{}", ws_scheme, host_port))
@@ -581,7 +588,7 @@ mod tests {
581 Some("ws://localhost:3000".to_string()) 588 Some("ws://localhost:3000".to_string())
582 ); 589 );
583 } 590 }
584 591
585 #[test] 592 #[test]
586 fn test_clone_url_to_relay_url_with_port() { 593 fn test_clone_url_to_relay_url_with_port() {
587 assert_eq!( 594 assert_eq!(
@@ -593,6 +600,9 @@ mod tests {
593 #[test] 600 #[test]
594 fn test_clone_url_to_relay_url_unsupported() { 601 fn test_clone_url_to_relay_url_unsupported() {
595 assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); 602 assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None);
596 assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); 603 assert_eq!(
604 clone_url_to_relay_url("ssh://git@example.com/repo.git"),
605 None
606 );
597 } 607 }
598} \ No newline at end of file 608}