From 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 09:07:54 +0000 Subject: improve sync design --- src/sync/self_subscriber.rs | 497 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 497 insertions(+) create mode 100644 src/sync/self_subscriber.rs (limited to 'src/sync/self_subscriber.rs') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs new file mode 100644 index 0000000..0512088 --- /dev/null +++ b/src/sync/self_subscriber.rs @@ -0,0 +1,497 @@ +//! Self-Subscriber for Proactive Sync +//! +//! This module handles subscribing to our own relay to detect new events +//! and trigger relay discovery from announcements. + +use std::collections::{HashMap, HashSet}; +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; +use tokio::time::Instant; + +use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; + +use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; + +// ============================================================================= +// Types +// ============================================================================= + +/// Actions to be taken by the SyncManager based on self-subscription events. +#[derive(Debug, Clone)] +pub enum RelayAction { + /// Spawn a new relay connection to sync from. + /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. + SpawnRelay { + relay_url: String, + repos_and_root_events: HashMap>, + }, + /// Add filters to an existing relay connection. + /// Contains: relay_url, additional repos to add. + AddFilters { + relay_url: String, + repos_and_new_root_event: HashMap>, + }, +} + +/// Pending updates collected during batch window. +#[derive(Debug, Default)] +struct PendingUpdates { + /// New announcements (kind 30617) - triggers relay discovery + announcements: Vec, + /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set + root_events: Vec, +} + +// ============================================================================= +// SelfSubscriber +// ============================================================================= + +/// Subscribes to our own relay to detect new events. +/// +/// The self-subscriber: +/// 1. Connects to our own relay +/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) +/// 3. When events arrive, batches them +/// 4. On batch timer fire, processes updates and sends relay actions +pub struct SelfSubscriber { + /// URL of our own relay to subscribe to + own_relay_url: String, + /// Our relay domain for checking if announcements list us + relay_domain: String, + /// Reference to following repo root events (shared with SyncManager) + following_repo_root_events: FollowingRepoRootEvents, + /// Reference to sync relays (shared with SyncManager) + sync_relays: SyncRelays, + /// Channel to send relay actions back to manager + action_tx: mpsc::Sender, +} + +impl SelfSubscriber { + /// Create a new self-subscriber. + pub fn new( + own_relay_url: String, + relay_domain: String, + following_repo_root_events: FollowingRepoRootEvents, + sync_relays: SyncRelays, + action_tx: mpsc::Sender, + ) -> Self { + Self { + own_relay_url, + relay_domain, + following_repo_root_events, + sync_relays, + action_tx, + } + } + + /// Get the batch window duration from environment variable. + /// + /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS + /// for faster tests (typically 200ms). + fn get_batch_window() -> Duration { + std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") + .ok() + .and_then(|s| s.parse().ok()) + .map(Duration::from_millis) + .unwrap_or(Duration::from_secs(5)) + } + + /// Run the self-subscriber event loop. + /// + /// This method: + /// 1. Connects to our own relay + /// 2. Subscribes to relevant event kinds + /// 3. Receives events and batches them + /// 4. On batch timer fire, processes and sends relay actions + pub async fn run(self) { + tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); + + // Create nostr-sdk client + let keys = Keys::generate(); + let client = Client::new(keys); + + // Connect to our own relay + if let Err(e) = client.add_relay(&self.own_relay_url).await { + tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); + return; + } + + client.connect().await; + + // Wait for connection + let mut connected = false; + for _ in 0..30 { + tokio::time::sleep(Duration::from_millis(100)).await; + let relays = client.relays().await; + if relays.values().any(|r| r.is_connected()) { + connected = true; + break; + } + } + + if !connected { + tracing::error!( + "Failed to connect to own relay {} after 3 seconds", + self.own_relay_url + ); + return; + } + + tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); + + // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) + let filter = Filter::new() + .kinds([ + Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 + Kind::GitPatch, // 1617 + Kind::Custom(KIND_PR), // 1618 + Kind::Custom(KIND_PR_UPDATE), // 1619 + Kind::GitIssue, // 1621 + ]) + .since(Timestamp::now()); + + if let Err(e) = client.subscribe(filter, None).await { + tracing::error!("Failed to subscribe to own relay: {}", e); + return; + } + + tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); + + // Batch state + let mut pending = PendingUpdates::default(); + let mut batch_timer_started: Option = None; + let batch_window = Self::get_batch_window(); + + // Main event loop using notifications stream + loop { + // Calculate timeout for batch processing + let timeout = if let Some(started) = batch_timer_started { + let elapsed = started.elapsed(); + if elapsed >= batch_window { + Duration::ZERO + } else { + batch_window - elapsed + } + } else { + Duration::from_secs(60) // Long timeout when no batch pending + }; + + // Wait for notification with timeout + let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; + + match notification { + Ok(Ok(notification)) => { + match notification { + RelayPoolNotification::Event { event, .. } => { + let kind = event.kind.as_u16(); + + // Start batch timer on first event (does NOT reset) + if batch_timer_started.is_none() { + batch_timer_started = Some(Instant::now()); + tracing::debug!("Batch timer started"); + } + + // Classify and add to pending + if kind == KIND_REPOSITORY_ANNOUNCEMENT { + tracing::debug!( + "SelfSubscriber received announcement {}", + event.id + ); + pending.announcements.push(*event); + } else { + tracing::debug!( + "SelfSubscriber received root event {} (kind {})", + event.id, + kind + ); + pending.root_events.push(*event); + } + } + RelayPoolNotification::Message { message, .. } => { + if let RelayMessage::EndOfStoredEvents(_) = message { + tracing::debug!("SelfSubscriber EOSE received"); + // Process any pending events after EOSE + if !pending.announcements.is_empty() + || !pending.root_events.is_empty() + { + self.process_batch(&mut pending).await; + batch_timer_started = None; + } + } + } + RelayPoolNotification::Shutdown => { + tracing::info!("SelfSubscriber shutting down"); + break; + } + } + } + Ok(Err(_)) => { + // Channel closed + tracing::warn!("SelfSubscriber notification channel closed"); + break; + } + Err(_) => { + // Timeout - check if batch should be processed + if let Some(started) = batch_timer_started { + if started.elapsed() >= batch_window { + if !pending.announcements.is_empty() || !pending.root_events.is_empty() + { + self.process_batch(&mut pending).await; + } + batch_timer_started = None; + } + } + } + } + } + + client.disconnect().await; + tracing::info!("SelfSubscriber disconnected"); + } + + /// Process a batch of pending updates. + async fn process_batch(&self, pending: &mut PendingUpdates) { + tracing::debug!( + "Processing batch: {} announcements, {} root events", + pending.announcements.len(), + pending.root_events.len() + ); + + // Process root events first (update following_repo_root_events) + for event in pending.root_events.drain(..) { + let repo_refs = SyncManager::extract_all_repo_refs(&event); + if !repo_refs.is_empty() { + let mut guard = self.following_repo_root_events.write().await; + for repo_ref in repo_refs { + guard.entry(repo_ref).or_default().insert(event.id); + } + } + } + + // Process announcements (relay discovery) + for event in pending.announcements.drain(..) { + self.process_announcement(&event).await; + } + } + + /// Process an announcement event for relay discovery. + async fn process_announcement(&self, event: &Event) { + let repo_ref = SyncManager::build_repo_ref(event); + let relay_urls = Self::extract_relay_urls_from_announcement(event); + + // Check if this announcement lists our relay + if !self.lists_our_service(event) { + tracing::debug!( + "Announcement {} does not list our service, skipping relay discovery", + event.id + ); + return; + } + + tracing::info!( + "Processing announcement {} for repo {}, found {} relay URLs", + event.id, + repo_ref, + relay_urls.len() + ); + + // Get current events for this repo from following_repo_root_events + let events = self + .following_repo_root_events + .read() + .await + .get(&repo_ref) + .cloned() + .unwrap_or_default(); + + // For each relay URL in the announcement, check if we need to spawn or update + for relay_url in relay_urls { + if self.is_own_relay(&relay_url) { + continue; // Skip our own relay + } + + let sync_relays_guard = self.sync_relays.read().await; + let exists = sync_relays_guard.contains_key(&relay_url); + drop(sync_relays_guard); + + if exists { + // Relay already known - check if we need to add this repo + let mut guard = self.sync_relays.write().await; + let relay_repos = guard.entry(relay_url.clone()).or_default(); + let is_new_repo = !relay_repos.contains_key(&repo_ref); + + if is_new_repo { + relay_repos.insert(repo_ref.clone(), events.clone()); + drop(guard); + + // Send action to add filters + let mut repos_filters = HashMap::new(); + repos_filters.insert(repo_ref.clone(), events.clone()); + + if let Err(e) = self + .action_tx + .send(RelayAction::AddFilters { + relay_url: relay_url.clone(), + repos_and_new_root_event: repos_filters, + }) + .await + { + tracing::warn!("Failed to send AddFilters action: {}", e); + } + } + } else { + // New relay - add to sync_relays and spawn + let mut guard = self.sync_relays.write().await; + let mut repos = HashMap::new(); + repos.insert(repo_ref.clone(), events.clone()); + guard.insert(relay_url.clone(), repos.clone()); + drop(guard); + + tracing::info!("Discovered new relay to sync from: {}", relay_url); + + // Send action to spawn relay + if let Err(e) = self + .action_tx + .send(RelayAction::SpawnRelay { + relay_url: relay_url.clone(), + repos_and_root_events: repos, + }) + .await + { + tracing::warn!("Failed to send SpawnRelay action: {}", e); + } + } + } + } + + /// Extract relay URLs from an announcement event. + /// + /// Looks for both 'relays' and 'clone' tags. + fn extract_relay_urls_from_announcement(event: &Event) -> Vec { + let mut urls = Vec::new(); + + // Extract from 'relays' tag + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Relays) { + let vec = tag.clone().to_vec(); + urls.extend(vec.into_iter().skip(1)); // Skip tag name + } + } + + // Extract from 'clone' tag - parse URLs to get relay hints + // Clone URLs look like: http://domain/repo.git or git://domain/repo.git + // We want to construct ws://domain from these + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Clone) { + let vec = tag.clone().to_vec(); + for url in vec.into_iter().skip(1) { + if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { + if !urls.contains(&relay_url) { + urls.push(relay_url); + } + } + } + } + } + + urls + } + + /// Convert a clone URL to a potential relay URL. + /// + /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" + fn clone_url_to_relay_url(clone_url: &str) -> Option { + // Parse the URL to extract host:port + if let Ok(url) = url::Url::parse(clone_url) { + let host = url.host_str()?; + let port = url.port(); + let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; + + if let Some(port) = port { + Some(format!("{}://{}:{}", scheme, host, port)) + } else { + Some(format!("{}://{}", scheme, host)) + } + } else { + None + } + } + + /// Check if event lists our service in the relays or clone tags. + fn lists_our_service(&self, event: &Event) -> bool { + // Check relays tag + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Relays) { + let vec = tag.clone().to_vec(); + for url in vec.into_iter().skip(1) { + if self.is_own_relay(&url) { + return true; + } + } + } + } + + // Check clone tag + for tag in event.tags.iter() { + if matches!(tag.kind(), TagKind::Clone) { + let vec = tag.clone().to_vec(); + for url in vec.into_iter().skip(1) { + if url.contains(&self.relay_domain) { + return true; + } + } + } + } + + false + } + + /// Check if a relay URL matches our relay. + fn is_own_relay(&self, relay_url: &str) -> bool { + relay_url.contains(&self.relay_domain) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_clone_url_to_relay_url_http() { + let url = "http://127.0.0.1:8080/repo.git"; + let relay = SelfSubscriber::clone_url_to_relay_url(url); + assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); + } + + #[test] + fn test_clone_url_to_relay_url_https() { + let url = "https://example.com/repo.git"; + let relay = SelfSubscriber::clone_url_to_relay_url(url); + assert_eq!(relay, Some("wss://example.com".to_string())); + } + + #[test] + fn test_clone_url_to_relay_url_invalid() { + let url = "not-a-valid-url"; + let relay = SelfSubscriber::clone_url_to_relay_url(url); + assert_eq!(relay, None); + } + + #[test] + fn test_get_batch_window_default() { + // Clear env var if set + std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); + let window = SelfSubscriber::get_batch_window(); + assert_eq!(window, Duration::from_secs(5)); + } + + #[test] + fn test_get_batch_window_from_env() { + std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); + let window = SelfSubscriber::get_batch_window(); + assert_eq!(window, Duration::from_millis(200)); + std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); + } +} -- cgit v1.2.3