From 9f594fadf2a1d5bfda0ab027f2b3cf7a247900ec Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 8 Dec 2025 16:47:46 +0000 Subject: proposed sync change to use self subscribe to trigger everything --- src/sync/manager.rs | 499 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 414 insertions(+), 85 deletions(-) (limited to 'src/sync/manager.rs') diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 96bf0f4..6ae82ef 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -1,12 +1,42 @@ //! SyncManager - Coordinates proactive sync operations //! -//! The SyncManager discovers relays from stored announcements, spawns connections -//! to each relay, receives events, validates them through the write policy, -//! and stores accepted events. +//! The SyncManager connects to remote relays, receives events, validates them +//! through the write policy, and stores accepted events. +//! +//! ## Simplified Relay Discovery Architecture +//! +//! All relay discovery is centralized in the self-subscriber: +//! - Bootstrap relay: connected immediately (no jitter, single relay) +//! - All other relays: discovered via self-subscriber announcements (with jitter) +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────┐ +//! │ ngit-grasp │ +//! │ │ +//! │ ┌─────────────┐ broadcasts ┌───────────────┐ │ +//! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │ +//! │ │ Database │ │ Client │ │ +//! │ └─────────────┘ └───────┬───────┘ │ +//! │ ▲ │ │ +//! │ │ stores │ extracts │ +//! │ │ │ relay │ +//! │ ┌─────┴─────┐ │ URLs │ +//! │ │ Remote │◀────────────────────────────────┘ │ +//! │ │Connections│ spawns new │ +//! │ └───────────┘ connections (with jitter) │ +//! └─────────────────────────────────────────────────────────────┘ +//! ``` +//! +//! ## Key Design Decisions +//! +//! - **Single relay discovery path**: Only the self-subscriber discovers new relays +//! - **Jitter at point of discovery**: Applied when spawning connections from announcements +//! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect +//! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd //! //! ## Phase 2 Features //! -//! - Relay discovery from stored kind 30617 announcements +//! - Relay discovery from kind 30617 announcements (via self-subscriber) //! - Multiple simultaneous relay connections //! - Three-layer filter strategy via FilterService //! @@ -30,6 +60,7 @@ use std::sync::Arc; use std::time::Duration; use nostr_relay_builder::prelude::*; +use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp}; use rand::Rng; use tokio::sync::mpsc; @@ -39,7 +70,7 @@ use super::health::RelayHealthTracker; use super::metrics::SyncMetrics; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; - +use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT; /// Default fallback address for sync source when bind_address cannot be parsed /// @@ -58,6 +89,11 @@ fn get_sync_source_addr(bind_address: &str) -> SocketAddr { .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) } +/// Derive the WebSocket URL for our own relay from bind_address +fn derive_own_relay_url(bind_address: &str) -> String { + format!("ws://{}", bind_address) +} + /// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { /// Bootstrap relay URL for initial sync (from config) @@ -65,6 +101,8 @@ pub struct SyncManager { bootstrap_relay_url: Option, /// Our relay's domain (for filtering) relay_domain: String, + /// Our relay's WebSocket URL (for self-subscribe) + own_relay_url: String, /// Database for storing accepted events database: SharedDatabase, /// Write policy for validating events @@ -95,9 +133,11 @@ impl SyncManager { write_policy: Nip34WritePolicy, config: &Config, ) -> Self { + let own_relay_url = derive_own_relay_url(&config.bind_address); Self { bootstrap_relay_url, relay_domain, + own_relay_url, database, write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), @@ -124,9 +164,11 @@ impl SyncManager { config: &Config, metrics: SyncMetrics, ) -> Self { + let own_relay_url = derive_own_relay_url(&config.bind_address); Self { bootstrap_relay_url, relay_domain, + own_relay_url, database, write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), @@ -144,9 +186,11 @@ impl SyncManager { ) -> Self { // Extract domain from URL for filtering let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default(); + let own_relay_url = format!("ws://{}", relay_domain); Self { bootstrap_relay_url: Some(bootstrap_url), relay_domain, + own_relay_url, database, write_policy, health_tracker: Arc::new(RelayHealthTracker::with_defaults()), @@ -173,12 +217,27 @@ impl SyncManager { /// Run the sync manager /// - /// This discovers relays from stored announcements, spawns connection tasks, - /// and processes incoming events. Runs indefinitely until cancelled. + /// This spawns the bootstrap relay connection (if configured), sets up a + /// self-subscriber for event-driven relay discovery, and processes incoming + /// events. The self-subscriber handles ALL relay discovery from announcements. + /// Runs indefinitely until cancelled. + /// + /// ## Simplified Relay Discovery Architecture + /// + /// All relay discovery is centralized in the self-subscriber: + /// - Bootstrap relay: connected immediately (no jitter, single relay) + /// - All other relays: discovered via self-subscriber announcements (with jitter) + /// - Jitter applied at point of discovery (not startup) + /// + /// This eliminates three redundant discovery paths: + /// 1. DB query at startup (removed) + /// 2. Remote event extraction (removed) + /// 3. Self-subscriber (sole discovery path) pub async fn run(self) { tracing::info!( - "Starting SyncManager (domain: {}, bootstrap relay: {:?})", + "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})", self.relay_domain, + self.own_relay_url, self.bootstrap_relay_url ); @@ -191,74 +250,56 @@ impl SyncManager { // Create channel for receiving events from all connections let (tx, mut rx) = mpsc::channel::(100); - // Track active relay URLs to avoid duplicates - let mut active_relays: HashSet = HashSet::new(); - - // Collect all relays to connect to - let mut relays_to_connect: Vec = Vec::new(); + // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing) + let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::::new())); - // Start with bootstrap relay if configured + // Bootstrap relay - connect immediately (no jitter, just one relay) if let Some(ref url) = self.bootstrap_relay_url { if !self.is_own_relay(url) { - relays_to_connect.push(url.clone()); - active_relays.insert(url.clone()); + tracing::info!("Connecting to bootstrap relay: {}", url); + active_relays.lock().await.insert(url.clone()); + self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false); } else { tracing::info!("Skipping bootstrap relay (is our own relay): {}", url); } } - // Discover additional relays from stored announcements - let discovered_urls = filter_service.discover_relay_urls().await; - for url in discovered_urls { - if !active_relays.contains(&url) && !self.is_own_relay(&url) { - relays_to_connect.push(url.clone()); - active_relays.insert(url.clone()); - } - } - // Record initial tracked relay count if let Some(ref metrics) = self.metrics { - metrics.set_tracked_count(active_relays.len() as i64); + let count = active_relays.lock().await.len(); + metrics.set_tracked_count(count as i64); } - // Spawn connections with startup jitter to prevent thundering herd - for url in relays_to_connect { - tracing::info!("Scheduling connection to sync relay: {}", url); - self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); + { + let active = active_relays.lock().await; + if active.is_empty() { + tracing::info!( + "No bootstrap relay configured, waiting for announcements via self-subscriber..." + ); + } else { + tracing::info!( + "SyncManager connected to {} relay(s): {:?}", + active.len(), + *active + ); + } } - if active_relays.is_empty() { - tracing::warn!("No sync relays configured or discovered, SyncManager idle"); - } else { - tracing::info!( - "SyncManager connected to {} relays: {:?}", - active_relays.len(), - active_relays - ); - } + // Spawn self-subscriber task for ALL relay discovery + let self_subscriber_handle = self.spawn_self_subscriber( + tx.clone(), + filter_service.clone(), + active_relays.clone(), + ); - // Process incoming events from all connections + // Process incoming events - just validate and store, NO relay discovery + // (relay discovery is handled solely by the self-subscriber) while let Some(synced_event) = rx.recv().await { - // Check if this event reveals new relays to sync from - let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event); - for url in new_urls { - if !active_relays.contains(&url) && !self.is_own_relay(&url) { - tracing::info!("Discovered new relay from event, connecting: {}", url); - active_relays.insert(url.clone()); - - // Update tracked relay count - if let Some(ref metrics) = self.metrics { - metrics.inc_tracked_count(); - } - - // New relays discovered during runtime don't need jitter - self.spawn_connection(url, tx.clone(), filter_service.clone()); - } - } - self.process_event(synced_event).await; } + // Clean up self-subscriber + self_subscriber_handle.abort(); tracing::warn!("SyncManager event channel closed, shutting down"); } @@ -267,52 +308,325 @@ impl SyncManager { url.contains(&self.relay_domain) } - /// Spawn a connection task for a relay with startup jitter + /// Spawn a self-subscriber task that connects to our own relay + /// and watches for kind 30617 announcements to discover new relays. /// - /// Adds a random delay (0 to startup_jitter_ms) before connecting to prevent - /// thundering herd on startup when multiple relays are configured. - /// Set startup_jitter_ms to 0 to disable jitter (useful for testing). - fn spawn_connection_with_jitter( + /// This is the SOLE relay discovery path - all relay discovery happens here. + /// When a new announcement is saved to our database (from direct submission + /// or synced from another relay), the self-subscriber receives it immediately + /// and spawns connections to newly discovered relays (with jitter). + fn spawn_self_subscriber( &self, - url: String, tx: mpsc::Sender, filter_service: Arc, - ) { - let domain = self.relay_domain.clone(); - let health_tracker = self.health_tracker.clone(); + active_relays: Arc>>, + ) -> tokio::task::JoinHandle<()> { + let own_relay_url = self.own_relay_url.clone(); + let relay_domain = self.relay_domain.clone(); let metrics = self.metrics.clone(); - let max_jitter = self.startup_jitter_ms; + let health_tracker = self.health_tracker.clone(); + let startup_jitter_ms = self.startup_jitter_ms; tokio::spawn(async move { - // Apply startup jitter (if configured) - if max_jitter > 0 { - let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); - tracing::debug!( - "Applying {}ms startup jitter before connecting to {}", - jitter_ms, - url - ); - tokio::time::sleep(Duration::from_millis(jitter_ms)).await; + Self::run_self_subscriber_loop( + own_relay_url, + relay_domain, + tx, + filter_service, + active_relays, + metrics, + health_tracker, + startup_jitter_ms, + ) + .await; + }) + } + + /// Main loop for the self-subscriber + /// + /// Connects to our own relay, subscribes to kind 30617 announcements, + /// and processes events to discover new relays. Handles reconnection + /// on disconnect. + /// + /// ## Reconnection Behavior + /// + /// - First connection: no `since` filter (get all historical announcements) + /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing + #[allow(clippy::too_many_arguments)] + async fn run_self_subscriber_loop( + own_relay_url: String, + relay_domain: String, + tx: mpsc::Sender, + filter_service: Arc, + active_relays: Arc>>, + metrics: Option, + health_tracker: Arc, + startup_jitter_ms: u64, + ) { + let mut reconnect_delay = Duration::from_secs(1); + let max_reconnect_delay = Duration::from_secs(60); + let mut is_first_connection = true; + + loop { + tracing::info!( + "Self-subscriber connecting to own relay: {}", + own_relay_url + ); + + match Self::connect_self_subscriber(&own_relay_url).await { + Ok(client) => { + // Reset reconnect delay on successful connection + reconnect_delay = Duration::from_secs(1); + + tracing::info!( + "Self-subscriber connected to own relay, subscribing to kind {} announcements{}", + KIND_REPOSITORY_ANNOUNCEMENT, + if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" } + ); + + // Subscribe to repository announcements + // First connection: get all historical; reconnections: only last 15 minutes + let filter = if is_first_connection { + Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) + } else { + let since = Timestamp::now() - 15 * 60; // 15 minutes ago + Filter::new() + .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) + .since(since) + }; + + is_first_connection = false; + + if let Err(e) = client.subscribe(filter, None).await { + tracing::error!( + "Self-subscriber failed to subscribe on {}: {}", + own_relay_url, + e + ); + // Will reconnect after delay + } else { + // Handle notifications until disconnect + Self::handle_self_subscriber_notifications( + &client, + &own_relay_url, + &relay_domain, + &tx, + &filter_service, + &active_relays, + &metrics, + &health_tracker, + startup_jitter_ms, + ) + .await; + } + + // Disconnect and cleanup + client.disconnect().await; + } + Err(e) => { + tracing::warn!( + "Self-subscriber failed to connect to {}: {}", + own_relay_url, + e + ); + } } - connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; - }); + // Wait before reconnecting with exponential backoff + tracing::debug!( + "Self-subscriber will reconnect to {} in {:?}", + own_relay_url, + reconnect_delay + ); + tokio::time::sleep(reconnect_delay).await; + reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); + } + } + + /// Connect to our own relay for self-subscribing + async fn connect_self_subscriber( + url: &str, + ) -> Result> { + let client = Client::default(); + client.add_relay(url).await?; + client.connect().await; + + // Wait for connection to establish (with timeout) + let timeout = Duration::from_secs(10); + let start = std::time::Instant::now(); + + while start.elapsed() < timeout { + let relays = client.relays().await; + if relays.values().any(|r| r.is_connected()) { + return Ok(client); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + Err("Timeout waiting for self-subscriber connection".into()) } - /// Spawn a connection task for a relay without jitter + /// Handle notifications from the self-subscriber client /// - /// Used for relays discovered during runtime (not at startup). + /// Processes announcement events to discover new relay URLs. + /// Applies jitter before spawning connections to prevent thundering herd. + #[allow(clippy::too_many_arguments)] + async fn handle_self_subscriber_notifications( + client: &Client, + own_relay_url: &str, + relay_domain: &str, + tx: &mpsc::Sender, + filter_service: &Arc, + active_relays: &Arc>>, + metrics: &Option, + health_tracker: &Arc, + startup_jitter_ms: u64, + ) { + let own_relay_url = own_relay_url.to_string(); + let relay_domain = relay_domain.to_string(); + let filter_service = filter_service.clone(); + let active_relays = active_relays.clone(); + let metrics = metrics.clone(); + let health_tracker = health_tracker.clone(); + let tx = tx.clone(); + + client + .handle_notifications(|notification| { + let own_relay_url = own_relay_url.clone(); + let relay_domain = relay_domain.clone(); + let filter_service = filter_service.clone(); + let active_relays = active_relays.clone(); + let metrics = metrics.clone(); + let health_tracker = health_tracker.clone(); + let tx = tx.clone(); + + async move { + match notification { + RelayPoolNotification::Event { event, .. } => { + // Only process repository announcement events + if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT { + return Ok(false); + } + + tracing::debug!( + "Self-subscriber received announcement {} from {}", + event.id, + own_relay_url + ); + + // Extract relay URLs from the announcement + let new_urls = filter_service.extract_relay_urls_from_event(&event); + + for url in new_urls { + // Check if we should connect to this relay + let should_connect = { + let mut active = active_relays.lock().await; + let is_new = !active.contains(&url); + let is_not_self = !url.contains(&relay_domain); + + if is_new && is_not_self { + active.insert(url.clone()); + true + } else { + false + } + }; + + if should_connect { + tracing::info!( + "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}", + event.id, + url + ); + + // Update tracked relay count + if let Some(ref m) = metrics { + m.inc_tracked_count(); + } + + // Spawn connection to the new relay WITH jitter at point of discovery + let url_clone = url.clone(); + let tx_clone = tx.clone(); + let filter_service_clone = filter_service.clone(); + let domain_clone = relay_domain.clone(); + let health_tracker_clone = health_tracker.clone(); + let metrics_clone = metrics.clone(); + + tokio::spawn(async move { + // Apply jitter at point of discovery + if startup_jitter_ms > 0 { + let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms); + tracing::debug!( + "Applying {}ms jitter before connecting to discovered relay {}", + jitter_ms, + url_clone + ); + tokio::time::sleep(Duration::from_millis(jitter_ms)).await; + } + + connect_with_retry( + &url_clone, + tx_clone, + filter_service_clone, + &domain_clone, + health_tracker_clone, + metrics_clone, + ) + .await; + }); + } + } + + Ok(false) // Continue processing + } + RelayPoolNotification::Shutdown => { + tracing::warn!( + "Self-subscriber connection shutdown for {}", + own_relay_url + ); + Ok(true) // Stop on shutdown + } + RelayPoolNotification::Message { .. } => { + Ok(false) // Continue processing + } + } + } + }) + .await + .ok(); + } + + /// Spawn a connection task for a relay + /// + /// # Arguments + /// * `url` - Relay URL to connect to + /// * `tx` - Channel sender for synced events + /// * `filter_service` - Filter service for subscriptions + /// * `apply_jitter` - Whether to apply startup jitter before connecting fn spawn_connection( &self, url: String, tx: mpsc::Sender, filter_service: Arc, + apply_jitter: bool, ) { let domain = self.relay_domain.clone(); let health_tracker = self.health_tracker.clone(); let metrics = self.metrics.clone(); + let max_jitter = self.startup_jitter_ms; tokio::spawn(async move { + // Apply startup jitter if requested + if apply_jitter && max_jitter > 0 { + let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); + tracing::debug!( + "Applying {}ms jitter before connecting to {}", + jitter_ms, + url + ); + tokio::time::sleep(Duration::from_millis(jitter_ms)).await; + } + connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; }); } @@ -351,7 +665,10 @@ impl SyncManager { } // Validate through write policy using sync_source_addr derived from config - let result = self.write_policy.admit_event(event, &self.sync_source_addr).await; + let result = self + .write_policy + .admit_event(event, &self.sync_source_addr) + .await; match result { PolicyResult::Accept => { @@ -386,12 +703,16 @@ impl SyncManager { /// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" /// - "wss://relay.example.com" -> "relay.example.com" fn extract_domain_from_url(url: &str) -> Option { - let url = url.trim_start_matches("ws://").trim_start_matches("wss://"); - let url = url.trim_start_matches("http://").trim_start_matches("https://"); - + let url = url + .trim_start_matches("ws://") + .trim_start_matches("wss://"); + let url = url + .trim_start_matches("http://") + .trim_start_matches("https://"); + // Remove path let domain = url.split('/').next()?; - + Some(domain.to_string()) } @@ -430,4 +751,12 @@ mod tests { Some("example.com:3000".to_string()) ); } + + #[test] + fn test_derive_own_relay_url() { + assert_eq!( + derive_own_relay_url("127.0.0.1:8080"), + "ws://127.0.0.1:8080".to_string() + ); + } } \ No newline at end of file -- cgit v1.2.3