diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 09:07:54 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 09:07:54 +0000 |
| commit | 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (patch) | |
| tree | 013aaa9fd64fd2ef4d1a1c47a7fc348f0e6a92de /src | |
| parent | 7e68b71558c8f6d3f2aa1d3bf18e77eec335343d (diff) | |
improve sync design
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/mod.rs | 335 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 185 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 497 |
3 files changed, 986 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 71f91e2..9dec982 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -37,17 +37,26 @@ | |||
| 37 | //! for the complete design context. | 37 | //! for the complete design context. |
| 38 | 38 | ||
| 39 | use std::collections::{HashMap, HashSet}; | 39 | use std::collections::{HashMap, HashSet}; |
| 40 | use std::net::SocketAddr; | ||
| 40 | use std::sync::Arc; | 41 | use std::sync::Arc; |
| 41 | 42 | ||
| 42 | use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; | 43 | use nostr_relay_builder::prelude::{ |
| 44 | DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, | ||
| 45 | }; | ||
| 46 | use nostr_sdk::prelude::*; | ||
| 43 | use nostr_sdk::EventId; | 47 | use nostr_sdk::EventId; |
| 44 | use tokio::sync::RwLock; | 48 | use tokio::sync::{mpsc, RwLock}; |
| 45 | 49 | ||
| 46 | use crate::config::Config; | 50 | use crate::config::Config; |
| 47 | use crate::nostr::builder::Nip34WritePolicy; | 51 | use crate::nostr::builder::Nip34WritePolicy; |
| 48 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | 52 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; |
| 49 | use crate::nostr::SharedDatabase; | 53 | use crate::nostr::SharedDatabase; |
| 50 | 54 | ||
| 55 | mod relay_connection; | ||
| 56 | mod self_subscriber; | ||
| 57 | pub use relay_connection::{RelayConnection, RelayEvent}; | ||
| 58 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | ||
| 59 | |||
| 51 | // ============================================================================= | 60 | // ============================================================================= |
| 52 | // Type Aliases for Sync State | 61 | // Type Aliases for Sync State |
| 53 | // ============================================================================= | 62 | // ============================================================================= |
| @@ -176,7 +185,7 @@ pub fn new_sync_relays() -> SyncRelays { | |||
| 176 | /// The SyncManager is responsible for: | 185 | /// The SyncManager is responsible for: |
| 177 | /// - Discovering relays from stored repository announcements | 186 | /// - Discovering relays from stored repository announcements |
| 178 | /// - Maintaining connections to sync relays | 187 | /// - Maintaining connections to sync relays |
| 179 | /// - Subscribing to events at external relays | 188 | /// - Subscribing to events at external relays |
| 180 | /// - Applying the acceptance policy to synced events | 189 | /// - Applying the acceptance policy to synced events |
| 181 | /// | 190 | /// |
| 182 | /// ## Lifecycle | 191 | /// ## Lifecycle |
| @@ -186,14 +195,16 @@ pub fn new_sync_relays() -> SyncRelays { | |||
| 186 | /// | 195 | /// |
| 187 | /// ## Current Status | 196 | /// ## Current Status |
| 188 | /// | 197 | /// |
| 189 | /// This is a stub implementation. The core data structures are: | 198 | /// Phase 2 implementation supports: |
| 199 | /// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter | ||
| 200 | /// - Event processing through write policy | ||
| 201 | /// - Storage of accepted events | ||
| 202 | /// | ||
| 203 | /// Core data structures: | ||
| 190 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following | 204 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following |
| 191 | /// - [`SyncRelays`]: Relays we sync from with their repos and events | 205 | /// - [`SyncRelays`]: Relays we sync from with their repos and events |
| 192 | /// | ||
| 193 | /// Full implementation will come in later phases. | ||
| 194 | pub struct SyncManager { | 206 | pub struct SyncManager { |
| 195 | /// Bootstrap relay URL if configured | 207 | /// Bootstrap relay URL if configured |
| 196 | #[allow(dead_code)] | ||
| 197 | bootstrap_relay_url: Option<String>, | 208 | bootstrap_relay_url: Option<String>, |
| 198 | 209 | ||
| 199 | /// Our service domain for filtering repo announcements | 210 | /// Our service domain for filtering repo announcements |
| @@ -201,11 +212,9 @@ pub struct SyncManager { | |||
| 201 | service_domain: String, | 212 | service_domain: String, |
| 202 | 213 | ||
| 203 | /// Database for querying/storing events | 214 | /// Database for querying/storing events |
| 204 | #[allow(dead_code)] | ||
| 205 | database: SharedDatabase, | 215 | database: SharedDatabase, |
| 206 | 216 | ||
| 207 | /// Write policy for applying acceptance rules | 217 | /// Write policy for applying acceptance rules |
| 208 | #[allow(dead_code)] | ||
| 209 | write_policy: Nip34WritePolicy, | 218 | write_policy: Nip34WritePolicy, |
| 210 | 219 | ||
| 211 | /// Repository root events we're following (Phase 1 data structure) | 220 | /// Repository root events we're following (Phase 1 data structure) |
| @@ -219,6 +228,9 @@ pub struct SyncManager { | |||
| 219 | /// Max backoff duration for relay reconnection | 228 | /// Max backoff duration for relay reconnection |
| 220 | #[allow(dead_code)] | 229 | #[allow(dead_code)] |
| 221 | max_backoff_secs: u64, | 230 | max_backoff_secs: u64, |
| 231 | |||
| 232 | /// Socket address used for sync source (for write policy) | ||
| 233 | sync_source_addr: SocketAddr, | ||
| 222 | } | 234 | } |
| 223 | 235 | ||
| 224 | impl SyncManager { | 236 | impl SyncManager { |
| @@ -238,6 +250,10 @@ impl SyncManager { | |||
| 238 | write_policy: Nip34WritePolicy, | 250 | write_policy: Nip34WritePolicy, |
| 239 | config: &Config, | 251 | config: &Config, |
| 240 | ) -> Self { | 252 | ) -> Self { |
| 253 | // Create a synthetic SocketAddr for sync source identification | ||
| 254 | // This is used when calling write_policy.admit_event() for synced events | ||
| 255 | let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); | ||
| 256 | |||
| 241 | Self { | 257 | Self { |
| 242 | bootstrap_relay_url, | 258 | bootstrap_relay_url, |
| 243 | service_domain, | 259 | service_domain, |
| @@ -246,6 +262,7 @@ impl SyncManager { | |||
| 246 | following_repo_root_events: new_following_repo_root_events(), | 262 | following_repo_root_events: new_following_repo_root_events(), |
| 247 | sync_relays: new_sync_relays(), | 263 | sync_relays: new_sync_relays(), |
| 248 | max_backoff_secs: config.sync_max_backoff_secs, | 264 | max_backoff_secs: config.sync_max_backoff_secs, |
| 265 | sync_source_addr, | ||
| 249 | } | 266 | } |
| 250 | } | 267 | } |
| 251 | 268 | ||
| @@ -460,14 +477,12 @@ impl SyncManager { | |||
| 460 | /// }); | 477 | /// }); |
| 461 | /// ``` | 478 | /// ``` |
| 462 | /// | 479 | /// |
| 463 | /// ## Current Status | 480 | /// ## Implementation Status |
| 464 | /// | 481 | /// |
| 465 | /// This is a stub that logs and then waits indefinitely. | 482 | /// - Phase 2: Layer 1 sync from bootstrap relay ✓ |
| 466 | /// Full implementation includes: | 483 | /// - Phase 3: Self-subscription and relay discovery ✓ |
| 467 | /// - Phase 2: Database initialization queries ✓ | 484 | /// - Phase 4-6: Filter building, connection management (TODO) |
| 468 | /// - Phase 3: Self-subscription for incremental updates | 485 | /// - Phase 7: Full sync loop (TODO) |
| 469 | /// - Phase 4-6: Filter building, connection management | ||
| 470 | /// - Phase 7: Full sync loop | ||
| 471 | pub async fn run(self) { | 486 | pub async fn run(self) { |
| 472 | tracing::info!( | 487 | tracing::info!( |
| 473 | "SyncManager starting (bootstrap_relay={:?}, domain={})", | 488 | "SyncManager starting (bootstrap_relay={:?}, domain={})", |
| @@ -475,27 +490,285 @@ impl SyncManager { | |||
| 475 | self.service_domain | 490 | self.service_domain |
| 476 | ); | 491 | ); |
| 477 | 492 | ||
| 478 | // Phase 2: Initialize from database | 493 | // Phase 3: Initialize state from database BEFORE spawning connections |
| 479 | if let Err(e) = self.initialize_from_database().await { | 494 | if let Err(e) = self.initialize_from_database().await { |
| 480 | tracing::error!("Failed to initialize sync state from database: {}", e); | 495 | tracing::error!("Failed to initialize from database: {}", e); |
| 481 | // Continue anyway - we can still receive events via self-subscription | 496 | // Continue anyway - we can still sync from bootstrap |
| 482 | } | 497 | } |
| 483 | 498 | ||
| 484 | // Log initialization results | 499 | // Create channel for relay actions from self-subscriber |
| 485 | { | 500 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); |
| 486 | let following_count = self.following_repo_root_events.read().await.len(); | 501 | |
| 487 | let sync_relays_count = self.sync_relays.read().await.len(); | 502 | // Construct our own relay URL for self-subscription |
| 488 | tracing::info!( | 503 | let own_relay_url = format!("ws://{}", self.service_domain); |
| 489 | "Sync state initialized: {} repos tracked, {} sync relays", | 504 | |
| 490 | following_count, | 505 | // Spawn self-subscriber task |
| 491 | sync_relays_count | 506 | let self_subscriber = SelfSubscriber::new( |
| 492 | ); | 507 | own_relay_url.clone(), |
| 508 | self.service_domain.clone(), | ||
| 509 | Arc::clone(&self.following_repo_root_events), | ||
| 510 | Arc::clone(&self.sync_relays), | ||
| 511 | action_tx, | ||
| 512 | ); | ||
| 513 | |||
| 514 | tokio::spawn(async move { | ||
| 515 | self_subscriber.run().await; | ||
| 516 | }); | ||
| 517 | |||
| 518 | tracing::info!("SelfSubscriber spawned for {}", own_relay_url); | ||
| 519 | |||
| 520 | // Track active relay connections (relay_url -> event_sender) | ||
| 521 | let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new(); | ||
| 522 | |||
| 523 | // Phase 2: Connect to bootstrap relay if configured | ||
| 524 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | ||
| 525 | if let Some(event_tx) = self | ||
| 526 | .spawn_relay_connection(bootstrap_url.clone(), None) | ||
| 527 | .await | ||
| 528 | { | ||
| 529 | active_relays.insert(bootstrap_url.clone(), event_tx); | ||
| 530 | } | ||
| 493 | } | 531 | } |
| 494 | 532 | ||
| 495 | // Stub: wait indefinitely until full implementation (Phases 3-7) | 533 | // Main coordination loop |
| 496 | // This prevents the spawned task from immediately completing | ||
| 497 | loop { | 534 | loop { |
| 498 | tokio::time::sleep(std::time::Duration::from_secs(3600)).await; | 535 | tokio::select! { |
| 536 | // Handle relay actions from self-subscriber | ||
| 537 | action = action_rx.recv() => { | ||
| 538 | match action { | ||
| 539 | Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { | ||
| 540 | tracing::info!("Spawning new relay connection to {}", relay_url); | ||
| 541 | if !active_relays.contains_key(&relay_url) { | ||
| 542 | if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { | ||
| 543 | active_relays.insert(relay_url, event_tx); | ||
| 544 | } | ||
| 545 | } | ||
| 546 | } | ||
| 547 | Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { | ||
| 548 | tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); | ||
| 549 | // TODO: Implement filter updates for existing connections | ||
| 550 | } | ||
| 551 | None => { | ||
| 552 | tracing::info!("Action channel closed, continuing without self-subscriber"); | ||
| 553 | } | ||
| 554 | } | ||
| 555 | } | ||
| 556 | // Sleep to prevent busy loop when no events | ||
| 557 | _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { | ||
| 558 | // Periodic maintenance could go here | ||
| 559 | } | ||
| 560 | } | ||
| 561 | } | ||
| 562 | } | ||
| 563 | |||
| 564 | /// Spawn a relay connection with optional Layer 2 filters. | ||
| 565 | /// | ||
| 566 | /// Returns the event sender channel if successfully spawned. | ||
| 567 | async fn spawn_relay_connection( | ||
| 568 | &self, | ||
| 569 | relay_url: String, | ||
| 570 | repos: Option<HashMap<String, HashSet<EventId>>>, | ||
| 571 | ) -> Option<mpsc::Sender<RelayEvent>> { | ||
| 572 | // Create channel for receiving events | ||
| 573 | let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100); | ||
| 574 | |||
| 575 | // Create connection | ||
| 576 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 577 | |||
| 578 | // Determine if this is bootstrap (no repos) or discovered relay (with repos) | ||
| 579 | let is_bootstrap = repos.is_none(); | ||
| 580 | |||
| 581 | match connection.connect_and_subscribe().await { | ||
| 582 | Ok(()) => { | ||
| 583 | if is_bootstrap { | ||
| 584 | tracing::info!("Bootstrap relay connection established: {}", relay_url); | ||
| 585 | } else { | ||
| 586 | tracing::info!( | ||
| 587 | "Discovered relay connection established: {} (with Layer 2 filters)", | ||
| 588 | relay_url | ||
| 589 | ); | ||
| 590 | |||
| 591 | // Add Layer 2 subscription for repo events | ||
| 592 | if let Some(ref repos) = repos { | ||
| 593 | if let Err(e) = self.add_layer2_subscription(&connection, repos).await { | ||
| 594 | tracing::warn!("Failed to add Layer 2 subscription: {}", e); | ||
| 595 | } | ||
| 596 | } | ||
| 597 | } | ||
| 598 | |||
| 599 | // Clone refs needed for event processing task | ||
| 600 | let database = Arc::clone(&self.database); | ||
| 601 | let write_policy = self.write_policy.clone(); | ||
| 602 | let sync_source_addr = self.sync_source_addr; | ||
| 603 | |||
| 604 | // Clone event_tx for the spawned task | ||
| 605 | let event_tx_clone = event_tx.clone(); | ||
| 606 | |||
| 607 | // Spawn event loop task | ||
| 608 | let conn_url = relay_url.clone(); | ||
| 609 | tokio::spawn(async move { | ||
| 610 | connection.run_event_loop(event_tx_clone).await; | ||
| 611 | }); | ||
| 612 | |||
| 613 | // Spawn event processing task | ||
| 614 | tokio::spawn(async move { | ||
| 615 | Self::process_relay_events( | ||
| 616 | event_rx, | ||
| 617 | database, | ||
| 618 | write_policy, | ||
| 619 | sync_source_addr, | ||
| 620 | conn_url, | ||
| 621 | ) | ||
| 622 | .await; | ||
| 623 | }); | ||
| 624 | |||
| 625 | Some(event_tx) | ||
| 626 | } | ||
| 627 | Err(e) => { | ||
| 628 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | ||
| 629 | None | ||
| 630 | } | ||
| 631 | } | ||
| 632 | } | ||
| 633 | |||
| 634 | /// Add Layer 2 subscription for repo-related events. | ||
| 635 | /// | ||
| 636 | /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. | ||
| 637 | async fn add_layer2_subscription( | ||
| 638 | &self, | ||
| 639 | connection: &RelayConnection, | ||
| 640 | repos: &HashMap<String, HashSet<EventId>>, | ||
| 641 | ) -> Result<(), String> { | ||
| 642 | if repos.is_empty() { | ||
| 643 | return Ok(()); | ||
| 644 | } | ||
| 645 | |||
| 646 | // Build repo refs list for filter | ||
| 647 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 648 | |||
| 649 | tracing::debug!( | ||
| 650 | "Adding Layer 2 subscription for {} repos to {}", | ||
| 651 | repo_refs.len(), | ||
| 652 | connection.url() | ||
| 653 | ); | ||
| 654 | |||
| 655 | // Chunk repo_refs into groups of 100 (per plan) | ||
| 656 | for chunk in repo_refs.chunks(100) { | ||
| 657 | // Build filter with lowercase 'a' tag for each repo ref | ||
| 658 | let mut filter = Filter::new().kinds([ | ||
| 659 | Kind::GitPatch, // 1617 | ||
| 660 | Kind::Custom(1618), // PR | ||
| 661 | Kind::Custom(1619), // PR update | ||
| 662 | Kind::GitIssue, // 1621 | ||
| 663 | ]); | ||
| 664 | |||
| 665 | // Add each repo ref as a custom tag filter | ||
| 666 | for repo_ref in chunk { | ||
| 667 | filter = | ||
| 668 | filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); | ||
| 669 | } | ||
| 670 | |||
| 671 | // Subscribe to this filter | ||
| 672 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 673 | return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); | ||
| 674 | } | ||
| 675 | } | ||
| 676 | |||
| 677 | Ok(()) | ||
| 678 | } | ||
| 679 | |||
| 680 | /// Process events from a single relay connection. | ||
| 681 | /// | ||
| 682 | /// This is a static method that runs in its own task. | ||
| 683 | async fn process_relay_events( | ||
| 684 | mut event_rx: mpsc::Receiver<RelayEvent>, | ||
| 685 | database: SharedDatabase, | ||
| 686 | write_policy: Nip34WritePolicy, | ||
| 687 | sync_source_addr: SocketAddr, | ||
| 688 | relay_url: String, | ||
| 689 | ) { | ||
| 690 | tracing::debug!("Starting event processing for relay: {}", relay_url); | ||
| 691 | |||
| 692 | while let Some(relay_event) = event_rx.recv().await { | ||
| 693 | match relay_event { | ||
| 694 | RelayEvent::Event(event) => { | ||
| 695 | Self::process_single_event_static( | ||
| 696 | &event, | ||
| 697 | &database, | ||
| 698 | &write_policy, | ||
| 699 | &sync_source_addr, | ||
| 700 | &relay_url, | ||
| 701 | ) | ||
| 702 | .await; | ||
| 703 | } | ||
| 704 | RelayEvent::EndOfStoredEvents => { | ||
| 705 | tracing::debug!("EOSE received from {}", relay_url); | ||
| 706 | } | ||
| 707 | RelayEvent::Closed(reason) => { | ||
| 708 | tracing::warn!("Connection to {} closed: {}", relay_url, reason); | ||
| 709 | break; | ||
| 710 | } | ||
| 711 | } | ||
| 712 | } | ||
| 713 | |||
| 714 | tracing::info!("Event processing ended for relay: {}", relay_url); | ||
| 715 | } | ||
| 716 | |||
| 717 | /// Process a single event (static version for use in spawned tasks). | ||
| 718 | async fn process_single_event_static( | ||
| 719 | event: &Event, | ||
| 720 | database: &SharedDatabase, | ||
| 721 | write_policy: &Nip34WritePolicy, | ||
| 722 | sync_source_addr: &SocketAddr, | ||
| 723 | relay_url: &str, | ||
| 724 | ) { | ||
| 725 | let event_id = event.id; | ||
| 726 | let kind = event.kind.as_u16(); | ||
| 727 | |||
| 728 | // Check if event already exists in database | ||
| 729 | match database.check_id(&event_id).await { | ||
| 730 | Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => { | ||
| 731 | tracing::trace!("Event {} already exists, skipping", event_id); | ||
| 732 | return; | ||
| 733 | } | ||
| 734 | Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing | ||
| 735 | Err(e) => { | ||
| 736 | tracing::warn!("Failed to check if event {} exists: {}", event_id, e); | ||
| 737 | } | ||
| 738 | } | ||
| 739 | |||
| 740 | // Pass through write policy | ||
| 741 | let policy_result = write_policy.admit_event(event, sync_source_addr).await; | ||
| 742 | |||
| 743 | match policy_result { | ||
| 744 | PolicyResult::Accept => match database.save_event(event).await { | ||
| 745 | Ok(SaveEventStatus::Success) => { | ||
| 746 | tracing::info!( | ||
| 747 | "Synced event {} (kind {}) from {}", | ||
| 748 | event_id, | ||
| 749 | kind, | ||
| 750 | relay_url | ||
| 751 | ); | ||
| 752 | } | ||
| 753 | Ok(_) => { | ||
| 754 | tracing::debug!( | ||
| 755 | "Event {} (kind {}) already stored or rejected by database", | ||
| 756 | event_id, | ||
| 757 | kind | ||
| 758 | ); | ||
| 759 | } | ||
| 760 | Err(e) => { | ||
| 761 | tracing::error!("Failed to save synced event {}: {}", event_id, e); | ||
| 762 | } | ||
| 763 | }, | ||
| 764 | PolicyResult::Reject(reason) => { | ||
| 765 | tracing::debug!( | ||
| 766 | "Rejected synced event {} (kind {}): {}", | ||
| 767 | event_id, | ||
| 768 | kind, | ||
| 769 | reason | ||
| 770 | ); | ||
| 771 | } | ||
| 499 | } | 772 | } |
| 500 | } | 773 | } |
| 501 | } | 774 | } |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs new file mode 100644 index 0000000..71b5d51 --- /dev/null +++ b/src/sync/relay_connection.rs | |||
| @@ -0,0 +1,185 @@ | |||
| 1 | //! Relay Connection for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module handles connecting to external relays and receiving events | ||
| 4 | //! for the proactive sync system. | ||
| 5 | |||
| 6 | use std::time::Duration; | ||
| 7 | |||
| 8 | use nostr_sdk::prelude::*; | ||
| 9 | use tokio::sync::mpsc; | ||
| 10 | |||
| 11 | use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; | ||
| 12 | |||
| 13 | /// Events received from a relay connection | ||
| 14 | #[derive(Debug)] | ||
| 15 | pub enum RelayEvent { | ||
| 16 | /// A nostr event was received | ||
| 17 | Event(Event), | ||
| 18 | /// End of stored events (EOSE) received | ||
| 19 | EndOfStoredEvents, | ||
| 20 | /// Connection was closed | ||
| 21 | Closed(String), | ||
| 22 | } | ||
| 23 | |||
| 24 | /// Connection to an external relay for syncing events. | ||
| 25 | /// | ||
| 26 | /// RelayConnection handles: | ||
| 27 | /// - Connecting to the relay | ||
| 28 | /// - Subscribing with appropriate filters (Layer 1 for bootstrap) | ||
| 29 | /// - Receiving events and sending them through a channel | ||
| 30 | pub struct RelayConnection { | ||
| 31 | /// The relay URL | ||
| 32 | url: String, | ||
| 33 | /// The nostr-sdk client | ||
| 34 | client: Client, | ||
| 35 | } | ||
| 36 | |||
| 37 | impl RelayConnection { | ||
| 38 | /// Create a new relay connection. | ||
| 39 | /// | ||
| 40 | /// # Arguments | ||
| 41 | /// | ||
| 42 | /// * `url` - The WebSocket URL of the relay to connect to | ||
| 43 | pub fn new(url: String) -> Self { | ||
| 44 | // Create a client with generated keys (we're just subscribing, not publishing) | ||
| 45 | let keys = Keys::generate(); | ||
| 46 | let client = Client::new(keys); | ||
| 47 | |||
| 48 | Self { url, client } | ||
| 49 | } | ||
| 50 | |||
| 51 | /// Connect to the relay and subscribe with Layer 1 filter. | ||
| 52 | /// | ||
| 53 | /// Layer 1 filter syncs announcement events (30617, 30618) which are | ||
| 54 | /// the foundation for discovering repository relationships. | ||
| 55 | /// | ||
| 56 | /// Returns the notification stream for receiving events. | ||
| 57 | pub async fn connect_and_subscribe(&self) -> Result<(), String> { | ||
| 58 | // Add the relay | ||
| 59 | self.client | ||
| 60 | .add_relay(&self.url) | ||
| 61 | .await | ||
| 62 | .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; | ||
| 63 | |||
| 64 | // Connect to relay | ||
| 65 | self.client.connect().await; | ||
| 66 | |||
| 67 | // Wait for connection to establish | ||
| 68 | let mut connected = false; | ||
| 69 | for _ in 0..30 { | ||
| 70 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 71 | let relays = self.client.relays().await; | ||
| 72 | if relays.values().any(|r| r.is_connected()) { | ||
| 73 | connected = true; | ||
| 74 | break; | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 78 | if !connected { | ||
| 79 | return Err(format!( | ||
| 80 | "Failed to connect to relay {} after 3 seconds", | ||
| 81 | self.url | ||
| 82 | )); | ||
| 83 | } | ||
| 84 | |||
| 85 | tracing::info!("Connected to bootstrap relay: {}", self.url); | ||
| 86 | |||
| 87 | // Layer 1 filter: Repository announcements and state events | ||
| 88 | // These are addressable events that define repositories | ||
| 89 | let filter = Filter::new().kinds([ | ||
| 90 | Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 | ||
| 91 | Kind::Custom(KIND_REPOSITORY_STATE), // 30618 | ||
| 92 | ]); | ||
| 93 | |||
| 94 | // Subscribe to the filter | ||
| 95 | self.client | ||
| 96 | .subscribe(filter, None) | ||
| 97 | .await | ||
| 98 | .map_err(|e| format!("Failed to subscribe: {}", e))?; | ||
| 99 | |||
| 100 | tracing::debug!( | ||
| 101 | "Subscribed to Layer 1 events (kinds 30617, 30618) from {}", | ||
| 102 | self.url | ||
| 103 | ); | ||
| 104 | |||
| 105 | Ok(()) | ||
| 106 | } | ||
| 107 | |||
| 108 | /// Run the event loop, sending received events through the channel. | ||
| 109 | /// | ||
| 110 | /// This method runs until the connection is closed or an error occurs. | ||
| 111 | /// | ||
| 112 | /// # Arguments | ||
| 113 | /// | ||
| 114 | /// * `event_sender` - Channel to send received events | ||
| 115 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { | ||
| 116 | tracing::debug!("Starting event loop for relay: {}", self.url); | ||
| 117 | |||
| 118 | // Handle notifications | ||
| 119 | self.client | ||
| 120 | .handle_notifications(|notification| async { | ||
| 121 | match notification { | ||
| 122 | RelayPoolNotification::Event { event, .. } => { | ||
| 123 | tracing::debug!( | ||
| 124 | "Received event {} (kind {}) from {}", | ||
| 125 | event.id, | ||
| 126 | event.kind.as_u16(), | ||
| 127 | self.url | ||
| 128 | ); | ||
| 129 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | ||
| 130 | tracing::warn!("Event channel closed, stopping relay connection"); | ||
| 131 | return Ok(true); // Stop handling | ||
| 132 | } | ||
| 133 | } | ||
| 134 | RelayPoolNotification::Message { message, .. } => { | ||
| 135 | if let RelayMessage::EndOfStoredEvents(_) = message { | ||
| 136 | tracing::debug!("EOSE received from {}", self.url); | ||
| 137 | if event_sender | ||
| 138 | .send(RelayEvent::EndOfStoredEvents) | ||
| 139 | .await | ||
| 140 | .is_err() | ||
| 141 | { | ||
| 142 | return Ok(true); // Stop handling | ||
| 143 | } | ||
| 144 | } | ||
| 145 | } | ||
| 146 | RelayPoolNotification::Shutdown => { | ||
| 147 | tracing::info!("Relay {} shutting down", self.url); | ||
| 148 | let _ = event_sender | ||
| 149 | .send(RelayEvent::Closed("Shutdown".to_string())) | ||
| 150 | .await; | ||
| 151 | return Ok(true); // Stop handling | ||
| 152 | } | ||
| 153 | } | ||
| 154 | Ok(false) // Continue handling | ||
| 155 | }) | ||
| 156 | .await | ||
| 157 | .ok(); // Ignore errors on shutdown | ||
| 158 | |||
| 159 | // Disconnect when done | ||
| 160 | self.client.disconnect().await; | ||
| 161 | tracing::info!("Disconnected from relay: {}", self.url); | ||
| 162 | } | ||
| 163 | |||
| 164 | /// Get the relay URL | ||
| 165 | pub fn url(&self) -> &str { | ||
| 166 | &self.url | ||
| 167 | } | ||
| 168 | |||
| 169 | /// Subscribe to an additional filter. | ||
| 170 | /// | ||
| 171 | /// This is used to add Layer 2 filters for repo-related events after | ||
| 172 | /// the initial connection is established. | ||
| 173 | pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> { | ||
| 174 | self.client | ||
| 175 | .subscribe(filter, None) | ||
| 176 | .await | ||
| 177 | .map_err(|e| format!("Failed to subscribe with filter: {}", e))?; | ||
| 178 | Ok(()) | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Get a reference to the client for additional operations. | ||
| 182 | pub fn client(&self) -> &Client { | ||
| 183 | &self.client | ||
| 184 | } | ||
| 185 | } | ||
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs new file mode 100644 index 0000000..0512088 --- /dev/null +++ b/src/sync/self_subscriber.rs | |||
| @@ -0,0 +1,497 @@ | |||
| 1 | //! Self-Subscriber for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module handles subscribing to our own relay to detect new events | ||
| 4 | //! and trigger relay discovery from announcements. | ||
| 5 | |||
| 6 | use std::collections::{HashMap, HashSet}; | ||
| 7 | use std::time::Duration; | ||
| 8 | |||
| 9 | use nostr_sdk::prelude::*; | ||
| 10 | use tokio::sync::mpsc; | ||
| 11 | use tokio::time::Instant; | ||
| 12 | |||
| 13 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | ||
| 14 | |||
| 15 | use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; | ||
| 16 | |||
| 17 | // ============================================================================= | ||
| 18 | // Types | ||
| 19 | // ============================================================================= | ||
| 20 | |||
| 21 | /// Actions to be taken by the SyncManager based on self-subscription events. | ||
| 22 | #[derive(Debug, Clone)] | ||
| 23 | pub enum RelayAction { | ||
| 24 | /// Spawn a new relay connection to sync from. | ||
| 25 | /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. | ||
| 26 | SpawnRelay { | ||
| 27 | relay_url: String, | ||
| 28 | repos_and_root_events: HashMap<String, HashSet<EventId>>, | ||
| 29 | }, | ||
| 30 | /// Add filters to an existing relay connection. | ||
| 31 | /// Contains: relay_url, additional repos to add. | ||
| 32 | AddFilters { | ||
| 33 | relay_url: String, | ||
| 34 | repos_and_new_root_event: HashMap<String, HashSet<EventId>>, | ||
| 35 | }, | ||
| 36 | } | ||
| 37 | |||
| 38 | /// Pending updates collected during batch window. | ||
| 39 | #[derive(Debug, Default)] | ||
| 40 | struct PendingUpdates { | ||
| 41 | /// New announcements (kind 30617) - triggers relay discovery | ||
| 42 | announcements: Vec<Event>, | ||
| 43 | /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set | ||
| 44 | root_events: Vec<Event>, | ||
| 45 | } | ||
| 46 | |||
| 47 | // ============================================================================= | ||
| 48 | // SelfSubscriber | ||
| 49 | // ============================================================================= | ||
| 50 | |||
| 51 | /// Subscribes to our own relay to detect new events. | ||
| 52 | /// | ||
| 53 | /// The self-subscriber: | ||
| 54 | /// 1. Connects to our own relay | ||
| 55 | /// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) | ||
| 56 | /// 3. When events arrive, batches them | ||
| 57 | /// 4. On batch timer fire, processes updates and sends relay actions | ||
| 58 | pub struct SelfSubscriber { | ||
| 59 | /// URL of our own relay to subscribe to | ||
| 60 | own_relay_url: String, | ||
| 61 | /// Our relay domain for checking if announcements list us | ||
| 62 | relay_domain: String, | ||
| 63 | /// Reference to following repo root events (shared with SyncManager) | ||
| 64 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 65 | /// Reference to sync relays (shared with SyncManager) | ||
| 66 | sync_relays: SyncRelays, | ||
| 67 | /// Channel to send relay actions back to manager | ||
| 68 | action_tx: mpsc::Sender<RelayAction>, | ||
| 69 | } | ||
| 70 | |||
| 71 | impl SelfSubscriber { | ||
| 72 | /// Create a new self-subscriber. | ||
| 73 | pub fn new( | ||
| 74 | own_relay_url: String, | ||
| 75 | relay_domain: String, | ||
| 76 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 77 | sync_relays: SyncRelays, | ||
| 78 | action_tx: mpsc::Sender<RelayAction>, | ||
| 79 | ) -> Self { | ||
| 80 | Self { | ||
| 81 | own_relay_url, | ||
| 82 | relay_domain, | ||
| 83 | following_repo_root_events, | ||
| 84 | sync_relays, | ||
| 85 | action_tx, | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Get the batch window duration from environment variable. | ||
| 90 | /// | ||
| 91 | /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS | ||
| 92 | /// for faster tests (typically 200ms). | ||
| 93 | fn get_batch_window() -> Duration { | ||
| 94 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | ||
| 95 | .ok() | ||
| 96 | .and_then(|s| s.parse().ok()) | ||
| 97 | .map(Duration::from_millis) | ||
| 98 | .unwrap_or(Duration::from_secs(5)) | ||
| 99 | } | ||
| 100 | |||
| 101 | /// Run the self-subscriber event loop. | ||
| 102 | /// | ||
| 103 | /// This method: | ||
| 104 | /// 1. Connects to our own relay | ||
| 105 | /// 2. Subscribes to relevant event kinds | ||
| 106 | /// 3. Receives events and batches them | ||
| 107 | /// 4. On batch timer fire, processes and sends relay actions | ||
| 108 | pub async fn run(self) { | ||
| 109 | tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); | ||
| 110 | |||
| 111 | // Create nostr-sdk client | ||
| 112 | let keys = Keys::generate(); | ||
| 113 | let client = Client::new(keys); | ||
| 114 | |||
| 115 | // Connect to our own relay | ||
| 116 | if let Err(e) = client.add_relay(&self.own_relay_url).await { | ||
| 117 | tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); | ||
| 118 | return; | ||
| 119 | } | ||
| 120 | |||
| 121 | client.connect().await; | ||
| 122 | |||
| 123 | // Wait for connection | ||
| 124 | let mut connected = false; | ||
| 125 | for _ in 0..30 { | ||
| 126 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 127 | let relays = client.relays().await; | ||
| 128 | if relays.values().any(|r| r.is_connected()) { | ||
| 129 | connected = true; | ||
| 130 | break; | ||
| 131 | } | ||
| 132 | } | ||
| 133 | |||
| 134 | if !connected { | ||
| 135 | tracing::error!( | ||
| 136 | "Failed to connect to own relay {} after 3 seconds", | ||
| 137 | self.own_relay_url | ||
| 138 | ); | ||
| 139 | return; | ||
| 140 | } | ||
| 141 | |||
| 142 | tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); | ||
| 143 | |||
| 144 | // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) | ||
| 145 | let filter = Filter::new() | ||
| 146 | .kinds([ | ||
| 147 | Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 | ||
| 148 | Kind::GitPatch, // 1617 | ||
| 149 | Kind::Custom(KIND_PR), // 1618 | ||
| 150 | Kind::Custom(KIND_PR_UPDATE), // 1619 | ||
| 151 | Kind::GitIssue, // 1621 | ||
| 152 | ]) | ||
| 153 | .since(Timestamp::now()); | ||
| 154 | |||
| 155 | if let Err(e) = client.subscribe(filter, None).await { | ||
| 156 | tracing::error!("Failed to subscribe to own relay: {}", e); | ||
| 157 | return; | ||
| 158 | } | ||
| 159 | |||
| 160 | tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); | ||
| 161 | |||
| 162 | // Batch state | ||
| 163 | let mut pending = PendingUpdates::default(); | ||
| 164 | let mut batch_timer_started: Option<Instant> = None; | ||
| 165 | let batch_window = Self::get_batch_window(); | ||
| 166 | |||
| 167 | // Main event loop using notifications stream | ||
| 168 | loop { | ||
| 169 | // Calculate timeout for batch processing | ||
| 170 | let timeout = if let Some(started) = batch_timer_started { | ||
| 171 | let elapsed = started.elapsed(); | ||
| 172 | if elapsed >= batch_window { | ||
| 173 | Duration::ZERO | ||
| 174 | } else { | ||
| 175 | batch_window - elapsed | ||
| 176 | } | ||
| 177 | } else { | ||
| 178 | Duration::from_secs(60) // Long timeout when no batch pending | ||
| 179 | }; | ||
| 180 | |||
| 181 | // Wait for notification with timeout | ||
| 182 | let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; | ||
| 183 | |||
| 184 | match notification { | ||
| 185 | Ok(Ok(notification)) => { | ||
| 186 | match notification { | ||
| 187 | RelayPoolNotification::Event { event, .. } => { | ||
| 188 | let kind = event.kind.as_u16(); | ||
| 189 | |||
| 190 | // Start batch timer on first event (does NOT reset) | ||
| 191 | if batch_timer_started.is_none() { | ||
| 192 | batch_timer_started = Some(Instant::now()); | ||
| 193 | tracing::debug!("Batch timer started"); | ||
| 194 | } | ||
| 195 | |||
| 196 | // Classify and add to pending | ||
| 197 | if kind == KIND_REPOSITORY_ANNOUNCEMENT { | ||
| 198 | tracing::debug!( | ||
| 199 | "SelfSubscriber received announcement {}", | ||
| 200 | event.id | ||
| 201 | ); | ||
| 202 | pending.announcements.push(*event); | ||
| 203 | } else { | ||
| 204 | tracing::debug!( | ||
| 205 | "SelfSubscriber received root event {} (kind {})", | ||
| 206 | event.id, | ||
| 207 | kind | ||
| 208 | ); | ||
| 209 | pending.root_events.push(*event); | ||
| 210 | } | ||
| 211 | } | ||
| 212 | RelayPoolNotification::Message { message, .. } => { | ||
| 213 | if let RelayMessage::EndOfStoredEvents(_) = message { | ||
| 214 | tracing::debug!("SelfSubscriber EOSE received"); | ||
| 215 | // Process any pending events after EOSE | ||
| 216 | if !pending.announcements.is_empty() | ||
| 217 | || !pending.root_events.is_empty() | ||
| 218 | { | ||
| 219 | self.process_batch(&mut pending).await; | ||
| 220 | batch_timer_started = None; | ||
| 221 | } | ||
| 222 | } | ||
| 223 | } | ||
| 224 | RelayPoolNotification::Shutdown => { | ||
| 225 | tracing::info!("SelfSubscriber shutting down"); | ||
| 226 | break; | ||
| 227 | } | ||
| 228 | } | ||
| 229 | } | ||
| 230 | Ok(Err(_)) => { | ||
| 231 | // Channel closed | ||
| 232 | tracing::warn!("SelfSubscriber notification channel closed"); | ||
| 233 | break; | ||
| 234 | } | ||
| 235 | Err(_) => { | ||
| 236 | // Timeout - check if batch should be processed | ||
| 237 | if let Some(started) = batch_timer_started { | ||
| 238 | if started.elapsed() >= batch_window { | ||
| 239 | if !pending.announcements.is_empty() || !pending.root_events.is_empty() | ||
| 240 | { | ||
| 241 | self.process_batch(&mut pending).await; | ||
| 242 | } | ||
| 243 | batch_timer_started = None; | ||
| 244 | } | ||
| 245 | } | ||
| 246 | } | ||
| 247 | } | ||
| 248 | } | ||
| 249 | |||
| 250 | client.disconnect().await; | ||
| 251 | tracing::info!("SelfSubscriber disconnected"); | ||
| 252 | } | ||
| 253 | |||
| 254 | /// Process a batch of pending updates. | ||
| 255 | async fn process_batch(&self, pending: &mut PendingUpdates) { | ||
| 256 | tracing::debug!( | ||
| 257 | "Processing batch: {} announcements, {} root events", | ||
| 258 | pending.announcements.len(), | ||
| 259 | pending.root_events.len() | ||
| 260 | ); | ||
| 261 | |||
| 262 | // Process root events first (update following_repo_root_events) | ||
| 263 | for event in pending.root_events.drain(..) { | ||
| 264 | let repo_refs = SyncManager::extract_all_repo_refs(&event); | ||
| 265 | if !repo_refs.is_empty() { | ||
| 266 | let mut guard = self.following_repo_root_events.write().await; | ||
| 267 | for repo_ref in repo_refs { | ||
| 268 | guard.entry(repo_ref).or_default().insert(event.id); | ||
| 269 | } | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | // Process announcements (relay discovery) | ||
| 274 | for event in pending.announcements.drain(..) { | ||
| 275 | self.process_announcement(&event).await; | ||
| 276 | } | ||
| 277 | } | ||
| 278 | |||
| 279 | /// Process an announcement event for relay discovery. | ||
| 280 | async fn process_announcement(&self, event: &Event) { | ||
| 281 | let repo_ref = SyncManager::build_repo_ref(event); | ||
| 282 | let relay_urls = Self::extract_relay_urls_from_announcement(event); | ||
| 283 | |||
| 284 | // Check if this announcement lists our relay | ||
| 285 | if !self.lists_our_service(event) { | ||
| 286 | tracing::debug!( | ||
| 287 | "Announcement {} does not list our service, skipping relay discovery", | ||
| 288 | event.id | ||
| 289 | ); | ||
| 290 | return; | ||
| 291 | } | ||
| 292 | |||
| 293 | tracing::info!( | ||
| 294 | "Processing announcement {} for repo {}, found {} relay URLs", | ||
| 295 | event.id, | ||
| 296 | repo_ref, | ||
| 297 | relay_urls.len() | ||
| 298 | ); | ||
| 299 | |||
| 300 | // Get current events for this repo from following_repo_root_events | ||
| 301 | let events = self | ||
| 302 | .following_repo_root_events | ||
| 303 | .read() | ||
| 304 | .await | ||
| 305 | .get(&repo_ref) | ||
| 306 | .cloned() | ||
| 307 | .unwrap_or_default(); | ||
| 308 | |||
| 309 | // For each relay URL in the announcement, check if we need to spawn or update | ||
| 310 | for relay_url in relay_urls { | ||
| 311 | if self.is_own_relay(&relay_url) { | ||
| 312 | continue; // Skip our own relay | ||
| 313 | } | ||
| 314 | |||
| 315 | let sync_relays_guard = self.sync_relays.read().await; | ||
| 316 | let exists = sync_relays_guard.contains_key(&relay_url); | ||
| 317 | drop(sync_relays_guard); | ||
| 318 | |||
| 319 | if exists { | ||
| 320 | // Relay already known - check if we need to add this repo | ||
| 321 | let mut guard = self.sync_relays.write().await; | ||
| 322 | let relay_repos = guard.entry(relay_url.clone()).or_default(); | ||
| 323 | let is_new_repo = !relay_repos.contains_key(&repo_ref); | ||
| 324 | |||
| 325 | if is_new_repo { | ||
| 326 | relay_repos.insert(repo_ref.clone(), events.clone()); | ||
| 327 | drop(guard); | ||
| 328 | |||
| 329 | // Send action to add filters | ||
| 330 | let mut repos_filters = HashMap::new(); | ||
| 331 | repos_filters.insert(repo_ref.clone(), events.clone()); | ||
| 332 | |||
| 333 | if let Err(e) = self | ||
| 334 | .action_tx | ||
| 335 | .send(RelayAction::AddFilters { | ||
| 336 | relay_url: relay_url.clone(), | ||
| 337 | repos_and_new_root_event: repos_filters, | ||
| 338 | }) | ||
| 339 | .await | ||
| 340 | { | ||
| 341 | tracing::warn!("Failed to send AddFilters action: {}", e); | ||
| 342 | } | ||
| 343 | } | ||
| 344 | } else { | ||
| 345 | // New relay - add to sync_relays and spawn | ||
| 346 | let mut guard = self.sync_relays.write().await; | ||
| 347 | let mut repos = HashMap::new(); | ||
| 348 | repos.insert(repo_ref.clone(), events.clone()); | ||
| 349 | guard.insert(relay_url.clone(), repos.clone()); | ||
| 350 | drop(guard); | ||
| 351 | |||
| 352 | tracing::info!("Discovered new relay to sync from: {}", relay_url); | ||
| 353 | |||
| 354 | // Send action to spawn relay | ||
| 355 | if let Err(e) = self | ||
| 356 | .action_tx | ||
| 357 | .send(RelayAction::SpawnRelay { | ||
| 358 | relay_url: relay_url.clone(), | ||
| 359 | repos_and_root_events: repos, | ||
| 360 | }) | ||
| 361 | .await | ||
| 362 | { | ||
| 363 | tracing::warn!("Failed to send SpawnRelay action: {}", e); | ||
| 364 | } | ||
| 365 | } | ||
| 366 | } | ||
| 367 | } | ||
| 368 | |||
| 369 | /// Extract relay URLs from an announcement event. | ||
| 370 | /// | ||
| 371 | /// Looks for both 'relays' and 'clone' tags. | ||
| 372 | fn extract_relay_urls_from_announcement(event: &Event) -> Vec<String> { | ||
| 373 | let mut urls = Vec::new(); | ||
| 374 | |||
| 375 | // Extract from 'relays' tag | ||
| 376 | for tag in event.tags.iter() { | ||
| 377 | if matches!(tag.kind(), TagKind::Relays) { | ||
| 378 | let vec = tag.clone().to_vec(); | ||
| 379 | urls.extend(vec.into_iter().skip(1)); // Skip tag name | ||
| 380 | } | ||
| 381 | } | ||
| 382 | |||
| 383 | // Extract from 'clone' tag - parse URLs to get relay hints | ||
| 384 | // Clone URLs look like: http://domain/repo.git or git://domain/repo.git | ||
| 385 | // We want to construct ws://domain from these | ||
| 386 | for tag in event.tags.iter() { | ||
| 387 | if matches!(tag.kind(), TagKind::Clone) { | ||
| 388 | let vec = tag.clone().to_vec(); | ||
| 389 | for url in vec.into_iter().skip(1) { | ||
| 390 | if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { | ||
| 391 | if !urls.contains(&relay_url) { | ||
| 392 | urls.push(relay_url); | ||
| 393 | } | ||
| 394 | } | ||
| 395 | } | ||
| 396 | } | ||
| 397 | } | ||
| 398 | |||
| 399 | urls | ||
| 400 | } | ||
| 401 | |||
| 402 | /// Convert a clone URL to a potential relay URL. | ||
| 403 | /// | ||
| 404 | /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" | ||
| 405 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | ||
| 406 | // Parse the URL to extract host:port | ||
| 407 | if let Ok(url) = url::Url::parse(clone_url) { | ||
| 408 | let host = url.host_str()?; | ||
| 409 | let port = url.port(); | ||
| 410 | let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; | ||
| 411 | |||
| 412 | if let Some(port) = port { | ||
| 413 | Some(format!("{}://{}:{}", scheme, host, port)) | ||
| 414 | } else { | ||
| 415 | Some(format!("{}://{}", scheme, host)) | ||
| 416 | } | ||
| 417 | } else { | ||
| 418 | None | ||
| 419 | } | ||
| 420 | } | ||
| 421 | |||
| 422 | /// Check if event lists our service in the relays or clone tags. | ||
| 423 | fn lists_our_service(&self, event: &Event) -> bool { | ||
| 424 | // Check relays tag | ||
| 425 | for tag in event.tags.iter() { | ||
| 426 | if matches!(tag.kind(), TagKind::Relays) { | ||
| 427 | let vec = tag.clone().to_vec(); | ||
| 428 | for url in vec.into_iter().skip(1) { | ||
| 429 | if self.is_own_relay(&url) { | ||
| 430 | return true; | ||
| 431 | } | ||
| 432 | } | ||
| 433 | } | ||
| 434 | } | ||
| 435 | |||
| 436 | // Check clone tag | ||
| 437 | for tag in event.tags.iter() { | ||
| 438 | if matches!(tag.kind(), TagKind::Clone) { | ||
| 439 | let vec = tag.clone().to_vec(); | ||
| 440 | for url in vec.into_iter().skip(1) { | ||
| 441 | if url.contains(&self.relay_domain) { | ||
| 442 | return true; | ||
| 443 | } | ||
| 444 | } | ||
| 445 | } | ||
| 446 | } | ||
| 447 | |||
| 448 | false | ||
| 449 | } | ||
| 450 | |||
| 451 | /// Check if a relay URL matches our relay. | ||
| 452 | fn is_own_relay(&self, relay_url: &str) -> bool { | ||
| 453 | relay_url.contains(&self.relay_domain) | ||
| 454 | } | ||
| 455 | } | ||
| 456 | |||
| 457 | #[cfg(test)] | ||
| 458 | mod tests { | ||
| 459 | use super::*; | ||
| 460 | |||
| 461 | #[test] | ||
| 462 | fn test_clone_url_to_relay_url_http() { | ||
| 463 | let url = "http://127.0.0.1:8080/repo.git"; | ||
| 464 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 465 | assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); | ||
| 466 | } | ||
| 467 | |||
| 468 | #[test] | ||
| 469 | fn test_clone_url_to_relay_url_https() { | ||
| 470 | let url = "https://example.com/repo.git"; | ||
| 471 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 472 | assert_eq!(relay, Some("wss://example.com".to_string())); | ||
| 473 | } | ||
| 474 | |||
| 475 | #[test] | ||
| 476 | fn test_clone_url_to_relay_url_invalid() { | ||
| 477 | let url = "not-a-valid-url"; | ||
| 478 | let relay = SelfSubscriber::clone_url_to_relay_url(url); | ||
| 479 | assert_eq!(relay, None); | ||
| 480 | } | ||
| 481 | |||
| 482 | #[test] | ||
| 483 | fn test_get_batch_window_default() { | ||
| 484 | // Clear env var if set | ||
| 485 | std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); | ||
| 486 | let window = SelfSubscriber::get_batch_window(); | ||
| 487 | assert_eq!(window, Duration::from_secs(5)); | ||
| 488 | } | ||
| 489 | |||
| 490 | #[test] | ||
| 491 | fn test_get_batch_window_from_env() { | ||
| 492 | std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); | ||
| 493 | let window = SelfSubscriber::get_batch_window(); | ||
| 494 | assert_eq!(window, Duration::from_millis(200)); | ||
| 495 | std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); | ||
| 496 | } | ||
| 497 | } | ||