From 586fc2a7df1ce256469f0742d23f687ac4b075b1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 10:33:07 +0000 Subject: stub of sync v4 --- src/sync/relay_connection.rs | 185 ------------------------------------------- 1 file changed, 185 deletions(-) delete mode 100644 src/sync/relay_connection.rs (limited to 'src/sync/relay_connection.rs') diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs deleted file mode 100644 index 71b5d51..0000000 --- a/src/sync/relay_connection.rs +++ /dev/null @@ -1,185 +0,0 @@ -//! Relay Connection for Proactive Sync -//! -//! This module handles connecting to external relays and receiving events -//! for the proactive sync system. - -use std::time::Duration; - -use nostr_sdk::prelude::*; -use tokio::sync::mpsc; - -use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; - -/// Events received from a relay connection -#[derive(Debug)] -pub enum RelayEvent { - /// A nostr event was received - Event(Event), - /// End of stored events (EOSE) received - EndOfStoredEvents, - /// Connection was closed - Closed(String), -} - -/// Connection to an external relay for syncing events. -/// -/// RelayConnection handles: -/// - Connecting to the relay -/// - Subscribing with appropriate filters (Layer 1 for bootstrap) -/// - Receiving events and sending them through a channel -pub struct RelayConnection { - /// The relay URL - url: String, - /// The nostr-sdk client - client: Client, -} - -impl RelayConnection { - /// Create a new relay connection. - /// - /// # Arguments - /// - /// * `url` - The WebSocket URL of the relay to connect to - pub fn new(url: String) -> Self { - // Create a client with generated keys (we're just subscribing, not publishing) - let keys = Keys::generate(); - let client = Client::new(keys); - - Self { url, client } - } - - /// Connect to the relay and subscribe with Layer 1 filter. - /// - /// Layer 1 filter syncs announcement events (30617, 30618) which are - /// the foundation for discovering repository relationships. - /// - /// Returns the notification stream for receiving events. - pub async fn connect_and_subscribe(&self) -> Result<(), String> { - // Add the relay - self.client - .add_relay(&self.url) - .await - .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; - - // Connect to relay - self.client.connect().await; - - // Wait for connection to establish - let mut connected = false; - for _ in 0..30 { - tokio::time::sleep(Duration::from_millis(100)).await; - let relays = self.client.relays().await; - if relays.values().any(|r| r.is_connected()) { - connected = true; - break; - } - } - - if !connected { - return Err(format!( - "Failed to connect to relay {} after 3 seconds", - self.url - )); - } - - tracing::info!("Connected to bootstrap relay: {}", self.url); - - // Layer 1 filter: Repository announcements and state events - // These are addressable events that define repositories - let filter = Filter::new().kinds([ - Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 - Kind::Custom(KIND_REPOSITORY_STATE), // 30618 - ]); - - // Subscribe to the filter - self.client - .subscribe(filter, None) - .await - .map_err(|e| format!("Failed to subscribe: {}", e))?; - - tracing::debug!( - "Subscribed to Layer 1 events (kinds 30617, 30618) from {}", - self.url - ); - - Ok(()) - } - - /// Run the event loop, sending received events through the channel. - /// - /// This method runs until the connection is closed or an error occurs. - /// - /// # Arguments - /// - /// * `event_sender` - Channel to send received events - pub async fn run_event_loop(self, event_sender: mpsc::Sender) { - tracing::debug!("Starting event loop for relay: {}", self.url); - - // Handle notifications - self.client - .handle_notifications(|notification| async { - match notification { - RelayPoolNotification::Event { event, .. } => { - tracing::debug!( - "Received event {} (kind {}) from {}", - event.id, - event.kind.as_u16(), - self.url - ); - if event_sender.send(RelayEvent::Event(*event)).await.is_err() { - tracing::warn!("Event channel closed, stopping relay connection"); - return Ok(true); // Stop handling - } - } - RelayPoolNotification::Message { message, .. } => { - if let RelayMessage::EndOfStoredEvents(_) = message { - tracing::debug!("EOSE received from {}", self.url); - if event_sender - .send(RelayEvent::EndOfStoredEvents) - .await - .is_err() - { - return Ok(true); // Stop handling - } - } - } - RelayPoolNotification::Shutdown => { - tracing::info!("Relay {} shutting down", self.url); - let _ = event_sender - .send(RelayEvent::Closed("Shutdown".to_string())) - .await; - return Ok(true); // Stop handling - } - } - Ok(false) // Continue handling - }) - .await - .ok(); // Ignore errors on shutdown - - // Disconnect when done - self.client.disconnect().await; - tracing::info!("Disconnected from relay: {}", self.url); - } - - /// Get the relay URL - pub fn url(&self) -> &str { - &self.url - } - - /// Subscribe to an additional filter. - /// - /// This is used to add Layer 2 filters for repo-related events after - /// the initial connection is established. - pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> { - self.client - .subscribe(filter, None) - .await - .map_err(|e| format!("Failed to subscribe with filter: {}", e))?; - Ok(()) - } - - /// Get a reference to the client for additional operations. - pub fn client(&self) -> &Client { - &self.client - } -} -- cgit v1.2.3