diff options
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index f83b081..e29e45b 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -49,7 +49,12 @@ impl PendingUpdates { | |||
| 49 | } | 49 | } |
| 50 | 50 | ||
| 51 | /// Add or update a repo with its relays and root events | 51 | /// Add or update a repo with its relays and root events |
| 52 | fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) { | 52 | fn add_repo( |
| 53 | &mut self, | ||
| 54 | repo_id: String, | ||
| 55 | relays: HashSet<String>, | ||
| 56 | root_events: HashSet<EventId>, | ||
| 57 | ) { | ||
| 53 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | 58 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { |
| 54 | relays: HashSet::new(), | 59 | relays: HashSet::new(), |
| 55 | root_events: HashSet::new(), | 60 | root_events: HashSet::new(), |
| @@ -251,9 +256,9 @@ impl SelfSubscriber { | |||
| 251 | /// | 256 | /// |
| 252 | /// Returns true if any extracted relay URL contains our domain | 257 | /// Returns true if any extracted relay URL contains our domain |
| 253 | fn lists_our_relay(&self, event: &Event) -> bool { | 258 | fn lists_our_relay(&self, event: &Event) -> bool { |
| 254 | Self::extract_relay_urls(event).iter().any(|url| { | 259 | Self::extract_relay_urls(event) |
| 255 | url.contains(&self.relay_domain) || url == &self.own_relay_url | 260 | .iter() |
| 256 | }) | 261 | .any(|url| url.contains(&self.relay_domain) || url == &self.own_relay_url) |
| 257 | } | 262 | } |
| 258 | 263 | ||
| 259 | /// Main run loop | 264 | /// Main run loop |
| @@ -413,21 +418,21 @@ impl SelfSubscriber { | |||
| 413 | if let Some(repo_sync) = index.get_mut(&repo_ref) { | 418 | if let Some(repo_sync) = index.get_mut(&repo_ref) { |
| 414 | // Add event.id to root_events set in the index (immediate availability) | 419 | // Add event.id to root_events set in the index (immediate availability) |
| 415 | repo_sync.root_events.insert(event.id); | 420 | repo_sync.root_events.insert(event.id); |
| 416 | 421 | ||
| 417 | // Clone the relays before releasing the lock - Layer 3 filters need to be | 422 | // Clone the relays before releasing the lock - Layer 3 filters need to be |
| 418 | // sent to the same relays as Layer 2 filters for this repo | 423 | // sent to the same relays as Layer 2 filters for this repo |
| 419 | let relays = repo_sync.relays.clone(); | 424 | let relays = repo_sync.relays.clone(); |
| 420 | 425 | ||
| 421 | // Release lock before modifying pending | 426 | // Release lock before modifying pending |
| 422 | drop(index); | 427 | drop(index); |
| 423 | 428 | ||
| 424 | // Also add root event to pending - this ensures batch processing runs | 429 | // Also add root event to pending - this ensures batch processing runs |
| 425 | // and creates Layer 3 filters for events referencing this root event. | 430 | // and creates Layer 3 filters for events referencing this root event. |
| 426 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! | 431 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! |
| 427 | let mut root_events = HashSet::new(); | 432 | let mut root_events = HashSet::new(); |
| 428 | root_events.insert(event.id); | 433 | root_events.insert(event.id); |
| 429 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); | 434 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); |
| 430 | 435 | ||
| 431 | tracing::debug!( | 436 | tracing::debug!( |
| 432 | event_id = %event.id, | 437 | event_id = %event.id, |
| 433 | repo_ref = %repo_ref, | 438 | repo_ref = %repo_ref, |
| @@ -475,10 +480,12 @@ impl SelfSubscriber { | |||
| 475 | 480 | ||
| 476 | for (repo_id, needs) in updates { | 481 | for (repo_id, needs) in updates { |
| 477 | // Merge with existing entry or insert new | 482 | // Merge with existing entry or insert new |
| 478 | let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { | 483 | let entry = index |
| 479 | relays: HashSet::new(), | 484 | .entry(repo_id.clone()) |
| 480 | root_events: HashSet::new(), | 485 | .or_insert_with(|| RepoSyncNeeds { |
| 481 | }); | 486 | relays: HashSet::new(), |
| 487 | root_events: HashSet::new(), | ||
| 488 | }); | ||
| 482 | entry.relays.extend(needs.relays); | 489 | entry.relays.extend(needs.relays); |
| 483 | entry.root_events.extend(needs.root_events); | 490 | entry.root_events.extend(needs.root_events); |
| 484 | 491 | ||
| @@ -556,7 +563,7 @@ fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | |||
| 556 | } else { | 563 | } else { |
| 557 | return None; | 564 | return None; |
| 558 | }; | 565 | }; |
| 559 | 566 | ||
| 560 | // Extract just the host:port part (everything before the first /) | 567 | // Extract just the host:port part (everything before the first /) |
| 561 | let host_port = rest.split('/').next()?; | 568 | let host_port = rest.split('/').next()?; |
| 562 | Some(format!("{}{}", ws_scheme, host_port)) | 569 | Some(format!("{}{}", ws_scheme, host_port)) |
| @@ -581,7 +588,7 @@ mod tests { | |||
| 581 | Some("ws://localhost:3000".to_string()) | 588 | Some("ws://localhost:3000".to_string()) |
| 582 | ); | 589 | ); |
| 583 | } | 590 | } |
| 584 | 591 | ||
| 585 | #[test] | 592 | #[test] |
| 586 | fn test_clone_url_to_relay_url_with_port() { | 593 | fn test_clone_url_to_relay_url_with_port() { |
| 587 | assert_eq!( | 594 | assert_eq!( |
| @@ -593,6 +600,9 @@ mod tests { | |||
| 593 | #[test] | 600 | #[test] |
| 594 | fn test_clone_url_to_relay_url_unsupported() { | 601 | fn test_clone_url_to_relay_url_unsupported() { |
| 595 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); | 602 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); |
| 596 | assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); | 603 | assert_eq!( |
| 604 | clone_url_to_relay_url("ssh://git@example.com/repo.git"), | ||
| 605 | None | ||
| 606 | ); | ||
| 597 | } | 607 | } |
| 598 | } \ No newline at end of file | 608 | } |