From 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 09:07:54 +0000 Subject: improve sync design --- src/sync/relay_connection.rs | 185 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 src/sync/relay_connection.rs (limited to 'src/sync/relay_connection.rs') diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs new file mode 100644 index 0000000..71b5d51 --- /dev/null +++ b/src/sync/relay_connection.rs @@ -0,0 +1,185 @@ +//! Relay Connection for Proactive Sync +//! +//! This module handles connecting to external relays and receiving events +//! for the proactive sync system. + +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; + +/// Events received from a relay connection +#[derive(Debug)] +pub enum RelayEvent { + /// A nostr event was received + Event(Event), + /// End of stored events (EOSE) received + EndOfStoredEvents, + /// Connection was closed + Closed(String), +} + +/// Connection to an external relay for syncing events. +/// +/// RelayConnection handles: +/// - Connecting to the relay +/// - Subscribing with appropriate filters (Layer 1 for bootstrap) +/// - Receiving events and sending them through a channel +pub struct RelayConnection { + /// The relay URL + url: String, + /// The nostr-sdk client + client: Client, +} + +impl RelayConnection { + /// Create a new relay connection. + /// + /// # Arguments + /// + /// * `url` - The WebSocket URL of the relay to connect to + pub fn new(url: String) -> Self { + // Create a client with generated keys (we're just subscribing, not publishing) + let keys = Keys::generate(); + let client = Client::new(keys); + + Self { url, client } + } + + /// Connect to the relay and subscribe with Layer 1 filter. + /// + /// Layer 1 filter syncs announcement events (30617, 30618) which are + /// the foundation for discovering repository relationships. + /// + /// Returns the notification stream for receiving events. + pub async fn connect_and_subscribe(&self) -> Result<(), String> { + // Add the relay + self.client + .add_relay(&self.url) + .await + .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; + + // Connect to relay + self.client.connect().await; + + // Wait for connection to establish + let mut connected = false; + for _ in 0..30 { + tokio::time::sleep(Duration::from_millis(100)).await; + let relays = self.client.relays().await; + if relays.values().any(|r| r.is_connected()) { + connected = true; + break; + } + } + + if !connected { + return Err(format!( + "Failed to connect to relay {} after 3 seconds", + self.url + )); + } + + tracing::info!("Connected to bootstrap relay: {}", self.url); + + // Layer 1 filter: Repository announcements and state events + // These are addressable events that define repositories + let filter = Filter::new().kinds([ + Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 + Kind::Custom(KIND_REPOSITORY_STATE), // 30618 + ]); + + // Subscribe to the filter + self.client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe: {}", e))?; + + tracing::debug!( + "Subscribed to Layer 1 events (kinds 30617, 30618) from {}", + self.url + ); + + Ok(()) + } + + /// Run the event loop, sending received events through the channel. + /// + /// This method runs until the connection is closed or an error occurs. + /// + /// # Arguments + /// + /// * `event_sender` - Channel to send received events + pub async fn run_event_loop(self, event_sender: mpsc::Sender) { + tracing::debug!("Starting event loop for relay: {}", self.url); + + // Handle notifications + self.client + .handle_notifications(|notification| async { + match notification { + RelayPoolNotification::Event { event, .. } => { + tracing::debug!( + "Received event {} (kind {}) from {}", + event.id, + event.kind.as_u16(), + self.url + ); + if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + tracing::warn!("Event channel closed, stopping relay connection"); + return Ok(true); // Stop handling + } + } + RelayPoolNotification::Message { message, .. } => { + if let RelayMessage::EndOfStoredEvents(_) = message { + tracing::debug!("EOSE received from {}", self.url); + if event_sender + .send(RelayEvent::EndOfStoredEvents) + .await + .is_err() + { + return Ok(true); // Stop handling + } + } + } + RelayPoolNotification::Shutdown => { + tracing::info!("Relay {} shutting down", self.url); + let _ = event_sender + .send(RelayEvent::Closed("Shutdown".to_string())) + .await; + return Ok(true); // Stop handling + } + } + Ok(false) // Continue handling + }) + .await + .ok(); // Ignore errors on shutdown + + // Disconnect when done + self.client.disconnect().await; + tracing::info!("Disconnected from relay: {}", self.url); + } + + /// Get the relay URL + pub fn url(&self) -> &str { + &self.url + } + + /// Subscribe to an additional filter. + /// + /// This is used to add Layer 2 filters for repo-related events after + /// the initial connection is established. + pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> { + self.client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe with filter: {}", e))?; + Ok(()) + } + + /// Get a reference to the client for additional operations. + pub fn client(&self) -> &Client { + &self.client + } +} -- cgit v1.2.3