From efaad1e2857914b87307cf78903a957a604697a8 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 9 Dec 2025 09:28:12 +0000 Subject: basic sync stub --- src/sync/connection.rs | 473 ----------------------------- src/sync/filter.rs | 451 ---------------------------- src/sync/manager.rs | 762 ----------------------------------------------- src/sync/mod.rs | 416 ++++++++++++++++++++++++-- src/sync/negentropy.rs | 477 ----------------------------- src/sync/subscription.rs | 229 -------------- 6 files changed, 385 insertions(+), 2423 deletions(-) delete mode 100644 src/sync/connection.rs delete mode 100644 src/sync/filter.rs delete mode 100644 src/sync/manager.rs delete mode 100644 src/sync/negentropy.rs delete mode 100644 src/sync/subscription.rs (limited to 'src/sync') diff --git a/src/sync/connection.rs b/src/sync/connection.rs deleted file mode 100644 index 61a33f8..0000000 --- a/src/sync/connection.rs +++ /dev/null @@ -1,473 +0,0 @@ -//! WebSocket connection handling for sync -//! -//! Manages the connection to a source relay, subscribes to events using -//! the three-layer filter strategy, and passes them through validation. -//! -//! ## Phase 2 Features -//! -//! - Three-layer filter subscriptions: -//! 1. Layer 1: kinds 30617 + 30618 (announcements) -//! 2. Layer 2: A/a tags for repository events -//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) -//! -//! ## Phase 3 Features -//! -//! - Health tracking with success/failure reporting -//! - Exponential backoff with health-aware delays -//! - Dead relay detection and minimal retry -//! -//! ## Phase 4 Features -//! -//! - Dynamic subscription updates when new announcements/PRs arrive -//! - Per-connection subscription tracking -//! - Filter consolidation when count exceeds threshold (>150) -//! - Duplicate subscription prevention - -use std::sync::Arc; -use std::time::Duration; - -use nostr_sdk::prelude::*; -use tokio::sync::mpsc; - -use super::filter::FilterService; -use super::health::RelayHealthTracker; -use super::metrics::{event_source, SyncMetrics}; -use super::subscription::SubscriptionManager; - -/// Event received from the sync connection -#[derive(Debug, Clone)] -pub struct SyncedEvent { - pub event: Event, - pub source_url: String, -} - -/// Manages a WebSocket connection to a single relay for syncing -pub struct SyncConnection { - url: String, - client: Client, - filter_service: Arc, - remote_domain: String, - subscription_manager: SubscriptionManager, - metrics: Option, -} - -impl SyncConnection { - /// Create a new sync connection to the given relay URL - pub async fn new( - url: &str, - filter_service: Arc, - remote_domain: &str, - metrics: Option, - ) -> Result> { - let client = Client::default(); - - // Add the relay - client.add_relay(url).await?; - - // Connect to the relay - client.connect().await; - - tracing::info!("Sync connection established to {}", url); - - // Create subscription manager for this connection - let subscription_manager = - SubscriptionManager::new(filter_service.clone(), remote_domain.to_string()); - - Ok(Self { - url: url.to_string(), - client, - filter_service, - remote_domain: remote_domain.to_string(), - subscription_manager, - metrics, - }) - } - - /// Start receiving events and send them through the channel - /// - /// This method runs indefinitely, handling events from all three filter layers. - /// Dynamic subscription updates are triggered when new announcements or PRs arrive. - pub async fn run(mut self, tx: mpsc::Sender) { - // Subscribe to all three filter layers - - // Layer 1: Announcement discovery (kinds 30617 + 30618) - let layer1_filters = self.filter_service.get_layer1_filters(); - for filter in &layer1_filters { - match self.client.subscribe(filter.clone(), None).await { - Ok(output) => { - tracing::info!( - "Subscribed to Layer 1 (announcements) on {} (subscription: {})", - self.url, - output.id() - ); - } - Err(e) => { - tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e); - } - } - } - - // Layer 2: Repository events (A/a tags) - let layer2_filters = self - .filter_service - .get_layer2_filters(&self.remote_domain) - .await; - for filter in &layer2_filters { - match self.client.subscribe(filter.clone(), None).await { - Ok(output) => { - tracing::info!( - "Subscribed to Layer 2 (repo events) on {} (subscription: {})", - self.url, - output.id() - ); - } - Err(e) => { - tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e); - } - } - } - - // Layer 3: Related events (E/e tags) - let layer3_filters = self.filter_service.get_layer3_filters().await; - for filter in &layer3_filters { - match self.client.subscribe(filter.clone(), None).await { - Ok(output) => { - tracing::info!( - "Subscribed to Layer 3 (related events) on {} (subscription: {})", - self.url, - output.id() - ); - } - Err(e) => { - tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e); - } - } - } - - tracing::info!( - "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})", - self.url, - layer1_filters.len(), - layer2_filters.len(), - layer3_filters.len() - ); - - // Handle incoming notifications - let url = self.url.clone(); - let metrics = self.metrics.clone(); - self.client - .handle_notifications(|notification| { - let tx = tx.clone(); - let url = url.clone(); - let metrics = metrics.clone(); - async move { - match notification { - RelayPoolNotification::Event { event, .. } => { - tracing::debug!( - "Received event {} from {} (kind {})", - event.id, - url, - event.kind.as_u16() - ); - - // Record live event metric - if let Some(ref m) = metrics { - m.record_event(event_source::LIVE); - } - - // Send the event to the manager for processing - let synced = SyncedEvent { - event: (*event).clone(), - source_url: url.clone(), - }; - - if let Err(e) = tx.send(synced).await { - tracing::warn!("Failed to send synced event: {}", e); - return Ok(true); // Stop if channel is closed - } - } - RelayPoolNotification::Shutdown => { - tracing::warn!("Relay connection shutdown for {}", url); - return Ok(true); // Stop on shutdown - } - RelayPoolNotification::Message { message, .. } => { - tracing::trace!("Received message from {}: {:?}", url, message); - } - } - Ok(false) // Continue processing - } - }) - .await - .ok(); - } - - /// Handle dynamic subscription updates based on incoming event kind - /// - /// - kind 30617/30618: New announcement → add Layer 2 subscription - /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription - async fn handle_dynamic_subscription(&mut self, event: &Event) { - // Check if this is an announcement kind (triggers Layer 2 subscription) - if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) { - if let Some(new_filters) = self.subscription_manager.add_announcement(event) { - tracing::info!( - "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", - event.id.to_hex(), - self.url, - new_filters.len(), - self.subscription_manager.get_filter_count() - ); - self.subscribe_to_filters(new_filters, "Layer 2").await; - } - } - - // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription) - if matches!( - event.kind, - Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618) - ) { - if let Some(new_filters) = self.subscription_manager.add_event(event) { - tracing::info!( - "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", - event.id.to_hex(), - self.url, - new_filters.len(), - self.subscription_manager.get_filter_count() - ); - self.subscribe_to_filters(new_filters, "Layer 3").await; - } - } - - // Check if we need to consolidate - if self.subscription_manager.should_consolidate() { - self.consolidate_subscriptions().await; - } - } - - /// Subscribe to new filters - async fn subscribe_to_filters(&self, filters: Vec, layer_name: &str) { - for filter in filters { - match self.client.subscribe(filter, None).await { - Ok(output) => { - tracing::debug!( - "Dynamic {} subscription on {} (subscription: {})", - layer_name, - self.url, - output.id() - ); - } - Err(e) => { - tracing::warn!( - "Failed to add dynamic {} subscription on {}: {}", - layer_name, - self.url, - e - ); - } - } - } - } - - /// Consolidate subscriptions back to Layer 1 only - /// - /// This is triggered when the filter count exceeds 150. - /// All existing subscriptions are closed and only Layer 1 is re-subscribed. - async fn consolidate_subscriptions(&mut self) { - tracing::warn!( - "Filter count {} exceeds threshold, consolidating subscriptions on {}", - self.subscription_manager.get_filter_count(), - self.url - ); - - // Get consolidated filters (clears tracking and returns Layer 1 only) - let layer1_filters = self.subscription_manager.consolidate(); - - // Note: nostr-sdk doesn't provide a way to close all subscriptions easily - // The client will manage subscription count internally - // We just add the new Layer 1 subscription - - for filter in layer1_filters { - match self.client.subscribe(filter, None).await { - Ok(output) => { - tracing::info!( - "Consolidated to Layer 1 subscription on {} (subscription: {})", - self.url, - output.id() - ); - } - Err(e) => { - tracing::error!( - "Failed to subscribe Layer 1 after consolidation on {}: {}", - self.url, - e - ); - } - } - } - } - - /// Get the current filter count from the subscription manager - pub fn get_filter_count(&self) -> usize { - self.subscription_manager.get_filter_count() - } - - /// Check if subscriptions have been consolidated - pub fn is_consolidated(&self) -> bool { - self.subscription_manager.is_consolidated() - } -} - -/// Reconnect loop with health-aware exponential backoff -/// -/// This function manages the connection lifecycle with health tracking: -/// - Checks health state before attempting connections -/// - Reports success/failure to the health tracker -/// - Respects backoff delays from the health tracker -/// - Handles dead relay detection (24h+ failures) -/// -/// # Arguments -/// * `url` - The relay URL to connect to -/// * `tx` - Channel sender for synced events -/// * `filter_service` - FilterService for building subscriptions -/// * `our_domain` - Our relay's domain (used to extract remote domain) -/// * `health_tracker` - Health tracker for managing connection state -/// * `metrics` - Optional sync metrics for Prometheus -pub async fn connect_with_retry( - url: &str, - tx: mpsc::Sender, - filter_service: Arc, - _our_domain: &str, - health_tracker: Arc, - metrics: Option, -) { - // Extract remote domain from URL - let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); - - loop { - // Check if we should attempt connection based on health state - if !health_tracker.should_attempt_connection(url) { - // Wait for remaining backoff - if let Some(remaining) = health_tracker.get_remaining_backoff(url) { - tracing::debug!( - "Relay {} in backoff, waiting {:?} before retry", - url, - remaining - ); - tokio::time::sleep(remaining).await; - continue; - } - } - - // Log current health state for dead relays - if health_tracker.is_dead(url) { - tracing::info!( - "Attempting reconnection to dead relay {} (daily retry)", - url - ); - } - - match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()) - .await - { - Ok(conn) => { - // Record successful connection - health_tracker.record_success(url); - - // Record metrics - if let Some(ref m) = metrics { - m.record_connection_attempt(url, true); - m.set_relay_connected(url, true); - m.inc_connected_count(); - m.record_health_state(url, health_tracker.get_state(url)); - m.record_failure_count(url, 0); - } - - tracing::info!("Sync connection established to {}", url); - - // Run the connection (this blocks until disconnection) - conn.run(tx.clone()).await; - - // Connection ended - record as failure for reconnection backoff - // (The connection ending is considered a failure even if it worked for a while) - health_tracker.record_failure(url); - - // Update metrics for disconnection - if let Some(ref m) = metrics { - m.set_relay_connected(url, false); - m.dec_connected_count(); - m.record_health_state(url, health_tracker.get_state(url)); - m.record_failure_count(url, health_tracker.get_failure_count(url)); - } - - tracing::warn!("Sync connection to {} ended, will reconnect", url); - } - Err(e) => { - // Record connection failure - health_tracker.record_failure(url); - - let failure_count = health_tracker.get_failure_count(url); - let state = health_tracker.get_state(url); - - // Record metrics - if let Some(ref m) = metrics { - m.record_connection_attempt(url, false); - m.set_relay_connected(url, false); - m.record_health_state(url, state); - m.record_failure_count(url, failure_count); - - // Track dead relays - if state == super::health::HealthState::Dead { - m.inc_dead_count(); - } - } - - tracing::error!( - "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", - url, - failure_count, - state, - e - ); - } - } - - // Get the backoff duration from health tracker - // If the health tracker has no backoff set (shouldn't happen), use a small default - let wait_duration = health_tracker - .get_remaining_backoff(url) - .unwrap_or(Duration::from_secs(5)); - - tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url); - tokio::time::sleep(wait_duration).await; - } -} - -/// Extract domain from a URL -fn extract_domain_from_url(url: &str) -> Option { - let url = url - .trim_start_matches("ws://") - .trim_start_matches("wss://") - .trim_start_matches("http://") - .trim_start_matches("https://"); - - // Remove path - let domain = url.split('/').next()?; - - Some(domain.to_string()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_extract_domain() { - assert_eq!( - extract_domain_from_url("ws://127.0.0.1:8080"), - Some("127.0.0.1:8080".to_string()) - ); - assert_eq!( - extract_domain_from_url("wss://relay.example.com/path"), - Some("relay.example.com".to_string()) - ); - } -} diff --git a/src/sync/filter.rs b/src/sync/filter.rs deleted file mode 100644 index 108c92a..0000000 --- a/src/sync/filter.rs +++ /dev/null @@ -1,451 +0,0 @@ -//! Filter Service for GRASP-02 Proactive Sync -//! -//! Implements the three-layer filter strategy for comprehensive event syncing: -//! - Layer 1: Announcement discovery (kinds 30617 + 30618) -//! - Layer 2: Repository events (A/a tags pointing to shared repos) -//! - Layer 3: Related events (E/e tags pointing to Layer 2 events) - -use std::collections::HashSet; - -use nostr_sdk::prelude::*; - -use crate::nostr::builder::SharedDatabase; -use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT; - -/// Maximum number of tags per filter to stay within relay limits -const MAX_TAGS_PER_FILTER: usize = 100; - -/// Kind for maintainer metadata (NIP-34) -const KIND_MAINTAINER_LIST: u16 = 30618; - -/// FilterService builds subscription filters for proactive sync -/// -/// Uses a three-layer strategy: -/// 1. Layer 1: Discover new repository announcements and maintainer metadata -/// 2. Layer 2: Sync events directly related to repositories we track -/// 3. Layer 3: Sync discussions and updates related to Layer 2 events -#[derive(Debug)] -pub struct FilterService { - database: SharedDatabase, - /// Our relay's domain for filtering - relay_domain: String, -} - -impl FilterService { - /// Create a new FilterService - /// - /// # Arguments - /// * `database` - Shared database for querying stored events - /// * `relay_domain` - Our relay's domain (used for filtering shared repos) - pub fn new(database: SharedDatabase, relay_domain: String) -> Self { - Self { - database, - relay_domain, - } - } - - /// Get Layer 1 filters for announcement discovery - /// - /// Returns filters for kinds 30617 (repository announcements) and 30618 (maintainer metadata) - pub fn get_layer1_filters(&self) -> Vec { - vec![Filter::new().kinds(vec![ - Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), - Kind::Custom(KIND_MAINTAINER_LIST), - ])] - } - - /// Get Layer 2 filters for repository-related events - /// - /// Queries the database for kind 30617 events and builds filters for events - /// with `a` tags pointing to repositories that reference both: - /// - Our relay (from clone tags) - /// - Are stored in our database (meaning they're relevant to us) - /// - /// # Arguments - /// * `remote_relay_domain` - The domain of the remote relay we're syncing from - pub async fn get_layer2_filters(&self, remote_relay_domain: &str) -> Vec { - // Query all kind 30617 events from our database - let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)); - - let events = match self.database.query(filter).await { - Ok(events) => events, - Err(e) => { - tracing::warn!("Failed to query announcements for Layer 2 filters: {}", e); - return Vec::new(); - } - }; - - // Build a set of addressable coordinates for repos that list both relays - let mut coords: Vec = Vec::new(); - - for event in events { - // Check if this repo lists our domain in clone tags - let has_our_relay = event.tags.iter().any(|tag| { - let tag_vec = tag.clone().to_vec(); - tag_vec.len() >= 2 - && (tag_vec[0] == "clone" || tag_vec[0] == "relays") - && tag_vec.iter().any(|v| v.contains(&self.relay_domain)) - }); - - // Check if this repo lists the remote relay in clone/relays tags - let has_remote_relay = event.tags.iter().any(|tag| { - let tag_vec = tag.clone().to_vec(); - tag_vec.len() >= 2 - && (tag_vec[0] == "clone" || tag_vec[0] == "relays") - && tag_vec.iter().any(|v| v.contains(remote_relay_domain)) - }); - - if has_our_relay && has_remote_relay { - // Extract the d tag (identifier) - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].clone()) - } else { - None - } - }) { - // Build the addressable coordinate: kind:pubkey:identifier - let coord = format!( - "{}:{}:{}", - KIND_REPOSITORY_ANNOUNCEMENT, - event.pubkey.to_hex(), - identifier - ); - coords.push(coord); - } - } - } - - if coords.is_empty() { - return Vec::new(); - } - - // Batch coordinates into filters with A/a/q tags - Self::batch_layer2_filters(coords) - } - - /// Get Layer 3 filters for related events - /// - /// Queries the database for events with `a` tags (PRs, Issues, etc.) - /// and builds filters for events that reference them with `e` tags. - pub async fn get_layer3_filters(&self) -> Vec { - // Query events that reference repositories (have 'a' tags with 30617) - // These are typically PRs (1618), Issues (1621), etc. - - // First, get all kind 30617 announcements - let announcement_filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)); - - let announcements = match self.database.query(announcement_filter).await { - Ok(events) => events, - Err(e) => { - tracing::warn!("Failed to query announcements for Layer 3 filters: {}", e); - return Vec::new(); - } - }; - - // Build a set of event IDs from PRs, Issues, etc. that reference our repos - let mut event_ids: Vec = Vec::new(); - - // Get the set of valid repository coordinates - let repo_coords: HashSet = announcements - .iter() - .filter_map(|e| { - e.tags.iter().find_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(format!( - "{}:{}:{}", - KIND_REPOSITORY_ANNOUNCEMENT, - e.pubkey.to_hex(), - tag_vec[1] - )) - } else { - None - } - }) - }) - .collect(); - - if repo_coords.is_empty() { - return Vec::new(); - } - - // Query for PR and Patch events from our repositories - let repos_pr_patch_filter = Filter::new().kinds(vec![ - Kind::Custom(1617), // Patch - Kind::Custom(1618), // PR - ]); - - let related_events = match self.database.query(repos_pr_patch_filter).await { - Ok(events) => events, - Err(e) => { - tracing::warn!("Failed to query related events for Layer 3 filters: {}", e); - return Vec::new(); - } - }; - - // Collect event IDs that reference our repositories - for event in related_events { - // Check if this event has an 'a' tag pointing to one of our repos - let references_our_repo = event.tags.iter().any(|tag| { - let tag_vec = tag.clone().to_vec(); - tag_vec.len() >= 2 && tag_vec[0] == "a" && repo_coords.contains(&tag_vec[1]) - }); - - if references_our_repo { - event_ids.push(event.id.to_hex()); - } - } - - if event_ids.is_empty() { - return Vec::new(); - } - - // Batch event IDs into filters with 'E', 'e', and 'q' tags - Self::batch_layer3_filters(event_ids) - } - - /// Batch a list of addressable coordinates into Layer 2 filters with 'A', 'a', and 'q' tags - /// - /// Different Nostr clients use different tag conventions for referencing repository - /// announcements. This function generates THREE filters per chunk to capture all variants: - /// - Uppercase 'A' tags (used by some clients) - /// - Lowercase 'a' tags (standard addressable event reference) - /// - Lowercase 'q' tags (quote tags, used by some clients) - /// - /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets. - fn batch_layer2_filters(coords: Vec) -> Vec { - if coords.is_empty() { - return Vec::new(); - } - - coords - .chunks(MAX_TAGS_PER_FILTER) - .flat_map(|chunk| { - // Create THREE filters per chunk - one for each tag type - vec![ - // Uppercase A tag filter - Filter::new().custom_tags( - SingleLetterTag::uppercase(Alphabet::A), - chunk.iter().cloned(), - ), - // Lowercase a tag filter - Filter::new().custom_tags( - SingleLetterTag::lowercase(Alphabet::A), - chunk.iter().cloned(), - ), - // Quote q tag filter - Filter::new().custom_tags( - SingleLetterTag::lowercase(Alphabet::Q), - chunk.iter().cloned(), - ), - ] - }) - .collect() - } - - /// Batch a list of event IDs into Layer 3 filters with 'E', 'e', and 'q' tags - /// - /// Different Nostr clients use different tag conventions for referencing events. - /// This function generates THREE filters per chunk to capture all variants: - /// - Uppercase 'E' tags (used by some clients) - /// - Lowercase 'e' tags (standard event reference) - /// - Lowercase 'q' tags (quote tags, used by some clients) - /// - /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets. - fn batch_layer3_filters(event_ids: Vec) -> Vec { - if event_ids.is_empty() { - return Vec::new(); - } - - event_ids - .chunks(MAX_TAGS_PER_FILTER) - .flat_map(|chunk| { - // Create THREE filters per chunk - one for each tag type - vec![ - // Uppercase E tag filter - Filter::new().custom_tags( - SingleLetterTag::uppercase(Alphabet::E), - chunk.iter().cloned(), - ), - // Lowercase e tag filter - Filter::new().custom_tags( - SingleLetterTag::lowercase(Alphabet::E), - chunk.iter().cloned(), - ), - // Quote q tag filter - Filter::new().custom_tags( - SingleLetterTag::lowercase(Alphabet::Q), - chunk.iter().cloned(), - ), - ] - }) - .collect() - } - - /// Discover relay URLs from stored kind 30617 announcements - /// - /// Only returns relay URLs from repositories that list **our** relay. - /// This ensures we only connect to relays where we have shared repos, - /// avoiding wasted connections with empty Layer 2 filters. - /// - /// Extracts unique relay URLs from `clone` and `relays` tags, - /// excluding our own relay domain. - pub async fn discover_relay_urls(&self) -> Vec { - let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)); - - let events = match self.database.query(filter).await { - Ok(events) => events, - Err(e) => { - tracing::warn!("Failed to query announcements for relay discovery: {}", e); - return Vec::new(); - } - }; - - let mut relay_urls: HashSet = HashSet::new(); - - for event in events { - // First check: Does this repo list our relay? - // Only process repos that reference us - otherwise we'd connect to relays - // where we have no shared repos, resulting in empty Layer 2 filters. - let has_our_relay = event.tags.iter().any(|tag| { - let tag_vec = tag.clone().to_vec(); - tag_vec.len() >= 2 - && (tag_vec[0] == "clone" || tag_vec[0] == "relays") - && tag_vec.iter().any(|v| v.contains(&self.relay_domain)) - }); - - if !has_our_relay { - // Skip repos that don't list our relay - no shared repos possible - continue; - } - - // Extract relay URLs from repos that list us - for tag in event.tags.iter() { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() < 2 { - continue; - } - - // Extract URLs from clone and relays tags - if tag_vec[0] == "clone" || tag_vec[0] == "relays" { - for value in tag_vec.iter().skip(1) { - // Check if it looks like a URL - if value.starts_with("ws://") - || value.starts_with("wss://") - || value.starts_with("http://") - || value.starts_with("https://") - { - // Exclude our own relay - if !value.contains(&self.relay_domain) { - relay_urls.insert(value.clone()); - } - } - } - } - } - } - - relay_urls.into_iter().collect() - } - - /// Extract relay URLs from a specific event's clone tags - /// - /// Returns URLs that are not our own relay. - pub fn extract_relay_urls_from_event(&self, event: &Event) -> Vec { - let mut urls = Vec::new(); - - for tag in event.tags.iter() { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() < 2 { - continue; - } - - if tag_vec[0] == "clone" || tag_vec[0] == "relays" { - for value in tag_vec.iter().skip(1) { - if value.starts_with("ws://") - || value.starts_with("wss://") - || value.starts_with("http://") - || value.starts_with("https://") - { - if !value.contains(&self.relay_domain) { - urls.push(value.clone()); - } - } - } - } - } - - urls - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_batch_layer2_filters_empty() { - let filters = FilterService::batch_layer2_filters(vec![]); - assert!(filters.is_empty()); - } - - #[test] - fn test_batch_layer2_filters_small() { - let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()]; - let filters = FilterService::batch_layer2_filters(coords); - // 1 chunk × 3 tag types (A, a, q) = 3 filters - assert_eq!(filters.len(), 3); - } - - #[test] - fn test_batch_layer2_filters_large() { - // Create 250 coordinates to test batching - let coords: Vec = (0..250) - .map(|i| format!("30617:pubkey{}:repo{}", i, i)) - .collect(); - - let filters = FilterService::batch_layer2_filters(coords); - // 3 chunks (100 + 100 + 50) × 3 tag types (A, a, q) = 9 filters - assert_eq!(filters.len(), 9); - } - - #[test] - fn test_batch_layer3_filters_empty() { - let filters = FilterService::batch_layer3_filters(vec![]); - assert!(filters.is_empty()); - } - - #[test] - fn test_batch_layer3_filters_small() { - let event_ids = vec!["eventid1".to_string(), "eventid2".to_string()]; - let filters = FilterService::batch_layer3_filters(event_ids); - // 1 chunk × 3 tag types (E, e, q) = 3 filters - assert_eq!(filters.len(), 3); - } - - #[test] - fn test_batch_layer3_filters_large() { - // Create 250 event IDs to test batching - let event_ids: Vec = (0..250).map(|i| format!("eventid{:064}", i)).collect(); - - let filters = FilterService::batch_layer3_filters(event_ids); - // 3 chunks (100 + 100 + 50) × 3 tag types (E, e, q) = 9 filters - assert_eq!(filters.len(), 9); - } - - #[test] - fn test_layer1_filters() { - // Create a mock database - we'll use a memory database for testing - // This test just verifies the filter structure - let filter = Filter::new().kinds(vec![ - Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), - Kind::Custom(KIND_MAINTAINER_LIST), - ]); - - // Verify the filter has the correct kinds - // Note: We can't easily inspect Filter internals, but we can ensure it compiles - assert!(!filter.is_empty()); - } -} diff --git a/src/sync/manager.rs b/src/sync/manager.rs deleted file mode 100644 index 6ae82ef..0000000 --- a/src/sync/manager.rs +++ /dev/null @@ -1,762 +0,0 @@ -//! SyncManager - Coordinates proactive sync operations -//! -//! The SyncManager connects to remote relays, receives events, validates them -//! through the write policy, and stores accepted events. -//! -//! ## Simplified Relay Discovery Architecture -//! -//! All relay discovery is centralized in the self-subscriber: -//! - Bootstrap relay: connected immediately (no jitter, single relay) -//! - All other relays: discovered via self-subscriber announcements (with jitter) -//! -//! ```text -//! ┌─────────────────────────────────────────────────────────────┐ -//! │ ngit-grasp │ -//! │ │ -//! │ ┌─────────────┐ broadcasts ┌───────────────┐ │ -//! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │ -//! │ │ Database │ │ Client │ │ -//! │ └─────────────┘ └───────┬───────┘ │ -//! │ ▲ │ │ -//! │ │ stores │ extracts │ -//! │ │ │ relay │ -//! │ ┌─────┴─────┐ │ URLs │ -//! │ │ Remote │◀────────────────────────────────┘ │ -//! │ │Connections│ spawns new │ -//! │ └───────────┘ connections (with jitter) │ -//! └─────────────────────────────────────────────────────────────┘ -//! ``` -//! -//! ## Key Design Decisions -//! -//! - **Single relay discovery path**: Only the self-subscriber discovers new relays -//! - **Jitter at point of discovery**: Applied when spawning connections from announcements -//! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect -//! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd -//! -//! ## Phase 2 Features -//! -//! - Relay discovery from kind 30617 announcements (via self-subscriber) -//! - Multiple simultaneous relay connections -//! - Three-layer filter strategy via FilterService -//! -//! ## Phase 3 Features -//! -//! - Health tracking with exponential backoff -//! - Dead relay detection after 24h of failures -//! - Startup jitter to prevent thundering herd -//! -//! ## Phase 4 Features -//! -//! - Dynamic subscription updates handled per-connection -//! - Each connection manages its own SubscriptionManager -//! - Announcements trigger Layer 2 subscriptions -//! - PRs/Issues trigger Layer 3 subscriptions -//! - Consolidation when filter count exceeds 150 - -use std::collections::HashSet; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::Arc; -use std::time::Duration; - -use nostr_relay_builder::prelude::*; -use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp}; -use rand::Rng; -use tokio::sync::mpsc; - -use super::connection::{connect_with_retry, SyncedEvent}; -use super::filter::FilterService; -use super::health::RelayHealthTracker; -use super::metrics::SyncMetrics; -use crate::config::Config; -use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; -use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT; - -/// Default fallback address for sync source when bind_address cannot be parsed -/// -/// This distinguishes synced events from directly-submitted events in logs and metrics. -/// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker. -pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - -/// Derive sync source address from config bind_address -/// -/// Parses the bind_address string and returns a SocketAddr. -/// Falls back to 127.0.0.1:8080 if parsing fails. -fn get_sync_source_addr(bind_address: &str) -> SocketAddr { - bind_address - .parse() - .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) -} - -/// Derive the WebSocket URL for our own relay from bind_address -fn derive_own_relay_url(bind_address: &str) -> String { - format!("ws://{}", bind_address) -} - -/// Coordinates proactive sync from configured and discovered relays -pub struct SyncManager { - /// Bootstrap relay URL for initial sync (from config) - /// Additional relays are discovered from repository announcements that list our service - bootstrap_relay_url: Option, - /// Our relay's domain (for filtering) - relay_domain: String, - /// Our relay's WebSocket URL (for self-subscribe) - own_relay_url: String, - /// Database for storing accepted events - database: SharedDatabase, - /// Write policy for validating events - write_policy: Nip34WritePolicy, - /// Health tracker for relay connections - health_tracker: Arc, - /// Sync metrics for Prometheus - metrics: Option, - /// Source address for synced events (derived from config.bind_address) - sync_source_addr: SocketAddr, - /// Maximum startup jitter in milliseconds (from config) - startup_jitter_ms: u64, -} - -impl SyncManager { - /// Create a new SyncManager - /// - /// # Arguments - /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config - /// * `relay_domain` - Our relay's domain (used to exclude self from sync) - /// * `database` - Shared database for storing events and querying announcements - /// * `write_policy` - Write policy for validating synced events - /// * `config` - Configuration for health tracking settings - pub fn new( - bootstrap_relay_url: Option, - relay_domain: String, - database: SharedDatabase, - write_policy: Nip34WritePolicy, - config: &Config, - ) -> Self { - let own_relay_url = derive_own_relay_url(&config.bind_address); - Self { - bootstrap_relay_url, - relay_domain, - own_relay_url, - database, - write_policy, - health_tracker: Arc::new(RelayHealthTracker::new(config)), - metrics: None, - sync_source_addr: get_sync_source_addr(&config.bind_address), - startup_jitter_ms: config.sync_startup_jitter_ms, - } - } - - /// Create a new SyncManager with metrics - /// - /// # Arguments - /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config - /// * `relay_domain` - Our relay's domain (used to exclude self from sync) - /// * `database` - Shared database for storing events and querying announcements - /// * `write_policy` - Write policy for validating synced events - /// * `config` - Configuration for health tracking settings - /// * `metrics` - Sync metrics for Prometheus - pub fn with_metrics( - bootstrap_relay_url: Option, - relay_domain: String, - database: SharedDatabase, - write_policy: Nip34WritePolicy, - config: &Config, - metrics: SyncMetrics, - ) -> Self { - let own_relay_url = derive_own_relay_url(&config.bind_address); - Self { - bootstrap_relay_url, - relay_domain, - own_relay_url, - database, - write_policy, - health_tracker: Arc::new(RelayHealthTracker::new(config)), - metrics: Some(metrics), - sync_source_addr: get_sync_source_addr(&config.bind_address), - startup_jitter_ms: config.sync_startup_jitter_ms, - } - } - - /// Create a SyncManager with a single relay URL (Phase 1 compatibility) - pub fn with_single_relay( - bootstrap_url: String, - database: SharedDatabase, - write_policy: Nip34WritePolicy, - ) -> Self { - // Extract domain from URL for filtering - let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default(); - let own_relay_url = format!("ws://{}", relay_domain); - Self { - bootstrap_relay_url: Some(bootstrap_url), - relay_domain, - own_relay_url, - database, - write_policy, - health_tracker: Arc::new(RelayHealthTracker::with_defaults()), - metrics: None, - sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, - startup_jitter_ms: 10_000, // Default 10 seconds - } - } - - /// Set metrics for the sync manager - pub fn set_metrics(&mut self, metrics: SyncMetrics) { - self.metrics = Some(metrics); - } - - /// Get a reference to the metrics - pub fn metrics(&self) -> Option<&SyncMetrics> { - self.metrics.as_ref() - } - - /// Get a reference to the health tracker - pub fn health_tracker(&self) -> Arc { - self.health_tracker.clone() - } - - /// Run the sync manager - /// - /// This spawns the bootstrap relay connection (if configured), sets up a - /// self-subscriber for event-driven relay discovery, and processes incoming - /// events. The self-subscriber handles ALL relay discovery from announcements. - /// Runs indefinitely until cancelled. - /// - /// ## Simplified Relay Discovery Architecture - /// - /// All relay discovery is centralized in the self-subscriber: - /// - Bootstrap relay: connected immediately (no jitter, single relay) - /// - All other relays: discovered via self-subscriber announcements (with jitter) - /// - Jitter applied at point of discovery (not startup) - /// - /// This eliminates three redundant discovery paths: - /// 1. DB query at startup (removed) - /// 2. Remote event extraction (removed) - /// 3. Self-subscriber (sole discovery path) - pub async fn run(self) { - tracing::info!( - "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})", - self.relay_domain, - self.own_relay_url, - self.bootstrap_relay_url - ); - - // Create the filter service - let filter_service = Arc::new(FilterService::new( - self.database.clone(), - self.relay_domain.clone(), - )); - - // Create channel for receiving events from all connections - let (tx, mut rx) = mpsc::channel::(100); - - // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing) - let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::::new())); - - // Bootstrap relay - connect immediately (no jitter, just one relay) - if let Some(ref url) = self.bootstrap_relay_url { - if !self.is_own_relay(url) { - tracing::info!("Connecting to bootstrap relay: {}", url); - active_relays.lock().await.insert(url.clone()); - self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false); - } else { - tracing::info!("Skipping bootstrap relay (is our own relay): {}", url); - } - } - - // Record initial tracked relay count - if let Some(ref metrics) = self.metrics { - let count = active_relays.lock().await.len(); - metrics.set_tracked_count(count as i64); - } - - { - let active = active_relays.lock().await; - if active.is_empty() { - tracing::info!( - "No bootstrap relay configured, waiting for announcements via self-subscriber..." - ); - } else { - tracing::info!( - "SyncManager connected to {} relay(s): {:?}", - active.len(), - *active - ); - } - } - - // Spawn self-subscriber task for ALL relay discovery - let self_subscriber_handle = self.spawn_self_subscriber( - tx.clone(), - filter_service.clone(), - active_relays.clone(), - ); - - // Process incoming events - just validate and store, NO relay discovery - // (relay discovery is handled solely by the self-subscriber) - while let Some(synced_event) = rx.recv().await { - self.process_event(synced_event).await; - } - - // Clean up self-subscriber - self_subscriber_handle.abort(); - tracing::warn!("SyncManager event channel closed, shutting down"); - } - - /// Check if a URL points to our own relay - fn is_own_relay(&self, url: &str) -> bool { - url.contains(&self.relay_domain) - } - - /// Spawn a self-subscriber task that connects to our own relay - /// and watches for kind 30617 announcements to discover new relays. - /// - /// This is the SOLE relay discovery path - all relay discovery happens here. - /// When a new announcement is saved to our database (from direct submission - /// or synced from another relay), the self-subscriber receives it immediately - /// and spawns connections to newly discovered relays (with jitter). - fn spawn_self_subscriber( - &self, - tx: mpsc::Sender, - filter_service: Arc, - active_relays: Arc>>, - ) -> tokio::task::JoinHandle<()> { - let own_relay_url = self.own_relay_url.clone(); - let relay_domain = self.relay_domain.clone(); - let metrics = self.metrics.clone(); - let health_tracker = self.health_tracker.clone(); - let startup_jitter_ms = self.startup_jitter_ms; - - tokio::spawn(async move { - Self::run_self_subscriber_loop( - own_relay_url, - relay_domain, - tx, - filter_service, - active_relays, - metrics, - health_tracker, - startup_jitter_ms, - ) - .await; - }) - } - - /// Main loop for the self-subscriber - /// - /// Connects to our own relay, subscribes to kind 30617 announcements, - /// and processes events to discover new relays. Handles reconnection - /// on disconnect. - /// - /// ## Reconnection Behavior - /// - /// - First connection: no `since` filter (get all historical announcements) - /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing - #[allow(clippy::too_many_arguments)] - async fn run_self_subscriber_loop( - own_relay_url: String, - relay_domain: String, - tx: mpsc::Sender, - filter_service: Arc, - active_relays: Arc>>, - metrics: Option, - health_tracker: Arc, - startup_jitter_ms: u64, - ) { - let mut reconnect_delay = Duration::from_secs(1); - let max_reconnect_delay = Duration::from_secs(60); - let mut is_first_connection = true; - - loop { - tracing::info!( - "Self-subscriber connecting to own relay: {}", - own_relay_url - ); - - match Self::connect_self_subscriber(&own_relay_url).await { - Ok(client) => { - // Reset reconnect delay on successful connection - reconnect_delay = Duration::from_secs(1); - - tracing::info!( - "Self-subscriber connected to own relay, subscribing to kind {} announcements{}", - KIND_REPOSITORY_ANNOUNCEMENT, - if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" } - ); - - // Subscribe to repository announcements - // First connection: get all historical; reconnections: only last 15 minutes - let filter = if is_first_connection { - Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) - } else { - let since = Timestamp::now() - 15 * 60; // 15 minutes ago - Filter::new() - .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) - .since(since) - }; - - is_first_connection = false; - - if let Err(e) = client.subscribe(filter, None).await { - tracing::error!( - "Self-subscriber failed to subscribe on {}: {}", - own_relay_url, - e - ); - // Will reconnect after delay - } else { - // Handle notifications until disconnect - Self::handle_self_subscriber_notifications( - &client, - &own_relay_url, - &relay_domain, - &tx, - &filter_service, - &active_relays, - &metrics, - &health_tracker, - startup_jitter_ms, - ) - .await; - } - - // Disconnect and cleanup - client.disconnect().await; - } - Err(e) => { - tracing::warn!( - "Self-subscriber failed to connect to {}: {}", - own_relay_url, - e - ); - } - } - - // Wait before reconnecting with exponential backoff - tracing::debug!( - "Self-subscriber will reconnect to {} in {:?}", - own_relay_url, - reconnect_delay - ); - tokio::time::sleep(reconnect_delay).await; - reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); - } - } - - /// Connect to our own relay for self-subscribing - async fn connect_self_subscriber( - url: &str, - ) -> Result> { - let client = Client::default(); - client.add_relay(url).await?; - client.connect().await; - - // Wait for connection to establish (with timeout) - let timeout = Duration::from_secs(10); - let start = std::time::Instant::now(); - - while start.elapsed() < timeout { - let relays = client.relays().await; - if relays.values().any(|r| r.is_connected()) { - return Ok(client); - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - - Err("Timeout waiting for self-subscriber connection".into()) - } - - /// Handle notifications from the self-subscriber client - /// - /// Processes announcement events to discover new relay URLs. - /// Applies jitter before spawning connections to prevent thundering herd. - #[allow(clippy::too_many_arguments)] - async fn handle_self_subscriber_notifications( - client: &Client, - own_relay_url: &str, - relay_domain: &str, - tx: &mpsc::Sender, - filter_service: &Arc, - active_relays: &Arc>>, - metrics: &Option, - health_tracker: &Arc, - startup_jitter_ms: u64, - ) { - let own_relay_url = own_relay_url.to_string(); - let relay_domain = relay_domain.to_string(); - let filter_service = filter_service.clone(); - let active_relays = active_relays.clone(); - let metrics = metrics.clone(); - let health_tracker = health_tracker.clone(); - let tx = tx.clone(); - - client - .handle_notifications(|notification| { - let own_relay_url = own_relay_url.clone(); - let relay_domain = relay_domain.clone(); - let filter_service = filter_service.clone(); - let active_relays = active_relays.clone(); - let metrics = metrics.clone(); - let health_tracker = health_tracker.clone(); - let tx = tx.clone(); - - async move { - match notification { - RelayPoolNotification::Event { event, .. } => { - // Only process repository announcement events - if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT { - return Ok(false); - } - - tracing::debug!( - "Self-subscriber received announcement {} from {}", - event.id, - own_relay_url - ); - - // Extract relay URLs from the announcement - let new_urls = filter_service.extract_relay_urls_from_event(&event); - - for url in new_urls { - // Check if we should connect to this relay - let should_connect = { - let mut active = active_relays.lock().await; - let is_new = !active.contains(&url); - let is_not_self = !url.contains(&relay_domain); - - if is_new && is_not_self { - active.insert(url.clone()); - true - } else { - false - } - }; - - if should_connect { - tracing::info!( - "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}", - event.id, - url - ); - - // Update tracked relay count - if let Some(ref m) = metrics { - m.inc_tracked_count(); - } - - // Spawn connection to the new relay WITH jitter at point of discovery - let url_clone = url.clone(); - let tx_clone = tx.clone(); - let filter_service_clone = filter_service.clone(); - let domain_clone = relay_domain.clone(); - let health_tracker_clone = health_tracker.clone(); - let metrics_clone = metrics.clone(); - - tokio::spawn(async move { - // Apply jitter at point of discovery - if startup_jitter_ms > 0 { - let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms); - tracing::debug!( - "Applying {}ms jitter before connecting to discovered relay {}", - jitter_ms, - url_clone - ); - tokio::time::sleep(Duration::from_millis(jitter_ms)).await; - } - - connect_with_retry( - &url_clone, - tx_clone, - filter_service_clone, - &domain_clone, - health_tracker_clone, - metrics_clone, - ) - .await; - }); - } - } - - Ok(false) // Continue processing - } - RelayPoolNotification::Shutdown => { - tracing::warn!( - "Self-subscriber connection shutdown for {}", - own_relay_url - ); - Ok(true) // Stop on shutdown - } - RelayPoolNotification::Message { .. } => { - Ok(false) // Continue processing - } - } - } - }) - .await - .ok(); - } - - /// Spawn a connection task for a relay - /// - /// # Arguments - /// * `url` - Relay URL to connect to - /// * `tx` - Channel sender for synced events - /// * `filter_service` - Filter service for subscriptions - /// * `apply_jitter` - Whether to apply startup jitter before connecting - fn spawn_connection( - &self, - url: String, - tx: mpsc::Sender, - filter_service: Arc, - apply_jitter: bool, - ) { - let domain = self.relay_domain.clone(); - let health_tracker = self.health_tracker.clone(); - let metrics = self.metrics.clone(); - let max_jitter = self.startup_jitter_ms; - - tokio::spawn(async move { - // Apply startup jitter if requested - if apply_jitter && max_jitter > 0 { - let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); - tracing::debug!( - "Applying {}ms jitter before connecting to {}", - jitter_ms, - url - ); - tokio::time::sleep(Duration::from_millis(jitter_ms)).await; - } - - connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; - }); - } - - /// Process a single synced event - /// - /// Events are validated through the write policy and stored if accepted. - /// Dynamic subscription updates are handled by each connection's SubscriptionManager. - async fn process_event(&self, synced_event: SyncedEvent) { - let event = &synced_event.event; - let event_id = event.id.to_hex(); - let kind = event.kind.as_u16(); - - tracing::debug!( - "Processing synced event {} (kind {}) from {}", - event_id, - kind, - synced_event.source_url - ); - - // Log subscription-relevant events for debugging - match kind { - 30617 | 30618 => { - tracing::debug!( - "Received announcement {} - connection will add Layer 2 subscription", - event_id - ); - } - 1617 | 1618 | 1619 | 1621 | 1622 => { - tracing::debug!( - "Received PR/Issue {} - connection will add Layer 3 subscription", - event_id - ); - } - _ => {} - } - - // Validate through write policy using sync_source_addr derived from config - let result = self - .write_policy - .admit_event(event, &self.sync_source_addr) - .await; - - match result { - PolicyResult::Accept => { - tracing::info!( - "Synced event {} (kind {}) accepted, storing", - event_id, - event.kind.as_u16() - ); - - // Store the event in the database - if let Err(e) = self.database.save_event(event).await { - tracing::error!("Failed to store synced event {}: {}", event_id, e); - } else { - tracing::debug!("Synced event {} stored successfully", event_id); - } - } - PolicyResult::Reject(reason) => { - tracing::info!( - "Synced event {} (kind {}) rejected: {}", - event_id, - event.kind.as_u16(), - reason - ); - } - } - } -} - -/// Extract domain from a WebSocket URL -/// -/// Examples: -/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" -/// - "wss://relay.example.com" -> "relay.example.com" -fn extract_domain_from_url(url: &str) -> Option { - let url = url - .trim_start_matches("ws://") - .trim_start_matches("wss://"); - let url = url - .trim_start_matches("http://") - .trim_start_matches("https://"); - - // Remove path - let domain = url.split('/').next()?; - - Some(domain.to_string()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_extract_domain_ws() { - assert_eq!( - extract_domain_from_url("ws://127.0.0.1:8080"), - Some("127.0.0.1:8080".to_string()) - ); - } - - #[test] - fn test_extract_domain_wss() { - assert_eq!( - extract_domain_from_url("wss://relay.example.com"), - Some("relay.example.com".to_string()) - ); - } - - #[test] - fn test_extract_domain_with_path() { - assert_eq!( - extract_domain_from_url("ws://example.com/path"), - Some("example.com".to_string()) - ); - } - - #[test] - fn test_extract_domain_http() { - assert_eq!( - extract_domain_from_url("http://example.com:3000"), - Some("example.com:3000".to_string()) - ); - } - - #[test] - fn test_derive_own_relay_url() { - assert_eq!( - derive_own_relay_url("127.0.0.1:8080"), - "ws://127.0.0.1:8080".to_string() - ); - } -} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 17418d0..aa34490 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,40 +1,394 @@ //! Proactive Sync Module for GRASP-02 //! -//! This module implements proactive synchronization of kind 30617 (repository state) -//! events from configured relay(s). Events are validated through the same write policy -//! as directly-submitted events. +//! This module implements the proactive sync system that ensures data availability +//! for repositories hosted on this relay by syncing from other relays in the ecosystem. //! -//! ## Three-Layer Filter Strategy (Phase 2) +//! ## Architecture Overview //! -//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) -//! - **Layer 2**: Repository events (A/a tags for shared repos) -//! - **Layer 3**: Related events (E/e tags for discussions, reviews) +//! The sync system is built around two core data structures: //! -//! ## Resilience & Health Tracking (Phase 3) +//! - **FollowingRepoRootEvents**: Tracks repository root events we're following +//! - **SyncRelays**: Tracks relays we sync from, including their repos and events //! -//! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead) -//! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max) -//! - **Dead relay handling**: Minimal retry for 24h+ failed relays -//! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay) +//! These type aliases are colocated with SyncManager (following the pattern of +//! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity. +//! +//! ## Submodules +//! +//! - [`health`]: Relay health tracking with exponential backoff and dead relay detection +//! - [`metrics`]: Prometheus metrics for sync operations +//! +//! ## Memory Estimates (from design doc) +//! +//! At target scale (1,000 repos, 100 relays): +//! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB +//! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB +//! - **Total in-memory state**: ~10 MB +//! +//! ## Upper Bounds (triggers for redesign) +//! +//! - 10,000+ repos: Consider database-backed state +//! - 500+ sync relays: Consider connection pooling +//! - 500+ root events per repo: Consider per-repo pagination +//! +//! ## Design References +//! +//! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md) +//! for the complete design context. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use nostr_sdk::EventId; +use tokio::sync::RwLock; + +use crate::config::Config; +use crate::nostr::builder::Nip34WritePolicy; +use crate::nostr::SharedDatabase; + +// ============================================================================= +// Type Aliases for Sync State +// ============================================================================= + +/// Repository root events we're following. +/// +/// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) +/// we need to follow for each repository we host. +/// +/// ## Key Format +/// +/// The key is a repository addressable reference in the format: +/// `"30617::"` +/// +/// For example: `"30617:abc123...def:my-project"` +/// +/// ## Value +/// +/// A set of event IDs representing root events (PRs, Issues, Patches, Status events) +/// that reference this repository via an `a` tag. +/// +/// ## Event Kinds Tracked +/// +/// - **1617**: Patches (NIP-34) +/// - **1618**: Issues (NIP-34) +/// - **1619**: PRs (Pull Requests, NIP-34) +/// - **1621**: Status events (NIP-34) +/// +/// ## Invariants +/// +/// - May include a few extra repo refs that aren't in `SyncRelays` +/// - This is acceptable - we won't query other relays for them +/// - Updated incrementally via self-subscription +/// +/// ## Thread Safety +/// +/// Wrapped in `Arc>` for safe concurrent access from multiple +/// async tasks performing sync operations. +/// +/// ## Example Usage +/// +/// ```rust,ignore +/// use ngit_grasp::sync::FollowingRepoRootEvents; +/// use std::collections::HashSet; +/// use nostr_sdk::EventId; +/// +/// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) { +/// let guard = state.read().await; +/// if let Some(events) = guard.get(repo_ref) { +/// println!("Tracking {} root events for {}", events.len(), repo_ref); +/// } +/// } +/// ``` +pub type FollowingRepoRootEvents = Arc>>>; + +/// Relays we sync from, including their repos and events. +/// +/// This structure tracks which relays we need to connect to for syncing, +/// and for each relay, which repositories and their root events we're interested in. +/// +/// ## Key Format (Outer HashMap) +/// +/// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"` +/// +/// ## Value Format (Inner HashMap) +/// +/// For each relay, we maintain a map of: +/// - Key: Repository addressable reference (`"30617::"`) +/// - Value: Set of event IDs for that repo which should be synced from this relay +/// +/// ## Relay Selection Criteria +/// +/// A relay is included if its URL appears in a repository announcement (kind 30617) +/// that **also** lists our service URL. This ensures we only sync from relays +/// for repositories that are actually hosted on our relay. +/// +/// ## Bootstrap Relay +/// +/// If configured, the bootstrap relay is always present in this map and is +/// excluded from automatic removal logic. The bootstrap relay is used for +/// initial sync and discovery even when no repositories explicitly list it. +/// +/// ## Thread Safety +/// +/// Wrapped in `Arc>` for safe concurrent access from multiple +/// async tasks performing sync operations. +/// +/// ## Example Usage +/// +/// ```rust,ignore +/// use ngit_grasp::sync::SyncRelays; +/// use std::collections::{HashMap, HashSet}; +/// +/// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) { +/// let guard = state.read().await; +/// if let Some(repos) = guard.get(relay_url) { +/// println!("Relay {} tracks {} repos", relay_url, repos.len()); +/// for (repo_ref, events) in repos { +/// println!(" {} -> {} events", repo_ref, events.len()); +/// } +/// } +/// } +/// ``` +pub type SyncRelays = Arc>>>>; + +/// Creates a new empty `FollowingRepoRootEvents` state. +/// +/// Use this to initialize the state before populating from database queries. +pub fn new_following_repo_root_events() -> FollowingRepoRootEvents { + Arc::new(RwLock::new(HashMap::new())) +} + +/// Creates a new empty `SyncRelays` state. +/// +/// Use this to initialize the state before populating from database queries. +pub fn new_sync_relays() -> SyncRelays { + Arc::new(RwLock::new(HashMap::new())) +} + +// ============================================================================= +// SyncManager +// ============================================================================= + +/// Manages proactive synchronization with external relays. +/// +/// The SyncManager is responsible for: +/// - Discovering relays from stored repository announcements +/// - Maintaining connections to sync relays +/// - Subscribing to events at external relays +/// - Applying the acceptance policy to synced events +/// +/// ## Lifecycle +/// +/// 1. `new()` - Creates manager with database and config +/// 2. `run()` - Main async loop (call in a spawned task) +/// +/// ## Current Status +/// +/// This is a stub implementation. The core data structures are: +/// - [`FollowingRepoRootEvents`]: Repository root events we're following +/// - [`SyncRelays`]: Relays we sync from with their repos and events +/// +/// Full implementation will come in later phases. +pub struct SyncManager { + /// Bootstrap relay URL if configured + #[allow(dead_code)] + bootstrap_relay_url: Option, + + /// Our service domain for filtering repo announcements + #[allow(dead_code)] + service_domain: String, + + /// Database for querying/storing events + #[allow(dead_code)] + database: SharedDatabase, + + /// Write policy for applying acceptance rules + #[allow(dead_code)] + write_policy: Nip34WritePolicy, + + /// Repository root events we're following (Phase 1 data structure) + #[allow(dead_code)] + following_repo_root_events: FollowingRepoRootEvents, + + /// Relays we sync from (Phase 1 data structure) + #[allow(dead_code)] + sync_relays: SyncRelays, + + /// Max backoff duration for relay reconnection + #[allow(dead_code)] + max_backoff_secs: u64, +} + +impl SyncManager { + /// Creates a new SyncManager. + /// + /// # Arguments + /// + /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync + /// * `service_domain` - Our domain for filtering announcements + /// * `database` - Database for event storage/queries + /// * `write_policy` - Policy for accepting events + /// * `config` - Configuration for sync parameters + pub fn new( + bootstrap_relay_url: Option, + service_domain: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + config: &Config, + ) -> Self { + Self { + bootstrap_relay_url, + service_domain, + database, + write_policy, + following_repo_root_events: new_following_repo_root_events(), + sync_relays: new_sync_relays(), + max_backoff_secs: config.sync_max_backoff_secs, + } + } + + /// Returns a reference to the following repo root events state. + /// + /// This is the Phase 1 data structure tracking which repository root events + /// (kinds 1617, 1618, 1619, 1621) we're following. + pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents { + &self.following_repo_root_events + } + + /// Returns a reference to the sync relays state. + /// + /// This is the Phase 1 data structure tracking which relays we sync from + /// and their associated repositories/events. + pub fn sync_relays(&self) -> &SyncRelays { + &self.sync_relays + } + + /// Runs the sync manager main loop. + /// + /// This method should be called in a spawned task: + /// + /// ```rust,ignore + /// tokio::spawn(async move { + /// sync_manager.run().await; + /// }); + /// ``` + /// + /// ## Current Status + /// + /// This is a stub that logs and then waits indefinitely. + /// Full implementation includes: + /// - Phase 2: Database initialization queries + /// - Phase 3: Self-subscription for incremental updates + /// - Phase 4-6: Filter building, connection management + /// - Phase 7: Full sync loop + pub async fn run(self) { + tracing::info!( + "SyncManager stub started (bootstrap_relay={:?}, domain={})", + self.bootstrap_relay_url, + self.service_domain + ); + + tracing::info!( + "Phase 1 data structures initialized: following_repo_root_events, sync_relays" + ); + + // Stub: just wait indefinitely until full implementation + // This prevents the spawned task from immediately completing + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } + } +} + +// ============================================================================= +// Submodules +// ============================================================================= -mod connection; -mod filter; pub mod health; -mod manager; pub mod metrics; -pub mod negentropy; -mod subscription; - -pub use filter::FilterService; -pub use health::{HealthState, RelayHealth, RelayHealthTracker}; -pub use manager::SyncManager; -pub use metrics::SyncMetrics; -pub use negentropy::NegentropyService; -pub use subscription::SubscriptionManager; - -// Re-export default sync source address for backward compatibility with modules like negentropy.rs -// Manager.rs derives sync_source_addr from config.bind_address at runtime -pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR; - -/// Kind for repository state events (NIP-34) -pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file + +// Re-export commonly used types +pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; +pub use metrics::{event_source, SyncMetrics}; + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_following_repo_root_events_basic_operations() { + let state = new_following_repo_root_events(); + + // Insert some events + { + let mut guard = state.write().await; + let repo_ref = "30617:abc123:my-project".to_string(); + guard + .entry(repo_ref) + .or_default() + .insert(EventId::all_zeros()); + } + + // Read back + { + let guard = state.read().await; + assert_eq!(guard.len(), 1); + assert!(guard.contains_key("30617:abc123:my-project")); + } + } + + #[tokio::test] + async fn test_sync_relays_basic_operations() { + let state = new_sync_relays(); + + // Insert relay with repos + { + let mut guard = state.write().await; + let relay_url = "wss://relay.example.com".to_string(); + let repo_ref = "30617:abc123:my-project".to_string(); + + guard + .entry(relay_url) + .or_default() + .entry(repo_ref) + .or_default() + .insert(EventId::all_zeros()); + } + + // Read back + { + let guard = state.read().await; + assert_eq!(guard.len(), 1); + let relay_repos = guard.get("wss://relay.example.com").unwrap(); + assert_eq!(relay_repos.len(), 1); + let events = relay_repos.get("30617:abc123:my-project").unwrap(); + assert_eq!(events.len(), 1); + } + } + + #[tokio::test] + async fn test_concurrent_access() { + let state = new_following_repo_root_events(); + let state_clone = Arc::clone(&state); + + // Writer task + let writer = tokio::spawn(async move { + let mut guard = state_clone.write().await; + guard + .entry("30617:writer:repo".to_string()) + .or_default() + .insert(EventId::all_zeros()); + }); + + // Wait for writer + writer.await.unwrap(); + + // Reader should see the change + let guard = state.read().await; + assert!(guard.contains_key("30617:writer:repo")); + } +} \ No newline at end of file diff --git a/src/sync/negentropy.rs b/src/sync/negentropy.rs deleted file mode 100644 index 5c0a246..0000000 --- a/src/sync/negentropy.rs +++ /dev/null @@ -1,477 +0,0 @@ -//! Negentropy Catchup Service for GRASP-02 Phase 5 -//! -//! Implements gap-filling synchronization to ensure no events are missed during: -//! - Startup (initial sync after warm-up period) -//! - Reconnection (after connection restore) -//! - Daily maintenance (periodic full reconciliation) -//! -//! ## Note on NIP-77 -//! -//! This implementation uses a simplified gap-filling strategy (fetch and compare) -//! rather than full NIP-77 negentropy set reconciliation. The nostr-sdk 0.44 does -//! not include built-in negentropy support, so we implement an equivalent approach: -//! -//! 1. Fetch events from relay using same filters as live sync -//! 2. Compare with local database (skip already-stored events) -//! 3. Validate and store missing events through policy -//! -//! Full NIP-77 support can be added in a future release if needed. - -use std::collections::HashMap; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use nostr_relay_builder::prelude::*; -use nostr_sdk::prelude::*; -use tokio::sync::RwLock; - -use super::filter::FilterService; -use super::SYNC_SOURCE_ADDR; -use crate::config::Config; -use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; - -/// Default startup delay before first catchup (30 seconds) -const DEFAULT_STARTUP_DELAY_SECS: u64 = 30; - -/// Default delay after reconnection before catchup (10 seconds) -const DEFAULT_RECONNECT_DELAY_SECS: u64 = 10; - -/// Default lookback period for reconnect catchup (3 days) -const DEFAULT_RECONNECT_LOOKBACK_DAYS: u64 = 3; - -/// Daily catchup interval (24 hours) -const DAILY_CATCHUP_INTERVAL_SECS: u64 = 86400; - -/// Stagger delay between relays for catchup operations (5 minutes) -const RELAY_STAGGER_SECS: u64 = 300; - -/// Timeout for fetching events during catchup -const CATCHUP_FETCH_TIMEOUT_SECS: u64 = 60; - -/// Negentropy Catchup Service -/// -/// Manages gap-filling operations for different scenarios: -/// - Startup catchup after warm-up period -/// - Reconnect catchup after connection restore -/// - Daily catchup for periodic maintenance -#[derive(Debug)] -pub struct NegentropyService { - /// Database for storing and querying events - database: SharedDatabase, - /// Filter service for building catchup filters - filter_service: Arc, - /// Write policy for validating synced events - write_policy: Nip34WritePolicy, - /// Startup time of the service - startup_time: Instant, - /// Configuration values - startup_delay_secs: u64, - reconnect_delay_secs: u64, - reconnect_lookback_days: u64, - /// Whether startup catchup has been run - startup_catchup_completed: Arc>, - /// Last daily catchup time per relay - last_daily_catchup: Arc>>, -} - -impl NegentropyService { - /// Create a new NegentropyService - /// - /// # Arguments - /// * `database` - Shared database for storing events - /// * `filter_service` - Filter service for building catchup filters - /// * `write_policy` - Write policy for validating events - /// * `config` - Configuration for catchup timing - pub fn new( - database: SharedDatabase, - filter_service: Arc, - write_policy: Nip34WritePolicy, - config: &Config, - ) -> Self { - Self { - database, - filter_service, - write_policy, - startup_time: Instant::now(), - startup_delay_secs: config.sync_startup_delay_secs, - reconnect_delay_secs: config.sync_reconnect_delay_secs, - reconnect_lookback_days: config.sync_reconnect_lookback_days, - startup_catchup_completed: Arc::new(RwLock::new(false)), - last_daily_catchup: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Create a NegentropyService with default configuration - pub fn with_defaults( - database: SharedDatabase, - filter_service: Arc, - write_policy: Nip34WritePolicy, - ) -> Self { - Self { - database, - filter_service, - write_policy, - startup_time: Instant::now(), - startup_delay_secs: DEFAULT_STARTUP_DELAY_SECS, - reconnect_delay_secs: DEFAULT_RECONNECT_DELAY_SECS, - reconnect_lookback_days: DEFAULT_RECONNECT_LOOKBACK_DAYS, - startup_catchup_completed: Arc::new(RwLock::new(false)), - last_daily_catchup: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Check if startup catchup should run - /// - /// Returns true if: - /// - Startup delay has elapsed (default 30s) - /// - Startup catchup hasn't been completed yet - pub async fn should_run_startup_catchup(&self) -> bool { - let completed = *self.startup_catchup_completed.read().await; - if completed { - return false; - } - - let elapsed = self.startup_time.elapsed(); - elapsed >= Duration::from_secs(self.startup_delay_secs) - } - - /// Check if daily catchup should run for a specific relay - /// - /// Returns true if 24 hours have elapsed since last daily catchup - pub async fn should_run_daily_catchup(&self, relay_url: &str) -> bool { - let last_catchup = self.last_daily_catchup.read().await; - - match last_catchup.get(relay_url) { - None => true, // Never run, should run - Some(last_time) => { - last_time.elapsed() >= Duration::from_secs(DAILY_CATCHUP_INTERVAL_SECS) - } - } - } - - /// Get the startup delay in seconds - pub fn startup_delay_secs(&self) -> u64 { - self.startup_delay_secs - } - - /// Get the reconnect delay in seconds - pub fn reconnect_delay_secs(&self) -> u64 { - self.reconnect_delay_secs - } - - /// Get the relay stagger delay in seconds - pub fn relay_stagger_secs(&self) -> u64 { - RELAY_STAGGER_SECS - } - - /// Run startup catchup for a relay - /// - /// Fetches all events matching the sync filters and stores any missing ones. - /// This is called after the startup warm-up period (default 30s). - /// - /// Returns the count of gap events filled. - pub async fn run_startup_catchup( - &self, - relay_url: &str, - remote_domain: &str, - ) -> Result> { - tracing::info!("Starting startup catchup for {}", relay_url); - - // Run full catchup (no time restriction) - let gap_count = self - .run_catchup(relay_url, remote_domain, None, "startup") - .await?; - - // Mark startup catchup as completed - { - let mut completed = self.startup_catchup_completed.write().await; - *completed = true; - } - - if gap_count > 0 { - tracing::warn!( - "Startup catchup filled {} gaps from {}", - gap_count, - relay_url - ); - } else { - tracing::info!("Startup catchup completed for {} (no gaps)", relay_url); - } - - Ok(gap_count) - } - - /// Run reconnect catchup for a relay - /// - /// Fetches events from the last 3 days (configurable) and stores any missing ones. - /// This is called after a connection is restored (after reconnect delay). - /// - /// Returns the count of gap events filled. - pub async fn run_reconnect_catchup( - &self, - relay_url: &str, - remote_domain: &str, - ) -> Result> { - tracing::info!("Starting reconnect catchup for {}", relay_url); - - // Calculate "since" timestamp (3 days ago) - let lookback_secs = self.reconnect_lookback_days * 24 * 60 * 60; - let since = Timestamp::now() - lookback_secs; - - let gap_count = self - .run_catchup(relay_url, remote_domain, Some(since), "reconnect") - .await?; - - if gap_count > 0 { - tracing::warn!( - "Reconnect catchup filled {} gaps from {}", - gap_count, - relay_url - ); - } else { - tracing::debug!("Reconnect catchup completed for {} (no gaps)", relay_url); - } - - Ok(gap_count) - } - - /// Run daily catchup for a relay - /// - /// Performs full reconciliation and stores any missing events. - /// This is called once per day per relay (with stagger). - /// - /// Returns the count of gap events filled. - pub async fn run_daily_catchup( - &self, - relay_url: &str, - remote_domain: &str, - ) -> Result> { - tracing::info!("Starting daily catchup for {}", relay_url); - - // Run full catchup (no time restriction) - let gap_count = self - .run_catchup(relay_url, remote_domain, None, "daily") - .await?; - - // Update last daily catchup time - { - let mut last_catchup = self.last_daily_catchup.write().await; - last_catchup.insert(relay_url.to_string(), Instant::now()); - } - - if gap_count > 0 { - tracing::warn!( - "Daily catchup filled {} gaps from {}", - gap_count, - relay_url - ); - } else { - tracing::info!("Daily catchup completed for {} (no gaps)", relay_url); - } - - Ok(gap_count) - } - - /// Core catchup implementation - /// - /// Fetches events from relay matching sync filters, compares with local database, - /// validates through policy, and stores missing events. - /// - /// # Arguments - /// * `relay_url` - URL of the relay to fetch from - /// * `remote_domain` - Domain of the remote relay (for filter building) - /// * `since` - Optional timestamp to filter events (for reconnect catchup) - /// * `catchup_type` - Type of catchup for logging ("startup", "reconnect", "daily") - async fn run_catchup( - &self, - relay_url: &str, - remote_domain: &str, - since: Option, - catchup_type: &str, - ) -> Result> { - // Create a client for fetching events - let client = Client::default(); - client.add_relay(relay_url).await?; - client.connect().await; - - let mut gap_count = 0; - - // Build filters (same as live sync uses) - let mut all_filters = Vec::new(); - - // Layer 1: Announcement discovery - let layer1_filters = self.filter_service.get_layer1_filters(); - all_filters.extend(layer1_filters); - - // Layer 2: Repository events - let layer2_filters = self.filter_service.get_layer2_filters(remote_domain).await; - all_filters.extend(layer2_filters); - - // Layer 3: Related events - let layer3_filters = self.filter_service.get_layer3_filters().await; - all_filters.extend(layer3_filters); - - // Apply "since" filter if specified (for reconnect catchup) - let filters: Vec = if let Some(since_ts) = since { - all_filters - .into_iter() - .map(|f| f.since(since_ts)) - .collect() - } else { - all_filters - }; - - if filters.is_empty() { - tracing::debug!("No filters for {} catchup on {}", catchup_type, relay_url); - client.disconnect().await; - return Ok(0); - } - - tracing::debug!( - "Running {} catchup on {} with {} filters", - catchup_type, - relay_url, - filters.len() - ); - - // Fetch events for each filter - for filter in filters { - match client - .fetch_events(filter, Duration::from_secs(CATCHUP_FETCH_TIMEOUT_SECS)) - .await - { - Ok(events) => { - for event in events.into_iter() { - // Check if event already exists in local database - if self.event_exists_locally(&event).await { - continue; - } - - // Validate through write policy - let result = self - .write_policy - .admit_event(&event, &SYNC_SOURCE_ADDR) - .await; - - match result { - PolicyResult::Accept => { - // Log gap event at WARN level to distinguish from live events - tracing::warn!( - "Gap event filled via {} catchup: {} (kind {})", - catchup_type, - event.id.to_hex(), - event.kind.as_u16() - ); - - // Store the event - if let Err(e) = self.database.save_event(&event).await { - tracing::error!( - "Failed to store gap event {}: {}", - event.id.to_hex(), - e - ); - } else { - gap_count += 1; - } - } - PolicyResult::Reject(reason) => { - tracing::debug!( - "Gap event {} rejected by policy: {}", - event.id.to_hex(), - reason - ); - } - } - } - } - Err(e) => { - tracing::warn!( - "Failed to fetch events for {} catchup from {}: {}", - catchup_type, - relay_url, - e - ); - } - } - } - - client.disconnect().await; - - Ok(gap_count) - } - - /// Check if an event already exists in the local database - async fn event_exists_locally(&self, event: &Event) -> bool { - // Query for the specific event by ID - let filter = Filter::new().id(event.id); - - match self.database.query(filter).await { - Ok(events) => !events.is_empty(), - Err(e) => { - tracing::warn!( - "Failed to check if event {} exists locally: {}", - event.id.to_hex(), - e - ); - // Assume it doesn't exist to avoid skipping events on error - false - } - } - } - - /// Mark startup catchup as completed (for testing) - #[cfg(test)] - pub async fn mark_startup_completed(&self) { - let mut completed = self.startup_catchup_completed.write().await; - *completed = true; - } - - /// Reset startup catchup status (for testing) - #[cfg(test)] - pub async fn reset_startup_status(&self) { - let mut completed = self.startup_catchup_completed.write().await; - *completed = false; - } -} - -/// Create a shared NegentropyService wrapped in Arc -pub fn create_negentropy_service( - database: SharedDatabase, - filter_service: Arc, - write_policy: Nip34WritePolicy, - config: &Config, -) -> Arc { - Arc::new(NegentropyService::new( - database, - filter_service, - write_policy, - config, - )) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_default_constants() { - assert_eq!(DEFAULT_STARTUP_DELAY_SECS, 30); - assert_eq!(DEFAULT_RECONNECT_DELAY_SECS, 10); - assert_eq!(DEFAULT_RECONNECT_LOOKBACK_DAYS, 3); - assert_eq!(DAILY_CATCHUP_INTERVAL_SECS, 86400); - assert_eq!(RELAY_STAGGER_SECS, 300); - } - - #[test] - fn test_reconnect_lookback_calculation() { - // 3 days = 3 * 24 * 60 * 60 = 259,200 seconds - let lookback_days: u64 = 3; - let lookback_secs = lookback_days * 24 * 60 * 60; - assert_eq!(lookback_secs, 259200); - } - - #[test] - fn test_stagger_delay_is_5_minutes() { - assert_eq!(RELAY_STAGGER_SECS, 300); // 5 * 60 = 300 - } -} \ No newline at end of file diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs deleted file mode 100644 index bbeaa2a..0000000 --- a/src/sync/subscription.rs +++ /dev/null @@ -1,229 +0,0 @@ -//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions -//! -//! Manages dynamic subscription updates per connection, including: -//! - Tracking subscribed announcements and events -//! - Adding new subscriptions when announcements/PRs arrive -//! - Consolidating filters when count exceeds threshold -//! - Preventing duplicate subscriptions -//! -//! ## Dynamic Subscription Strategy -//! -//! Initial: Layer 1 (announcements) -//! ↓ (announcement received) -//! Add: Layer 2 (events for that repo) -//! ↓ (PR/Issue received) -//! Add: Layer 3 (events for that PR/Issue) -//! ↓ (filter count > 150) -//! Consolidate: Back to Layer 1 only - -use std::collections::HashSet; -use std::sync::Arc; - -use nostr_sdk::prelude::*; - -use super::filter::FilterService; - -/// Maximum number of filters before consolidation is triggered -const CONSOLIDATION_THRESHOLD: usize = 150; - -/// Manages subscriptions for a single relay connection -/// -/// Tracks which announcements and events have been subscribed to, -/// and handles dynamic subscription updates as new events arrive. -#[derive(Debug)] -pub struct SubscriptionManager { - /// Event IDs of announcements we've subscribed to (for Layer 2) - subscribed_announcements: HashSet, - /// Event IDs of PRs/Issues we've subscribed to (for Layer 3) - subscribed_events: HashSet, - /// Whether we've consolidated back to Layer 1 only - is_consolidated: bool, - /// FilterService for building filters - filter_service: Arc, - /// Remote relay domain for Layer 2 filters - remote_domain: String, -} - -impl SubscriptionManager { - /// Create a new SubscriptionManager - /// - /// # Arguments - /// * `filter_service` - FilterService for building subscription filters - /// * `remote_domain` - The domain of the remote relay we're syncing from - pub fn new(filter_service: Arc, remote_domain: String) -> Self { - Self { - subscribed_announcements: HashSet::new(), - subscribed_events: HashSet::new(), - is_consolidated: false, - filter_service, - remote_domain, - } - } - - /// Add an announcement and return new filters to subscribe to - /// - /// When a new announcement (kind 30617/30618) arrives, this creates - /// Layer 2 filters to subscribe to events for that repository. - /// - /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed. - pub fn add_announcement(&mut self, event: &Event) -> Option> { - let event_id = event.id.to_hex(); - - // Check if already subscribed or consolidated - if self.is_consolidated || self.subscribed_announcements.contains(&event_id) { - return None; - } - - // Add to tracked announcements - self.subscribed_announcements.insert(event_id); - - // Build Layer 2 filters for this announcement - // Layer 2 filters target events with 'a' tags pointing to this repo - let filters = self.build_layer2_filter_for_announcement(event); - - if filters.is_empty() { - None - } else { - Some(filters) - } - } - - /// Add a PR/Issue/Patch event and return new filters to subscribe to - /// - /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives, - /// this creates Layer 3 filters to subscribe to related events. - /// - /// Returns `Some(filters)` if this is a new event, `None` if already subscribed. - pub fn add_event(&mut self, event: &Event) -> Option> { - let event_id = event.id.to_hex(); - - // Check if already subscribed or consolidated - if self.is_consolidated || self.subscribed_events.contains(&event_id) { - return None; - } - - // Add to tracked events - self.subscribed_events.insert(event_id.clone()); - - // Build Layer 3 filter for this event - // Layer 3 filters target events with 'e' tags pointing to this event - let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id); - - Some(vec![filter]) - } - - /// Check if consolidation is needed - /// - /// Returns true if the total filter count exceeds the threshold (150). - pub fn should_consolidate(&self) -> bool { - !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD - } - - /// Consolidate all subscriptions back to Layer 1 only - /// - /// Clears all tracked announcements and events, marks as consolidated, - /// and returns the Layer 1 filters to re-subscribe to. - pub fn consolidate(&mut self) -> Vec { - tracing::info!( - "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only", - self.subscribed_announcements.len(), - self.subscribed_events.len() - ); - - // Clear tracked subscriptions - self.subscribed_announcements.clear(); - self.subscribed_events.clear(); - self.is_consolidated = true; - - // Return Layer 1 filters - self.filter_service.get_layer1_filters() - } - - /// Get the total count of active filters - /// - /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3), - /// plus the base Layer 1 filter count. - pub fn get_filter_count(&self) -> usize { - if self.is_consolidated { - // When consolidated, we only have Layer 1 filters - 1 - } else { - // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events) - 1 + self.subscribed_announcements.len() + self.subscribed_events.len() - } - } - - /// Check if an announcement has been subscribed to - pub fn has_announcement(&self, event_id: &str) -> bool { - self.subscribed_announcements.contains(event_id) - } - - /// Check if an event has been subscribed to - pub fn has_event(&self, event_id: &str) -> bool { - self.subscribed_events.contains(event_id) - } - - /// Check if subscriptions have been consolidated - pub fn is_consolidated(&self) -> bool { - self.is_consolidated - } - - /// Get the count of subscribed announcements - pub fn announcement_count(&self) -> usize { - self.subscribed_announcements.len() - } - - /// Get the count of subscribed events - pub fn event_count(&self) -> usize { - self.subscribed_events.len() - } - - /// Build Layer 2 filter for a specific announcement event - /// - /// Creates a filter with an 'a' tag pointing to the announcement's coordinates. - fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec { - // Extract the d tag (identifier) from the event - let identifier = event.tags.iter().find_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].clone()) - } else { - None - } - }); - - let identifier = match identifier { - Some(id) => id, - None => { - tracing::warn!( - "Announcement {} has no 'd' tag, cannot build Layer 2 filter", - event.id.to_hex() - ); - return Vec::new(); - } - }; - - // Verify this is an announcement kind - if !matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) { - tracing::warn!( - "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", - event.id.to_hex(), - event.kind - ); - return Vec::new(); - } - - // Build the addressable coordinate: kind:pubkey:identifier - let coord = format!( - "{}:{}:{}", - event.kind.as_u16(), - event.pubkey.to_hex(), - identifier - ); - - // Create filter with 'a' tag for this coordinate - let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord); - - vec![filter] - } -} -- cgit v1.2.3