From 586fc2a7df1ce256469f0742d23f687ac4b075b1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 10:33:07 +0000 Subject: stub of sync v4 --- src/sync/self_subscriber.rs | 497 -------------------------------------------- 1 file changed, 497 deletions(-) delete mode 100644 src/sync/self_subscriber.rs (limited to 'src/sync/self_subscriber.rs') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs deleted file mode 100644 index 0512088..0000000 --- a/src/sync/self_subscriber.rs +++ /dev/null @@ -1,497 +0,0 @@ -//! Self-Subscriber for Proactive Sync -//! -//! This module handles subscribing to our own relay to detect new events -//! and trigger relay discovery from announcements. - -use std::collections::{HashMap, HashSet}; -use std::time::Duration; - -use nostr_sdk::prelude::*; -use tokio::sync::mpsc; -use tokio::time::Instant; - -use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; - -use super::{FollowingRepoRootEvents, SyncManager, SyncRelays}; - -// ============================================================================= -// Types -// ============================================================================= - -/// Actions to be taken by the SyncManager based on self-subscription events. -#[derive(Debug, Clone)] -pub enum RelayAction { - /// Spawn a new relay connection to sync from. - /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering. - SpawnRelay { - relay_url: String, - repos_and_root_events: HashMap>, - }, - /// Add filters to an existing relay connection. - /// Contains: relay_url, additional repos to add. - AddFilters { - relay_url: String, - repos_and_new_root_event: HashMap>, - }, -} - -/// Pending updates collected during batch window. -#[derive(Debug, Default)] -struct PendingUpdates { - /// New announcements (kind 30617) - triggers relay discovery - announcements: Vec, - /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set - root_events: Vec, -} - -// ============================================================================= -// SelfSubscriber -// ============================================================================= - -/// Subscribes to our own relay to detect new events. -/// -/// The self-subscriber: -/// 1. Connects to our own relay -/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618) -/// 3. When events arrive, batches them -/// 4. On batch timer fire, processes updates and sends relay actions -pub struct SelfSubscriber { - /// URL of our own relay to subscribe to - own_relay_url: String, - /// Our relay domain for checking if announcements list us - relay_domain: String, - /// Reference to following repo root events (shared with SyncManager) - following_repo_root_events: FollowingRepoRootEvents, - /// Reference to sync relays (shared with SyncManager) - sync_relays: SyncRelays, - /// Channel to send relay actions back to manager - action_tx: mpsc::Sender, -} - -impl SelfSubscriber { - /// Create a new self-subscriber. - pub fn new( - own_relay_url: String, - relay_domain: String, - following_repo_root_events: FollowingRepoRootEvents, - sync_relays: SyncRelays, - action_tx: mpsc::Sender, - ) -> Self { - Self { - own_relay_url, - relay_domain, - following_repo_root_events, - sync_relays, - action_tx, - } - } - - /// Get the batch window duration from environment variable. - /// - /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS - /// for faster tests (typically 200ms). - fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_secs(5)) - } - - /// Run the self-subscriber event loop. - /// - /// This method: - /// 1. Connects to our own relay - /// 2. Subscribes to relevant event kinds - /// 3. Receives events and batches them - /// 4. On batch timer fire, processes and sends relay actions - pub async fn run(self) { - tracing::info!("SelfSubscriber starting for {}", self.own_relay_url); - - // Create nostr-sdk client - let keys = Keys::generate(); - let client = Client::new(keys); - - // Connect to our own relay - if let Err(e) = client.add_relay(&self.own_relay_url).await { - tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e); - return; - } - - client.connect().await; - - // Wait for connection - let mut connected = false; - for _ in 0..30 { - tokio::time::sleep(Duration::from_millis(100)).await; - let relays = client.relays().await; - if relays.values().any(|r| r.is_connected()) { - connected = true; - break; - } - } - - if !connected { - tracing::error!( - "Failed to connect to own relay {} after 3 seconds", - self.own_relay_url - ); - return; - } - - tracing::info!("SelfSubscriber connected to {}", self.own_relay_url); - - // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design) - let filter = Filter::new() - .kinds([ - Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617 - Kind::GitPatch, // 1617 - Kind::Custom(KIND_PR), // 1618 - Kind::Custom(KIND_PR_UPDATE), // 1619 - Kind::GitIssue, // 1621 - ]) - .since(Timestamp::now()); - - if let Err(e) = client.subscribe(filter, None).await { - tracing::error!("Failed to subscribe to own relay: {}", e); - return; - } - - tracing::info!("SelfSubscriber subscribed to event kinds on own relay"); - - // Batch state - let mut pending = PendingUpdates::default(); - let mut batch_timer_started: Option = None; - let batch_window = Self::get_batch_window(); - - // Main event loop using notifications stream - loop { - // Calculate timeout for batch processing - let timeout = if let Some(started) = batch_timer_started { - let elapsed = started.elapsed(); - if elapsed >= batch_window { - Duration::ZERO - } else { - batch_window - elapsed - } - } else { - Duration::from_secs(60) // Long timeout when no batch pending - }; - - // Wait for notification with timeout - let notification = tokio::time::timeout(timeout, client.notifications().recv()).await; - - match notification { - Ok(Ok(notification)) => { - match notification { - RelayPoolNotification::Event { event, .. } => { - let kind = event.kind.as_u16(); - - // Start batch timer on first event (does NOT reset) - if batch_timer_started.is_none() { - batch_timer_started = Some(Instant::now()); - tracing::debug!("Batch timer started"); - } - - // Classify and add to pending - if kind == KIND_REPOSITORY_ANNOUNCEMENT { - tracing::debug!( - "SelfSubscriber received announcement {}", - event.id - ); - pending.announcements.push(*event); - } else { - tracing::debug!( - "SelfSubscriber received root event {} (kind {})", - event.id, - kind - ); - pending.root_events.push(*event); - } - } - RelayPoolNotification::Message { message, .. } => { - if let RelayMessage::EndOfStoredEvents(_) = message { - tracing::debug!("SelfSubscriber EOSE received"); - // Process any pending events after EOSE - if !pending.announcements.is_empty() - || !pending.root_events.is_empty() - { - self.process_batch(&mut pending).await; - batch_timer_started = None; - } - } - } - RelayPoolNotification::Shutdown => { - tracing::info!("SelfSubscriber shutting down"); - break; - } - } - } - Ok(Err(_)) => { - // Channel closed - tracing::warn!("SelfSubscriber notification channel closed"); - break; - } - Err(_) => { - // Timeout - check if batch should be processed - if let Some(started) = batch_timer_started { - if started.elapsed() >= batch_window { - if !pending.announcements.is_empty() || !pending.root_events.is_empty() - { - self.process_batch(&mut pending).await; - } - batch_timer_started = None; - } - } - } - } - } - - client.disconnect().await; - tracing::info!("SelfSubscriber disconnected"); - } - - /// Process a batch of pending updates. - async fn process_batch(&self, pending: &mut PendingUpdates) { - tracing::debug!( - "Processing batch: {} announcements, {} root events", - pending.announcements.len(), - pending.root_events.len() - ); - - // Process root events first (update following_repo_root_events) - for event in pending.root_events.drain(..) { - let repo_refs = SyncManager::extract_all_repo_refs(&event); - if !repo_refs.is_empty() { - let mut guard = self.following_repo_root_events.write().await; - for repo_ref in repo_refs { - guard.entry(repo_ref).or_default().insert(event.id); - } - } - } - - // Process announcements (relay discovery) - for event in pending.announcements.drain(..) { - self.process_announcement(&event).await; - } - } - - /// Process an announcement event for relay discovery. - async fn process_announcement(&self, event: &Event) { - let repo_ref = SyncManager::build_repo_ref(event); - let relay_urls = Self::extract_relay_urls_from_announcement(event); - - // Check if this announcement lists our relay - if !self.lists_our_service(event) { - tracing::debug!( - "Announcement {} does not list our service, skipping relay discovery", - event.id - ); - return; - } - - tracing::info!( - "Processing announcement {} for repo {}, found {} relay URLs", - event.id, - repo_ref, - relay_urls.len() - ); - - // Get current events for this repo from following_repo_root_events - let events = self - .following_repo_root_events - .read() - .await - .get(&repo_ref) - .cloned() - .unwrap_or_default(); - - // For each relay URL in the announcement, check if we need to spawn or update - for relay_url in relay_urls { - if self.is_own_relay(&relay_url) { - continue; // Skip our own relay - } - - let sync_relays_guard = self.sync_relays.read().await; - let exists = sync_relays_guard.contains_key(&relay_url); - drop(sync_relays_guard); - - if exists { - // Relay already known - check if we need to add this repo - let mut guard = self.sync_relays.write().await; - let relay_repos = guard.entry(relay_url.clone()).or_default(); - let is_new_repo = !relay_repos.contains_key(&repo_ref); - - if is_new_repo { - relay_repos.insert(repo_ref.clone(), events.clone()); - drop(guard); - - // Send action to add filters - let mut repos_filters = HashMap::new(); - repos_filters.insert(repo_ref.clone(), events.clone()); - - if let Err(e) = self - .action_tx - .send(RelayAction::AddFilters { - relay_url: relay_url.clone(), - repos_and_new_root_event: repos_filters, - }) - .await - { - tracing::warn!("Failed to send AddFilters action: {}", e); - } - } - } else { - // New relay - add to sync_relays and spawn - let mut guard = self.sync_relays.write().await; - let mut repos = HashMap::new(); - repos.insert(repo_ref.clone(), events.clone()); - guard.insert(relay_url.clone(), repos.clone()); - drop(guard); - - tracing::info!("Discovered new relay to sync from: {}", relay_url); - - // Send action to spawn relay - if let Err(e) = self - .action_tx - .send(RelayAction::SpawnRelay { - relay_url: relay_url.clone(), - repos_and_root_events: repos, - }) - .await - { - tracing::warn!("Failed to send SpawnRelay action: {}", e); - } - } - } - } - - /// Extract relay URLs from an announcement event. - /// - /// Looks for both 'relays' and 'clone' tags. - fn extract_relay_urls_from_announcement(event: &Event) -> Vec { - let mut urls = Vec::new(); - - // Extract from 'relays' tag - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Relays) { - let vec = tag.clone().to_vec(); - urls.extend(vec.into_iter().skip(1)); // Skip tag name - } - } - - // Extract from 'clone' tag - parse URLs to get relay hints - // Clone URLs look like: http://domain/repo.git or git://domain/repo.git - // We want to construct ws://domain from these - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Clone) { - let vec = tag.clone().to_vec(); - for url in vec.into_iter().skip(1) { - if let Some(relay_url) = Self::clone_url_to_relay_url(&url) { - if !urls.contains(&relay_url) { - urls.push(relay_url); - } - } - } - } - } - - urls - } - - /// Convert a clone URL to a potential relay URL. - /// - /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080" - fn clone_url_to_relay_url(clone_url: &str) -> Option { - // Parse the URL to extract host:port - if let Ok(url) = url::Url::parse(clone_url) { - let host = url.host_str()?; - let port = url.port(); - let scheme = if url.scheme() == "https" { "wss" } else { "ws" }; - - if let Some(port) = port { - Some(format!("{}://{}:{}", scheme, host, port)) - } else { - Some(format!("{}://{}", scheme, host)) - } - } else { - None - } - } - - /// Check if event lists our service in the relays or clone tags. - fn lists_our_service(&self, event: &Event) -> bool { - // Check relays tag - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Relays) { - let vec = tag.clone().to_vec(); - for url in vec.into_iter().skip(1) { - if self.is_own_relay(&url) { - return true; - } - } - } - } - - // Check clone tag - for tag in event.tags.iter() { - if matches!(tag.kind(), TagKind::Clone) { - let vec = tag.clone().to_vec(); - for url in vec.into_iter().skip(1) { - if url.contains(&self.relay_domain) { - return true; - } - } - } - } - - false - } - - /// Check if a relay URL matches our relay. - fn is_own_relay(&self, relay_url: &str) -> bool { - relay_url.contains(&self.relay_domain) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_clone_url_to_relay_url_http() { - let url = "http://127.0.0.1:8080/repo.git"; - let relay = SelfSubscriber::clone_url_to_relay_url(url); - assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string())); - } - - #[test] - fn test_clone_url_to_relay_url_https() { - let url = "https://example.com/repo.git"; - let relay = SelfSubscriber::clone_url_to_relay_url(url); - assert_eq!(relay, Some("wss://example.com".to_string())); - } - - #[test] - fn test_clone_url_to_relay_url_invalid() { - let url = "not-a-valid-url"; - let relay = SelfSubscriber::clone_url_to_relay_url(url); - assert_eq!(relay, None); - } - - #[test] - fn test_get_batch_window_default() { - // Clear env var if set - std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); - let window = SelfSubscriber::get_batch_window(); - assert_eq!(window, Duration::from_secs(5)); - } - - #[test] - fn test_get_batch_window_from_env() { - std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200"); - let window = SelfSubscriber::get_batch_window(); - assert_eq!(window, Duration::from_millis(200)); - std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS"); - } -} -- cgit v1.2.3