From 466a009d8248aab274a9da419e4c0d83a4b9f466 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 21:24:21 +0000 Subject: feat(sync): broadcast synced events to WebSocket subscribers Enable recursive relay discovery by broadcasting synced events to WebSocket subscribers via LocalRelay.notify_event(). This allows the SelfSubscriber to receive 30617 announcements synced from external relays and discover additional relay URLs to connect to. Changes: - Pass LocalRelay to SyncManager::new() from main.rs - Add local_relay field to SyncManager struct - Call notify_event() after saving synced events to database - Enable test_recursive_relay_discovery_syncs_announcement test The test verifies that when relay_a syncs announcement_x from bootstrap relay_b (which lists relay_c), relay_a discovers and connects to relay_c to sync announcement_y. Fixes recursive relay discovery from bootstrap sync. --- src/sync/mod.rs | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index b38c4a9..1e60e4a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -40,6 +40,7 @@ use tokio::sync::{broadcast, Mutex, RwLock}; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; +use nostr_relay_builder::prelude::LocalRelay; // ============================================================================= // Type Aliases for Index Structures @@ -467,6 +468,8 @@ pub struct SyncManager { database: SharedDatabase, /// Write policy for validating incoming events write_policy: Nip34WritePolicy, + /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) + local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, /// What we want to sync (source of truth) @@ -499,12 +502,14 @@ impl SyncManager { /// * `service_domain` - The domain this relay serves (for filtering repos) /// * `database` - Shared database for event storage /// * `write_policy` - Policy for validating events before storage + /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) /// * `config` - Configuration for sync settings pub fn new( bootstrap_relay_url: Option, service_domain: String, database: SharedDatabase, write_policy: Nip34WritePolicy, + local_relay: LocalRelay, config: &Config, ) -> Self { Self { @@ -512,6 +517,7 @@ impl SyncManager { service_domain, database, write_policy, + local_relay, config: config.clone(), repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), @@ -1269,6 +1275,7 @@ impl SyncManager { let database = Arc::clone(&self.database); let write_policy = self.write_policy.clone(); + let local_relay = self.local_relay.clone(); let relay_sync_index = Arc::clone(&self.relay_sync_index); // Check if this is a bootstrap relay @@ -1340,6 +1347,7 @@ impl SyncManager { &relay_url_clone, &database, &write_policy, + &local_relay, ) .await; } @@ -1393,11 +1401,18 @@ impl SyncManager { } /// Process a single event from a relay (static version for spawned tasks) + /// + /// Processes events with dedup, policy check, database save, and broadcast: + /// - Deduplication (skips if event already exists) + /// - Write policy validation + /// - Database save + /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) async fn process_event_static( event: &Event, relay_url: &str, database: &SharedDatabase, write_policy: &Nip34WritePolicy, + local_relay: &LocalRelay, ) { use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -1421,7 +1436,7 @@ impl SyncManager { match result { PolicyResult::Accept => { - // Save event + // Save event to database if let Err(e) = database.save_event(event).await { tracing::error!( event_id = %event.id, @@ -1429,14 +1444,20 @@ impl SyncManager { error = %e, "Failed to save synced event" ); - } else { - tracing::debug!( - event_id = %event.id, - relay = %relay_url, - kind = %event.kind.as_u16(), - "Saved synced event" - ); + return; } + + // Broadcast to WebSocket subscribers (enables recursive relay discovery) + // This allows SelfSubscriber to receive synced 30617 announcements + let broadcast_success = local_relay.notify_event(event.clone()); + + tracing::debug!( + event_id = %event.id, + relay = %relay_url, + kind = %event.kind.as_u16(), + broadcast = broadcast_success, + "Synced event saved and broadcast" + ); } PolicyResult::Reject(reason) => { tracing::debug!( -- cgit v1.2.3