From 39e782b12fce1776f2ad0b0f5430749533cb80ea Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 11:07:50 +0000 Subject: sync v4 mvp --- src/sync/mod.rs | 301 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 292 insertions(+), 9 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c1f8bca..fb09896 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -12,6 +12,20 @@ //! //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. +pub mod algorithms; +pub mod filters; +pub mod relay_connection; +pub mod self_subscriber; + +// Re-export core algorithm types +pub use algorithms::{AddFilters, RelaySyncNeeds}; + +// Re-export relay connection types +pub use relay_connection::{RelayConnection, RelayEvent}; + +// Re-export self-subscriber types +pub use self_subscriber::{RelayAction, SelfSubscriber}; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -355,21 +369,290 @@ impl SyncManager { } } - /// Run the sync manager (placeholder for Phase 1) + /// Run the sync manager /// - /// This will be implemented in later phases to: - /// 1. Subscribe to local relay for 30617 events - /// 2. Process events to build RepoSyncIndex - /// 3. Compute and execute sync actions - /// 4. Handle reconnection and catch-up logic + /// Coordinates all sync components: + /// 1. Spawns self-subscriber to monitor own relay for announcements + /// 2. Connects to bootstrap relay if configured + /// 3. Handles relay actions from self-subscriber pub async fn run(self) { + use tokio::sync::mpsc; + tracing::info!( bootstrap_relay = ?self.bootstrap_relay_url, service_domain = %self.service_domain, - "SyncManager starting (placeholder - not yet implemented)" + "SyncManager starting" + ); + + // 1. Create action channel for self-subscriber -> manager communication + let (action_tx, mut action_rx) = mpsc::channel::(100); + + // 2. Spawn self-subscriber + let self_subscriber = SelfSubscriber::new( + format!("ws://{}", self.service_domain), + self.service_domain.clone(), + Arc::clone(&self.repo_sync_index), + action_tx, ); + tokio::spawn(async move { self_subscriber.run().await }); + + // 3. Connect to bootstrap relay if configured + if let Some(ref bootstrap_url) = self.bootstrap_relay_url { + self.spawn_relay_connection(bootstrap_url.clone()).await; + } - // Phase 1: Just log and return - // Full implementation will be added in subsequent phases + // 4. Main loop - handle actions from self-subscriber + loop { + tokio::select! { + action = action_rx.recv() => { + match action { + Some(RelayAction::SpawnRelay { relay_url, repos }) => { + // Check if relay already exists + let relay_index = self.relay_sync_index.read().await; + let exists = relay_index.contains_key(&relay_url); + drop(relay_index); + + if !exists { + tracing::info!(relay = %relay_url, "Spawning new relay connection"); + self.spawn_relay_with_layer2(relay_url, repos).await; + } else { + tracing::debug!( + relay = %relay_url, + "Relay already exists, considering AddFilters" + ); + // For MVP, we don't handle AddFilters - just log + // Full implementation would call subscribe_filters on existing connection + } + } + Some(RelayAction::AddFilters { relay_url, repos }) => { + tracing::debug!( + relay = %relay_url, + repo_count = repos.len(), + "AddFilters action (MVP: not implemented)" + ); + // For MVP, not implemented - full version would add Layer 2 filters + // to existing relay connection + } + None => break, + } + } + } + } + } + + /// Spawn relay connection with Layer 2 filters for specific repos + /// + /// Used when discovering relays from announcements. Connects to the relay, + /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the + /// specific repos we want to sync. + async fn spawn_relay_with_layer2( + &self, + relay_url: String, + repos: HashMap>, + ) { + use crate::sync::filters::build_layer2_and_layer3_filters; + use tokio::sync::mpsc; + + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let relay_sync_index = Arc::clone(&self.relay_sync_index); + + // Create relay connection + let connection = RelayConnection::new(relay_url.clone()); + + // Connect and subscribe to Layer 1 (announcements) + if let Err(e) = connection.connect_and_subscribe(None).await { + tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); + return; + } + + // Mark as connected in relay sync index + { + let mut index = relay_sync_index.write().await; + index.insert( + relay_url.clone(), + RelayState { + repos: repos.keys().cloned().collect(), + root_events: repos.values().flatten().cloned().collect(), + is_bootstrap: false, + connection_status: ConnectionStatus::Connected, + last_connected: Some(Timestamp::now()), + disconnected_at: None, + }, + ); + } + + // Subscribe to Layer 2+3 filters for the repos + let repo_ids: HashSet = repos.keys().cloned().collect(); + let root_events: HashSet = repos.values().flatten().cloned().collect(); + let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None); + + for filter in filters { + if let Err(e) = connection.subscribe_filter(filter).await { + tracing::error!( + relay = %relay_url, + error = %e, + "Failed to subscribe to Layer 2 filter" + ); + } + } + + tracing::info!( + relay = %relay_url, + repo_count = repos.len(), + "Connected to discovered relay with Layer 2+3 filters" + ); + + // Create event channel + let (event_tx, mut event_rx) = mpsc::channel::(1000); + + // Spawn event loop + tokio::spawn(async move { + connection.run_event_loop(event_tx).await; + }); + + // Spawn event processor + let relay_url_clone = relay_url.clone(); + tokio::spawn(async move { + while let Some(relay_event) = event_rx.recv().await { + match relay_event { + RelayEvent::Event(event) => { + Self::process_event_static( + &event, + &relay_url_clone, + &database, + &write_policy, + ) + .await; + } + RelayEvent::EndOfStoredEvents(_) => { + tracing::debug!(relay = %relay_url_clone, "EOSE received"); + } + RelayEvent::Closed(_) | RelayEvent::Shutdown => { + tracing::info!(relay = %relay_url_clone, "Relay disconnected"); + break; + } + } + } + }); + } + + /// Spawn a relay connection and start its event loop + async fn spawn_relay_connection(&self, relay_url: String) { + use tokio::sync::mpsc; + + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let relay_sync_index = Arc::clone(&self.relay_sync_index); + + // Create relay connection + let connection = RelayConnection::new(relay_url.clone()); + + // Connect and subscribe to Layer 1 + if let Err(e) = connection.connect_and_subscribe(None).await { + tracing::error!("Failed to connect to relay {}: {}", relay_url, e); + return; + } + + // Mark as connected in relay sync index + { + let mut index = relay_sync_index.write().await; + index.insert( + relay_url.clone(), + RelayState { + repos: HashSet::new(), + root_events: HashSet::new(), + is_bootstrap: true, + connection_status: ConnectionStatus::Connected, + last_connected: Some(Timestamp::now()), + disconnected_at: None, + }, + ); + } + + // Create event channel + let (event_tx, mut event_rx) = mpsc::channel::(1000); + + // Spawn event loop + tokio::spawn(async move { + connection.run_event_loop(event_tx).await; + }); + + // Spawn event processor + let relay_url_clone = relay_url.clone(); + tokio::spawn(async move { + while let Some(relay_event) = event_rx.recv().await { + match relay_event { + RelayEvent::Event(event) => { + Self::process_event_static(&event, &relay_url_clone, &database, &write_policy) + .await; + } + RelayEvent::EndOfStoredEvents(_) => { + tracing::debug!("EOSE from {}", relay_url_clone); + } + RelayEvent::Closed(_) | RelayEvent::Shutdown => { + tracing::info!("Relay {} disconnected", relay_url_clone); + break; + } + } + } + }); + } + + /// Process a single event from a relay (static version for spawned tasks) + async fn process_event_static( + event: &Event, + relay_url: &str, + database: &SharedDatabase, + write_policy: &Nip34WritePolicy, + ) { + use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + // Check if event already exists + match database.event_by_id(&event.id).await { + Ok(Some(_)) => { + tracing::trace!(event_id = %event.id, "Event already exists, skipping"); + return; + } + Err(e) => { + tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); + return; + } + Ok(None) => {} // Continue processing + } + + // Apply write policy using a dummy address (sync events aren't from network clients) + let dummy_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); + let result = write_policy.admit_event(event, &dummy_addr).await; + + match result { + PolicyResult::Accept => { + // Save event + if let Err(e) = database.save_event(event).await { + tracing::error!( + event_id = %event.id, + relay = %relay_url, + error = %e, + "Failed to save synced event" + ); + } else { + tracing::debug!( + event_id = %event.id, + relay = %relay_url, + kind = %event.kind.as_u16(), + "Saved synced event" + ); + } + } + PolicyResult::Reject(reason) => { + tracing::debug!( + event_id = %event.id, + relay = %relay_url, + reason = %reason, + "Event rejected by write policy" + ); + } + } } } \ No newline at end of file -- cgit v1.2.3