From 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 09:07:54 +0000 Subject: improve sync design --- src/sync/mod.rs | 335 ++++++++++++++++++++++++++--- src/sync/relay_connection.rs | 185 ++++++++++++++++ src/sync/self_subscriber.rs | 497 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 986 insertions(+), 31 deletions(-) create mode 100644 src/sync/relay_connection.rs create mode 100644 src/sync/self_subscriber.rs (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 71f91e2..9dec982 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -37,17 +37,26 @@ //! for the complete design context. use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; use std::sync::Arc; -use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; +use nostr_relay_builder::prelude::{ + DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, +}; +use nostr_sdk::prelude::*; use nostr_sdk::EventId; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, RwLock}; use crate::config::Config; use crate::nostr::builder::Nip34WritePolicy; use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; use crate::nostr::SharedDatabase; +mod relay_connection; +mod self_subscriber; +pub use relay_connection::{RelayConnection, RelayEvent}; +pub use self_subscriber::{RelayAction, SelfSubscriber}; + // ============================================================================= // Type Aliases for Sync State // ============================================================================= @@ -176,7 +185,7 @@ pub fn new_sync_relays() -> SyncRelays { /// The SyncManager is responsible for: /// - Discovering relays from stored repository announcements /// - Maintaining connections to sync relays -/// - Subscribing to events at external relays +/// - Subscribing to events at external relays /// - Applying the acceptance policy to synced events /// /// ## Lifecycle @@ -186,14 +195,16 @@ pub fn new_sync_relays() -> SyncRelays { /// /// ## Current Status /// -/// This is a stub implementation. The core data structures are: +/// Phase 2 implementation supports: +/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter +/// - Event processing through write policy +/// - Storage of accepted events +/// +/// Core data structures: /// - [`FollowingRepoRootEvents`]: Repository root events we're following /// - [`SyncRelays`]: Relays we sync from with their repos and events -/// -/// Full implementation will come in later phases. pub struct SyncManager { /// Bootstrap relay URL if configured - #[allow(dead_code)] bootstrap_relay_url: Option, /// Our service domain for filtering repo announcements @@ -201,11 +212,9 @@ pub struct SyncManager { service_domain: String, /// Database for querying/storing events - #[allow(dead_code)] database: SharedDatabase, /// Write policy for applying acceptance rules - #[allow(dead_code)] write_policy: Nip34WritePolicy, /// Repository root events we're following (Phase 1 data structure) @@ -219,6 +228,9 @@ pub struct SyncManager { /// Max backoff duration for relay reconnection #[allow(dead_code)] max_backoff_secs: u64, + + /// Socket address used for sync source (for write policy) + sync_source_addr: SocketAddr, } impl SyncManager { @@ -238,6 +250,10 @@ impl SyncManager { write_policy: Nip34WritePolicy, config: &Config, ) -> Self { + // Create a synthetic SocketAddr for sync source identification + // This is used when calling write_policy.admit_event() for synced events + let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + Self { bootstrap_relay_url, service_domain, @@ -246,6 +262,7 @@ impl SyncManager { following_repo_root_events: new_following_repo_root_events(), sync_relays: new_sync_relays(), max_backoff_secs: config.sync_max_backoff_secs, + sync_source_addr, } } @@ -460,14 +477,12 @@ impl SyncManager { /// }); /// ``` /// - /// ## Current Status + /// ## Implementation Status /// - /// This is a stub that logs and then waits indefinitely. - /// Full implementation includes: - /// - Phase 2: Database initialization queries ✓ - /// - Phase 3: Self-subscription for incremental updates - /// - Phase 4-6: Filter building, connection management - /// - Phase 7: Full sync loop + /// - Phase 2: Layer 1 sync from bootstrap relay ✓ + /// - Phase 3: Self-subscription and relay discovery ✓ + /// - Phase 4-6: Filter building, connection management (TODO) + /// - Phase 7: Full sync loop (TODO) pub async fn run(self) { tracing::info!( "SyncManager starting (bootstrap_relay={:?}, domain={})", @@ -475,27 +490,285 @@ impl SyncManager { self.service_domain ); - // Phase 2: Initialize from database + // Phase 3: Initialize state from database BEFORE spawning connections if let Err(e) = self.initialize_from_database().await { - tracing::error!("Failed to initialize sync state from database: {}", e); - // Continue anyway - we can still receive events via self-subscription + tracing::error!("Failed to initialize from database: {}", e); + // Continue anyway - we can still sync from bootstrap } - // Log initialization results - { - let following_count = self.following_repo_root_events.read().await.len(); - let sync_relays_count = self.sync_relays.read().await.len(); - tracing::info!( - "Sync state initialized: {} repos tracked, {} sync relays", - following_count, - sync_relays_count - ); + // Create channel for relay actions from self-subscriber + let (action_tx, mut action_rx) = mpsc::channel::(100); + + // Construct our own relay URL for self-subscription + let own_relay_url = format!("ws://{}", self.service_domain); + + // Spawn self-subscriber task + let self_subscriber = SelfSubscriber::new( + own_relay_url.clone(), + self.service_domain.clone(), + Arc::clone(&self.following_repo_root_events), + Arc::clone(&self.sync_relays), + action_tx, + ); + + tokio::spawn(async move { + self_subscriber.run().await; + }); + + tracing::info!("SelfSubscriber spawned for {}", own_relay_url); + + // Track active relay connections (relay_url -> event_sender) + let mut active_relays: HashMap> = HashMap::new(); + + // Phase 2: Connect to bootstrap relay if configured + if let Some(ref bootstrap_url) = self.bootstrap_relay_url { + if let Some(event_tx) = self + .spawn_relay_connection(bootstrap_url.clone(), None) + .await + { + active_relays.insert(bootstrap_url.clone(), event_tx); + } } - // Stub: wait indefinitely until full implementation (Phases 3-7) - // This prevents the spawned task from immediately completing + // Main coordination loop loop { - tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + tokio::select! { + // Handle relay actions from self-subscriber + action = action_rx.recv() => { + match action { + Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { + tracing::info!("Spawning new relay connection to {}", relay_url); + if !active_relays.contains_key(&relay_url) { + if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { + active_relays.insert(relay_url, event_tx); + } + } + } + Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { + tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); + // TODO: Implement filter updates for existing connections + } + None => { + tracing::info!("Action channel closed, continuing without self-subscriber"); + } + } + } + // Sleep to prevent busy loop when no events + _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { + // Periodic maintenance could go here + } + } + } + } + + /// Spawn a relay connection with optional Layer 2 filters. + /// + /// Returns the event sender channel if successfully spawned. + async fn spawn_relay_connection( + &self, + relay_url: String, + repos: Option>>, + ) -> Option> { + // Create channel for receiving events + let (event_tx, event_rx) = mpsc::channel::(100); + + // Create connection + let connection = RelayConnection::new(relay_url.clone()); + + // Determine if this is bootstrap (no repos) or discovered relay (with repos) + let is_bootstrap = repos.is_none(); + + match connection.connect_and_subscribe().await { + Ok(()) => { + if is_bootstrap { + tracing::info!("Bootstrap relay connection established: {}", relay_url); + } else { + tracing::info!( + "Discovered relay connection established: {} (with Layer 2 filters)", + relay_url + ); + + // Add Layer 2 subscription for repo events + if let Some(ref repos) = repos { + if let Err(e) = self.add_layer2_subscription(&connection, repos).await { + tracing::warn!("Failed to add Layer 2 subscription: {}", e); + } + } + } + + // Clone refs needed for event processing task + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let sync_source_addr = self.sync_source_addr; + + // Clone event_tx for the spawned task + let event_tx_clone = event_tx.clone(); + + // Spawn event loop task + let conn_url = relay_url.clone(); + tokio::spawn(async move { + connection.run_event_loop(event_tx_clone).await; + }); + + // Spawn event processing task + tokio::spawn(async move { + Self::process_relay_events( + event_rx, + database, + write_policy, + sync_source_addr, + conn_url, + ) + .await; + }); + + Some(event_tx) + } + Err(e) => { + tracing::error!("Failed to connect to relay {}: {}", relay_url, e); + None + } + } + } + + /// Add Layer 2 subscription for repo-related events. + /// + /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. + async fn add_layer2_subscription( + &self, + connection: &RelayConnection, + repos: &HashMap>, + ) -> Result<(), String> { + if repos.is_empty() { + return Ok(()); + } + + // Build repo refs list for filter + let repo_refs: Vec = repos.keys().cloned().collect(); + + tracing::debug!( + "Adding Layer 2 subscription for {} repos to {}", + repo_refs.len(), + connection.url() + ); + + // Chunk repo_refs into groups of 100 (per plan) + for chunk in repo_refs.chunks(100) { + // Build filter with lowercase 'a' tag for each repo ref + let mut filter = Filter::new().kinds([ + Kind::GitPatch, // 1617 + Kind::Custom(1618), // PR + Kind::Custom(1619), // PR update + Kind::GitIssue, // 1621 + ]); + + // Add each repo ref as a custom tag filter + for repo_ref in chunk { + filter = + filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); + } + + // Subscribe to this filter + if let Err(e) = connection.subscribe_filter(filter).await { + return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); + } + } + + Ok(()) + } + + /// Process events from a single relay connection. + /// + /// This is a static method that runs in its own task. + async fn process_relay_events( + mut event_rx: mpsc::Receiver, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + sync_source_addr: SocketAddr, + relay_url: String, + ) { + tracing::debug!("Starting event processing for relay: {}", relay_url); + + while let Some(relay_event) = event_rx.recv().await { + match relay_event { + RelayEvent::Event(event) => { + Self::process_single_event_static( + &event, + &database, + &write_policy, + &sync_source_addr, + &relay_url, + ) + .await; + } + RelayEvent::EndOfStoredEvents => { + tracing::debug!("EOSE received from {}", relay_url); + } + RelayEvent::Closed(reason) => { + tracing::warn!("Connection to {} closed: {}", relay_url, reason); + break; + } + } + } + + tracing::info!("Event processing ended for relay: {}", relay_url); + } + + /// Process a single event (static version for use in spawned tasks). + async fn process_single_event_static( + event: &Event, + database: &SharedDatabase, + write_policy: &Nip34WritePolicy, + sync_source_addr: &SocketAddr, + relay_url: &str, + ) { + let event_id = event.id; + let kind = event.kind.as_u16(); + + // Check if event already exists in database + match database.check_id(&event_id).await { + Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => { + tracing::trace!("Event {} already exists, skipping", event_id); + return; + } + Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing + Err(e) => { + tracing::warn!("Failed to check if event {} exists: {}", event_id, e); + } + } + + // Pass through write policy + let policy_result = write_policy.admit_event(event, sync_source_addr).await; + + match policy_result { + PolicyResult::Accept => match database.save_event(event).await { + Ok(SaveEventStatus::Success) => { + tracing::info!( + "Synced event {} (kind {}) from {}", + event_id, + kind, + relay_url + ); + } + Ok(_) => { + tracing::debug!( + "Event {} (kind {}) already stored or rejected by database", + event_id, + kind + ); + } + Err(e) => { + tracing::error!("Failed to save synced event {}: {}", event_id, e); + } + }, + PolicyResult::Reject(reason) => { + tracing::debug!( + "Rejected synced event {} (kind {}): {}", + event_id, + kind, + reason + ); + } } } } diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs new file mode 100644 index 0000000..71b5d51 --- /dev/null +++ b/src/sync/relay_connection.rs @@ -0,0 +1,185 @@ +//! Relay Connection for Proactive Sync +//! +//! This module handles connecting to external relays and receiving events +//! for the proactive sync system. + +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; + +/// Events received from a relay connection +#[derive(Debug)] +pub enum RelayEvent { + /// A nostr event was received + Event(Event), + /// End of stored events (EOSE) received + EndOfStoredEvents, + /// Connection was closed + Closed(String), +} + +/// Connection to an external relay for syncing events. +/// +/// RelayConnection handles: +/// - Connecting to the relay +/// - Subscribing with appropriate filters (Layer 1 for bootstrap) +/// - Receiving events and sending them through a channel +pub struct RelayConnection { + /// The relay URL + url: String, + /// The nostr-sdk client + client: Client, +} + +impl RelayConnection { + /// Create a new relay connection. + /// + /// # Arguments + /// + /// * `url` - The WebSocket URL of the relay to connect to + pub fn new(url: String) -> Self { + // Create a client with generated keys (we're just subscribing, not publishing) + let keys = Keys::generate(); + let client = Client::new(keys); + + Self { url, client } + } + + /// Connect to the relay and subscribe with Layer 1 filter. + /// + /// Layer 1 filter syncs announcement events (30617, 30618) which are + /// the foundation for discovering repository relationships. + /// + /// Returns the notification stream for receiving events. + pub async fn connect_and_subscribe(&self) -> Result<(), String> { + // Add the relay + self.client + .add_relay(&self.url) + .await + .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; + + // Connect to relay + self.client.connect().await; + + // Wait for connection to establish + let mut connected = false; + for _ in 0..30 { + tokio::time::sleep(Duration::from_millis(100)).await; + let relays = self.client.relays().await; + if relays.values().any(|r| r.is_connected()) { + connected = true; + break; + } + } + + if !connected { + return Err(format!( + "Failed to connect to relay {} after 3 seconds", + self.url + )); + } + + tracing::info!("Connected to bootstrap relay: {}", self.url); + + // Layer 1 filter: Repository announcements and state events + // These are addressable events that define repositories + let filter = Filter::new().kinds([ + Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 + Kind::Custom(KIND_REPOSITORY_STATE), // 30618 + ]); + + // Subscribe to the filter + self.client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe: {}", e))?; + + tracing::debug!( + "Subscribed to Layer 1 events (kinds 30617, 30618) from {}", + self.url + ); + + Ok(()) + } + + /// Run the event loop, sending received events through the channel. + /// + /// This method runs until the connection is closed or an error occurs. + /// + /// # Arguments + /// + /// * `event_sender` - Channel to send received events + pub async fn run_event_loop(self, event_sender: mpsc::Sender) { + tracing::debug!("Starting event loop for relay: {}", self.url); + + // Handle notifications + self.client + .handle_notifications(|notification| async { + match notification { + RelayPoolNotification::Event { event, .. } => { + tracing::debug!( + "Received event {} (kind {}) from {}", + event.id, + event.kind.as_u16(), + self.url + ); + if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + tracing::warn!("Event channel closed, stopping relay connection"); + return Ok(true); // Stop handling + } + } + RelayPoolNotification::Message { message, .. } => { + if let RelayMessage::EndOfStoredEvents(_) = message { + tracing::debug!("EOSE received from {}", self.url); + if event_sender + .send(RelayEvent::EndOfStoredEvents) + .await + .is_err() + { + return Ok(true); // Stop handling + } + } + } + RelayPoolNotification::Shutdown => { + tracing::info!("Relay {} shutting down", self.url); + let _ = event_sender + .send(RelayEvent::Closed("Shutdown".to_string())) + .await; + return Ok(true); // Stop handling + } + } + Ok(false) // Continue handling + }) + .await + .ok(); // Ignore errors on shutdown + + // Disconnect when done + self.client.disconnect().await; + tracing::info!("Disconnected from relay: {}", self.url); + } + + /// Get the relay URL + pub fn url(&self) -> &str { + &self.url + } + + /// Subscribe to an additional filter. + /// + /// This is used to add Layer 2 filters for repo-related events after + /// the initial connection is established. + pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> { + self.client + .subscribe(filter, None) + .await + .map_err(|e| format!("Failed to subscribe with filter: {}", e))?; + Ok(()) + } + + /// Get a reference to the client for additional operations. + pub fn client(&self) -> &Client { + &self.client + } +} diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs new file mode 100644 index 0000000..0512088 --- /dev/null +++ b/src/sync/self_subscriber.rs @@ -0,0 +1,497 @@ +//! Self-Subscriber for Proactive Sync +//! +//! This module handles subscribing to our own relay to detect new events +//! and trigger relay discovery from announcements. + +use std::collections::{HashMap, HashSet}; +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; +use tokio::time::Instant; + +use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; + +use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; + +// ============================================================================= +// Types +// ============================================================================= + +/// Actions to be taken by the SyncManager based on self-subscription events. +#[derive(Debug, Clone)] +pub enum RelayAction { + /// Spawn a new relay connection to sync from. + /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. + SpawnRelay { + relay_url: String, + repos_and_root_events: HashMap>, + }, + /// Add filters to an existing relay connection. + /// Contains: relay_url, additional repos to add. + AddFilters { + relay_url: String, + repos_and_new_root_event: HashMap>, + }, +} + +/// Pending updates collected during batch window. +#[derive(Debug, Default)] +struct PendingUpdates { + /// New announcements (kind 30617) - triggers relay discovery + announcements: Vec, + /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set + root_events: Vec, +} + +// ============================================================================= +// SelfSubscriber +// ============================================================================= + +/// Subscribes to our own relay to detect new events. +/// +/// The self-subscriber: +/// 1. Connects to our own relay +/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) +/// 3. When events arrive, batches them +/// 4. On batch timer fire, processes updates and sends relay actions +pub struct SelfSubscriber { + /// URL of our own relay to subscribe to + own_relay_url: String, + /// Our relay domain for checking if announcements list us + relay_domain: String, + /// Reference to following repo root events (shared with SyncManager) + following_repo_root_events: FollowingRepoRootEvents, + /// Reference to sync relays (shared with SyncManager) + sync_relays: SyncRelays, + /// Channel to send relay actions back to manager + action_tx: mpsc::Sender, +} + +impl SelfSubscriber { + /// Create a new self-subscriber. + pub fn new( + own_relay_url: String, + relay_domain: String, + following_repo_root_events: FollowingRepoRootEvents, + sync_relays: SyncRelays, + action_tx: mpsc::Sender, + ) -> Self { + Self { + own_relay_url, + relay_domain, + following_repo_root_events, + sync_relays, + action_tx, + } + } + + /// Get the batch window duration from environment variable. + /// + /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS + /// for faster tests (typically 200ms). + fn get_batch_window() -> Duration { + std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") + .ok() + .and_then(|s| s.parse().ok()) + .map(Duration::from_millis) + .unwrap_or(Duration::from_secs(5)) + } + + /// Run the self-subscriber event loop. + /// + /// This method: + /// 1. Connects to our own relay + /// 2. Subscribes to relevant event kinds + /// 3. Receives events and batches them + /// 4. On batch timer fire, processes and sends relay actions + pub async fn run(self) { + tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); + + // Create nostr-sdk client + let keys = Keys::generate(); + let client = Client::new(keys); + + // Connect to our own relay + if let Err(e) = client.add_relay(&self.own_relay_url).await { + tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); + return; + } + + client.connect().await; + + // Wait for connection + let mut connected = false; + for _ in 0..30 { + tokio::time::sleep(Duration::from_millis(100)).await; + let relays = client.relays().await; + if relays.values().any(|r| r.is_connected()) { + connected = true; + break; + } + } + + if !connected { + tracing::error!( + "Failed to connect to own relay {} after 3 seconds", + self.own_relay_url + ); + return; + } + + tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); + + // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) + let filter = Filter::new() + .kinds([ + Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 + Kind::GitPatch, // 1617 + Kind::Custom(KIND_PR), // 1618 + Kind::Custom(KIND_PR_UPDATE), // 1619 + Kind::GitIssue, // 1621 + ]) + .since(Timestamp::now()); + + if let Err(e) = client.subscribe(filter, None).await { + tracing::error!("Failed to subscribe to own relay: {}", e); + return; + } + + tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); + + // Batch state + let mut pending = PendingUpdates::default(); + let mut batch_timer_started: Option = None; + let batch_window = Self::get_batch_window(); + + // Main event loop using notifications stream + loop { + // Calculate timeout for batch processing + let timeout = if let Some(started) = batch_timer_started { + let elapsed = started.elapsed(); + if elapsed >= batch_window { + Duration::ZERO + } else { + batch_window - elapsed + } + } else { + Duration::from_secs(60) // Long timeout when no batch pending + }; + + // Wait for notification with timeout + let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; + + match notification { + Ok(Ok(notification)) => { + match notification { + RelayPoolNotification::Event { event, .. } => { + let kind = event.kind.as_u16(); + + // Start batch timer on first event (does NOT reset) + if batch_timer_started.is_none() { + batch_timer_started = Some(Instant::now()); + tracing::debug!("Batch timer started"); + } + + // Classify and add to pending + if kind == KIND_REPOSITORY_ANNOUNCEMENT { + tracing::debug!( + "SelfSubscriber received announcement {}", + event.id + ); + pending.announcements.push(*event); + } else { + tracing::debug!( + "SelfSubscriber received root event {} (kind {})", + event.id, + kind + ); + pending.root_events.push(*event); + } + } + RelayPoolNotification::Message { message, .. } => { + if let RelayMessage::EndOfStoredEvents(_) = message { + tracing::debug!("SelfSubscriber EOSE received"); + // Process any pending events after EOSE + if !pending.announcements.is_empty() + || !pending.root_events.is_empty() + { + self.process_batch(&mut pending).await; + batch_timer_started = None; + } + } + } + RelayPoolNotification::Shutdown => { + tracing::info!("SelfSubscriber shutting down"); + break; + } + } + } + Ok(Err(_)) => { + // Channel closed + tracing::warn!("SelfSubscriber notification channel closed"); + break; + } + Err(_) => { + // Timeout - check if batch should be processed + if let Some(started) = batch_timer_started { + if started.elapsed() >= batch_window { + if !pending.announcements.is_empty() || !pending.root_events.is_empty() + { + self.process_batch(&mut pending).await; + } + batch_timer_started = None; + } + } + } + } + } + + client.disconnect().await; + tracing::info!("SelfSubscriber disconnected"); + } + + /// Process a batch of pending updates. + async fn process_batch(&self, pending: &mut PendingUpdates) { + tracing::debug!( + "Processing batch: {} announcements, {} root events", + pending.announcements.len(), + pending.root_events.len() + ); + + // Process root events first (update following_repo_root_events) + for event in pending.root_events.drain(..) { + let repo_refs = SyncManager::extract_all_repo_refs(&event); + if !repo_refs.is_empty() { + let mut guard = self.following_repo_root_events.write().await; + for repo_ref in repo_refs { + guard.entry(repo_ref).or_default().insert(event.id); + } + } + } + + // Process announcements (relay discovery) + for event in pending.announcements.drain(..) { + self.process_announcement(&event).await; + } + } + + /// Process an announcement event for relay discovery. + async fn process_announcement(&self, event: &Event) { + let repo_ref = SyncManager::build_repo_ref(event); + let relay_urls = Self::extract_relay_urls_from_announcement(event); + + // Check if this announcement lists our relay + if !self.lists_our_service(event) { + tracing::debug!( + "Announcement {} does not list our service, skipping relay discovery", + event.id + ); + return; + } + + tracing::info!( + "Processing announcement {} for repo {}, found {} relay URLs", + event.id, + repo_ref, + relay_urls.len() + ); + + // Get current events for this repo from following_repo_root_events + let events = self + .following_repo_root_events + .read() + .await + .get(&repo_ref) + .cloned() + .unwrap_or_default(); + + // For each relay URL in the announcement, check if we need to spawn or update + for relay_url in relay_urls { + if self.is_own_relay(&relay_url) { + continue; // Skip our own relay + } + + let sync_relays_guard = self.sync_relays.read().await; + let exists = sync_relays_guard.contains_key(&relay_url); + drop(sync_relays_guard); + + if exists { + // Relay already known - check if we need to add this repo + let mut guard = self.sync_relays.write().await; + let relay_repos = guard.entry(relay_url.clone()).or_default(); + let is_new_repo = !relay_repos.contains_key(&repo_ref); + + if is_new_repo { + relay_repos.insert(repo_ref.clone(), events.clone()); + drop(guard); + + // Send action to add filters + let mut repos_filters = HashMap::new(); + repos_filters.insert(repo_ref.clone(), events.clone()); + + if let Err(e) = self + .action_tx + .send(RelayAction::AddFilters { + relay_url: relay_url.clone(), + repos_and_new_root_event: repos_filters, + }) + .await + { + tracing::warn!("Failed to send AddFilters action: {}", e); + } + } + } else { + // New relay - add to sync_relays and spawn + let mut guard = self.sync_relays.write().await; + let mut repos = HashMap::new(); + repos.insert(repo_ref.clone(), events.clone()); + guard.insert(relay_url.clone(), repos.clone()); + drop(guard); + + tracing::info!("Discovered new relay to sync from: {}", relay_url); + + // Send action to spawn relay + if let Err(e) = self + .action_tx + .send(RelayAction::SpawnRelay { + relay_url: relay_url.clone(), + repos_and_root_events: repos, + }) + .await + { + tracing::warn!("Failed to send SpawnRelay action: {}", e); + } + } + } + } + + /// Extract relay URLs from an announcement event. + /// + /// Looks for both 'relays' and 'clone' tags. + fn extract_relay_urls_from_announcement(event: &Event) -> Vec { + let mut urls = Vec::new(); + + // Extract from 'relays' tag + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Relays) { + let vec = tag.clone().to_vec(); + urls.extend(vec.into_iter().skip(1)); // Skip tag name + } + } + + // Extract from 'clone' tag - parse URLs to get relay hints + // Clone URLs look like: http://domain/repo.git or git://domain/repo.git + // We want to construct ws://domain from these + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Clone) { + let vec = tag.clone().to_vec(); + for url in vec.into_iter().skip(1) { + if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { + if !urls.contains(&relay_url) { + urls.push(relay_url); + } + } + } + } + } + + urls + } + + /// Convert a clone URL to a potential relay URL. + /// + /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" + fn clone_url_to_relay_url(clone_url: &str) -> Option { + // Parse the URL to extract host:port + if let Ok(url) = url::Url::parse(clone_url) { + let host = url.host_str()?; + let port = url.port(); + let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; + + if let Some(port) = port { + Some(format!("{}://{}:{}", scheme, host, port)) + } else { + Some(format!("{}://{}", scheme, host)) + } + } else { + None + } + } + + /// Check if event lists our service in the relays or clone tags. + fn lists_our_service(&self, event: &Event) -> bool { + // Check relays tag + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Relays) { + let vec = tag.clone().to_vec(); + for url in vec.into_iter().skip(1) { + if self.is_own_relay(&url) { + return true; + } + } + } + } + + // Check clone tag + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Clone) { + let vec = tag.clone().to_vec(); + for url in vec.into_iter().skip(1) { + if url.contains(&self.relay_domain) { + return true; + } + } + } + } + + false + } + + /// Check if a relay URL matches our relay. + fn is_own_relay(&self, relay_url: &str) -> bool { + relay_url.contains(&self.relay_domain) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_clone_url_to_relay_url_http() { + let url = "http://127.0.0.1:8080/repo.git"; + let relay = SelfSubscriber::clone_url_to_relay_url(url); + assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); + } + + #[test] + fn test_clone_url_to_relay_url_https() { + let url = "https://example.com/repo.git"; + let relay = SelfSubscriber::clone_url_to_relay_url(url); + assert_eq!(relay, Some("wss://example.com".to_string())); + } + + #[test] + fn test_clone_url_to_relay_url_invalid() { + let url = "not-a-valid-url"; + let relay = SelfSubscriber::clone_url_to_relay_url(url); + assert_eq!(relay, None); + } + + #[test] + fn test_get_batch_window_default() { + // Clear env var if set + std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); + let window = SelfSubscriber::get_batch_window(); + assert_eq!(window, Duration::from_secs(5)); + } + + #[test] + fn test_get_batch_window_from_env() { + std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); + let window = SelfSubscriber::get_batch_window(); + assert_eq!(window, Duration::from_millis(200)); + std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); + } +} -- cgit v1.2.3