From 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 09:07:54 +0000 Subject: improve sync design --- src/sync/mod.rs | 335 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 304 insertions(+), 31 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 71f91e2..9dec982 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -37,17 +37,26 @@ //! for the complete design context. use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; use std::sync::Arc; -use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; +use nostr_relay_builder::prelude::{ + DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, +}; +use nostr_sdk::prelude::*; use nostr_sdk::EventId; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, RwLock}; use crate::config::Config; use crate::nostr::builder::Nip34WritePolicy; use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; use crate::nostr::SharedDatabase; +mod relay_connection; +mod self_subscriber; +pub use relay_connection::{RelayConnection, RelayEvent}; +pub use self_subscriber::{RelayAction, SelfSubscriber}; + // ============================================================================= // Type Aliases for Sync State // ============================================================================= @@ -176,7 +185,7 @@ pub fn new_sync_relays() -> SyncRelays { /// The SyncManager is responsible for: /// - Discovering relays from stored repository announcements /// - Maintaining connections to sync relays -/// - Subscribing to events at external relays +/// - Subscribing to events at external relays /// - Applying the acceptance policy to synced events /// /// ## Lifecycle @@ -186,14 +195,16 @@ pub fn new_sync_relays() -> SyncRelays { /// /// ## Current Status /// -/// This is a stub implementation. 
The core data structures are: +/// Phase 2 implementation supports: +/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter +/// - Event processing through write policy +/// - Storage of accepted events +/// +/// Core data structures: /// - [`FollowingRepoRootEvents`]: Repository root events we're following /// - [`SyncRelays`]: Relays we sync from with their repos and events -/// -/// Full implementation will come in later phases. pub struct SyncManager { /// Bootstrap relay URL if configured - #[allow(dead_code)] bootstrap_relay_url: Option, /// Our service domain for filtering repo announcements @@ -201,11 +212,9 @@ pub struct SyncManager { service_domain: String, /// Database for querying/storing events - #[allow(dead_code)] database: SharedDatabase, /// Write policy for applying acceptance rules - #[allow(dead_code)] write_policy: Nip34WritePolicy, /// Repository root events we're following (Phase 1 data structure) @@ -219,6 +228,9 @@ pub struct SyncManager { /// Max backoff duration for relay reconnection #[allow(dead_code)] max_backoff_secs: u64, + + /// Socket address used for sync source (for write policy) + sync_source_addr: SocketAddr, } impl SyncManager { @@ -238,6 +250,10 @@ impl SyncManager { write_policy: Nip34WritePolicy, config: &Config, ) -> Self { + // Create a synthetic SocketAddr for sync source identification + // This is used when calling write_policy.admit_event() for synced events + let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + Self { bootstrap_relay_url, service_domain, @@ -246,6 +262,7 @@ impl SyncManager { following_repo_root_events: new_following_repo_root_events(), sync_relays: new_sync_relays(), max_backoff_secs: config.sync_max_backoff_secs, + sync_source_addr, } } @@ -460,14 +477,12 @@ impl SyncManager { /// }); /// ``` /// - /// ## Current Status + /// ## Implementation Status /// - /// This is a stub that logs and then waits indefinitely. 
- /// Full implementation includes: - /// - Phase 2: Database initialization queries ✓ - /// - Phase 3: Self-subscription for incremental updates - /// - Phase 4-6: Filter building, connection management - /// - Phase 7: Full sync loop + /// - Phase 2: Layer 1 sync from bootstrap relay ✓ + /// - Phase 3: Self-subscription and relay discovery ✓ + /// - Phase 4-6: Filter building, connection management (TODO) + /// - Phase 7: Full sync loop (TODO) pub async fn run(self) { tracing::info!( "SyncManager starting (bootstrap_relay={:?}, domain={})", @@ -475,27 +490,285 @@ impl SyncManager { self.service_domain ); - // Phase 2: Initialize from database + // Phase 3: Initialize state from database BEFORE spawning connections if let Err(e) = self.initialize_from_database().await { - tracing::error!("Failed to initialize sync state from database: {}", e); - // Continue anyway - we can still receive events via self-subscription + tracing::error!("Failed to initialize from database: {}", e); + // Continue anyway - we can still sync from bootstrap } - // Log initialization results - { - let following_count = self.following_repo_root_events.read().await.len(); - let sync_relays_count = self.sync_relays.read().await.len(); - tracing::info!( - "Sync state initialized: {} repos tracked, {} sync relays", - following_count, - sync_relays_count - ); + // Create channel for relay actions from self-subscriber + let (action_tx, mut action_rx) = mpsc::channel::(100); + + // Construct our own relay URL for self-subscription + let own_relay_url = format!("ws://{}", self.service_domain); + + // Spawn self-subscriber task + let self_subscriber = SelfSubscriber::new( + own_relay_url.clone(), + self.service_domain.clone(), + Arc::clone(&self.following_repo_root_events), + Arc::clone(&self.sync_relays), + action_tx, + ); + + tokio::spawn(async move { + self_subscriber.run().await; + }); + + tracing::info!("SelfSubscriber spawned for {}", own_relay_url); + + // Track active relay connections 
(relay_url -> event_sender) + let mut active_relays: HashMap> = HashMap::new(); + + // Phase 2: Connect to bootstrap relay if configured + if let Some(ref bootstrap_url) = self.bootstrap_relay_url { + if let Some(event_tx) = self + .spawn_relay_connection(bootstrap_url.clone(), None) + .await + { + active_relays.insert(bootstrap_url.clone(), event_tx); + } } - // Stub: wait indefinitely until full implementation (Phases 3-7) - // This prevents the spawned task from immediately completing + // Main coordination loop loop { - tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + tokio::select! { + // Handle relay actions from self-subscriber + action = action_rx.recv() => { + match action { + Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { + tracing::info!("Spawning new relay connection to {}", relay_url); + if !active_relays.contains_key(&relay_url) { + if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { + active_relays.insert(relay_url, event_tx); + } + } + } + Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { + tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); + // TODO: Implement filter updates for existing connections + } + None => { + tracing::info!("Action channel closed, continuing without self-subscriber"); + } + } + } + // Sleep to prevent busy loop when no events + _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { + // Periodic maintenance could go here + } + } + } + } + + /// Spawn a relay connection with optional Layer 2 filters. + /// + /// Returns the event sender channel if successfully spawned. 
+ async fn spawn_relay_connection( + &self, + relay_url: String, + repos: Option>>, + ) -> Option> { + // Create channel for receiving events + let (event_tx, event_rx) = mpsc::channel::(100); + + // Create connection + let connection = RelayConnection::new(relay_url.clone()); + + // Determine if this is bootstrap (no repos) or discovered relay (with repos) + let is_bootstrap = repos.is_none(); + + match connection.connect_and_subscribe().await { + Ok(()) => { + if is_bootstrap { + tracing::info!("Bootstrap relay connection established: {}", relay_url); + } else { + tracing::info!( + "Discovered relay connection established: {} (with Layer 2 filters)", + relay_url + ); + + // Add Layer 2 subscription for repo events + if let Some(ref repos) = repos { + if let Err(e) = self.add_layer2_subscription(&connection, repos).await { + tracing::warn!("Failed to add Layer 2 subscription: {}", e); + } + } + } + + // Clone refs needed for event processing task + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let sync_source_addr = self.sync_source_addr; + + // Clone event_tx for the spawned task + let event_tx_clone = event_tx.clone(); + + // Spawn event loop task + let conn_url = relay_url.clone(); + tokio::spawn(async move { + connection.run_event_loop(event_tx_clone).await; + }); + + // Spawn event processing task + tokio::spawn(async move { + Self::process_relay_events( + event_rx, + database, + write_policy, + sync_source_addr, + conn_url, + ) + .await; + }); + + Some(event_tx) + } + Err(e) => { + tracing::error!("Failed to connect to relay {}: {}", relay_url, e); + None + } + } + } + + /// Add Layer 2 subscription for repo-related events. + /// + /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. 
+ async fn add_layer2_subscription( + &self, + connection: &RelayConnection, + repos: &HashMap>, + ) -> Result<(), String> { + if repos.is_empty() { + return Ok(()); + } + + // Build repo refs list for filter + let repo_refs: Vec = repos.keys().cloned().collect(); + + tracing::debug!( + "Adding Layer 2 subscription for {} repos to {}", + repo_refs.len(), + connection.url() + ); + + // Chunk repo_refs into groups of 100 (per plan) + for chunk in repo_refs.chunks(100) { + // Build filter with lowercase 'a' tag for each repo ref + let mut filter = Filter::new().kinds([ + Kind::GitPatch, // 1617 + Kind::Custom(1618), // PR + Kind::Custom(1619), // PR update + Kind::GitIssue, // 1621 + ]); + + // Add each repo ref as a custom tag filter + for repo_ref in chunk { + filter = + filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); + } + + // Subscribe to this filter + if let Err(e) = connection.subscribe_filter(filter).await { + return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); + } + } + + Ok(()) + } + + /// Process events from a single relay connection. + /// + /// This is a static method that runs in its own task. 
async fn process_relay_events(
    mut event_rx: mpsc::Receiver<RelayEvent>,
    database: SharedDatabase,
    write_policy: Nip34WritePolicy,
    sync_source_addr: SocketAddr,
    relay_url: String,
) {
    tracing::debug!("Starting event processing for relay: {}", relay_url);

    // Runs until the channel yields `None` or the relay reports `Closed`.
    while let Some(message) = event_rx.recv().await {
        match message {
            RelayEvent::Closed(reason) => {
                tracing::warn!("Connection to {} closed: {}", relay_url, reason);
                break;
            }
            RelayEvent::EndOfStoredEvents => {
                tracing::debug!("EOSE received from {}", relay_url);
            }
            RelayEvent::Event(event) => {
                Self::process_single_event_static(
                    &event,
                    &database,
                    &write_policy,
                    &sync_source_addr,
                    &relay_url,
                )
                .await;
            }
        }
    }

    tracing::info!("Event processing ended for relay: {}", relay_url);
}

/// Run one synced event through the duplicate check, the write policy and storage.
///
/// Takes everything by reference and no `self`, so it can be called from a
/// spawned task. Every outcome is reported through `tracing` only; nothing is
/// returned to the caller.
async fn process_single_event_static(
    event: &Event,
    database: &SharedDatabase,
    write_policy: &Nip34WritePolicy,
    sync_source_addr: &SocketAddr,
    relay_url: &str,
) {
    let id = event.id;
    let kind_number = event.kind.as_u16();

    // Events the database already knows (saved or deleted) are skipped.
    // A failed lookup is only logged; the event still goes on to the policy.
    match database.check_id(&id).await {
        Ok(DatabaseEventStatus::NotExistent) => {}
        Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => {
            tracing::trace!("Event {} already exists, skipping", id);
            return;
        }
        Err(e) => {
            tracing::warn!("Failed to check if event {} exists: {}", id, e);
        }
    }

    // Only events the write policy accepts are handed to the database.
    let save_outcome = match write_policy.admit_event(event, sync_source_addr).await {
        PolicyResult::Reject(reason) => {
            tracing::debug!(
                "Rejected synced event {} (kind {}): {}",
                id,
                kind_number,
                reason
            );
            return;
        }
        PolicyResult::Accept => database.save_event(event).await,
    };

    match save_outcome {
        Ok(SaveEventStatus::Success) => {
            tracing::info!(
                "Synced event {} (kind {}) from {}",
                id,
                kind_number,
                relay_url
            );
        }
        Ok(_) => {
            tracing::debug!(
                "Event {} (kind {}) already stored or rejected by database",
                id,
                kind_number
            );
        }
        Err(e) => {
            tracing::error!("Failed to save synced event {}: {}", id, e);
        }
    }
}
} // closes `impl SyncManager`