From bf558b0dc17e14f96eea624ea5591315a2909154 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:49:05 +0000 Subject: feat(sync): Phase 2 - multi-relay and complete filters - Add relay discovery from stored announcements - Implement FilterService with three-layer strategy - Support multiple simultaneous relay connections - Filter batching for large tag sets --- src/sync/connection.rs | 148 ++++++++++++++++--- src/sync/filter.rs | 391 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/manager.rs | 187 +++++++++++++++++++++-- src/sync/mod.rs | 8 + 4 files changed, 695 insertions(+), 39 deletions(-) create mode 100644 src/sync/filter.rs (limited to 'src/sync') diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 4a79128..76cc8e8 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -1,14 +1,22 @@ //! WebSocket connection handling for sync //! -//! Manages the connection to a source relay, subscribes to kind 30617 events, -//! and passes them through validation. +//! Manages the connection to a source relay, subscribes to events using +//! the three-layer filter strategy, and passes them through validation. +//! +//! ## Phase 2 Features +//! +//! - Three-layer filter subscriptions: +//! 1. Layer 1: kinds 30617 + 30618 (announcements) +//! 2. Layer 2: A/a tags for repository events +//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) +use std::sync::Arc; use std::time::Duration; use nostr_sdk::prelude::*; use tokio::sync::mpsc; -use super::KIND_REPOSITORY_STATE; +use super::filter::FilterService; /// Event received from the sync connection #[derive(Debug, Clone)] @@ -21,11 +29,17 @@ pub struct SyncedEvent { pub struct SyncConnection { url: String, client: Client, + filter_service: Arc, + remote_domain: String, } impl SyncConnection { /// Create a new sync connection to the given relay URL - pub async fn new(url: &str) -> Result> { + pub async fn new( + url: &str, + filter_service: Arc, + remote_domain: &str, + ) -> Result> { let client = Client::default(); // Add the relay @@ -39,31 +53,78 @@ impl SyncConnection { Ok(Self { url: url.to_string(), client, + filter_service, + remote_domain: remote_domain.to_string(), }) } /// Start receiving events and send them through the channel /// - /// This method runs indefinitely, reconnecting as needed. + /// This method runs indefinitely, handling events from all three filter layers. pub async fn run(self, tx: mpsc::Sender) { - // Create filter for kind 30617 (repository state) events - let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); - - // Subscribe to events - match self.client.subscribe(filter, None).await { - Ok(output) => { - tracing::info!( - "Subscribed to kind {} events on {} (subscription: {})", - KIND_REPOSITORY_STATE, - self.url, - output.id() - ); + // Subscribe to all three filter layers + + // Layer 1: Announcement discovery (kinds 30617 + 30618) + let layer1_filters = self.filter_service.get_layer1_filters(); + for filter in &layer1_filters { + match self.client.subscribe(filter.clone(), None).await { + Ok(output) => { + tracing::info!( + "Subscribed to Layer 1 (announcements) on {} (subscription: {})", + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e); + } } - Err(e) => { - tracing::error!("Failed to subscribe on {}: {}", self.url, e); - return; + } + + // Layer 2: Repository events (A/a tags) + let layer2_filters = self + .filter_service + .get_layer2_filters(&self.remote_domain) + .await; + for filter in &layer2_filters { + match self.client.subscribe(filter.clone(), None).await { + Ok(output) => { + tracing::info!( + "Subscribed to Layer 2 (repo events) on {} (subscription: {})", + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e); + } + } + } + + // Layer 3: Related events (E/e tags) + let layer3_filters = self.filter_service.get_layer3_filters().await; + for filter in &layer3_filters { + match self.client.subscribe(filter.clone(), None).await { + Ok(output) => { + tracing::info!( + "Subscribed to Layer 3 (related events) on {} (subscription: {})", + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e); + } } - }; + } + + tracing::info!( + "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})", + self.url, + layer1_filters.len(), + layer2_filters.len(), + layer3_filters.len() + ); // Handle incoming notifications let url = self.url.clone(); @@ -106,19 +167,29 @@ impl SyncConnection { .await .ok(); } - } /// Reconnect loop with exponential backoff +/// +/// # Arguments +/// * `url` - The relay URL to connect to +/// * `tx` - Channel sender for synced events +/// * `filter_service` - FilterService for building subscriptions +/// * `our_domain` - Our relay's domain (used to extract remote domain) pub async fn connect_with_retry( url: &str, tx: mpsc::Sender, + filter_service: Arc, + _our_domain: &str, ) { let mut backoff = Duration::from_secs(1); let max_backoff = Duration::from_secs(60); + // Extract remote domain from URL + let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); + loop { - match SyncConnection::new(url).await { + match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { Ok(conn) => { backoff = Duration::from_secs(1); // Reset backoff on successful connection conn.run(tx.clone()).await; @@ -140,4 +211,35 @@ pub async fn connect_with_retry( // Exponential backoff backoff = std::cmp::min(backoff * 2, max_backoff); } +} + +/// Extract domain from a URL +fn extract_domain_from_url(url: &str) -> Option { + let url = url + .trim_start_matches("ws://") + .trim_start_matches("wss://") + .trim_start_matches("http://") + .trim_start_matches("https://"); + + // Remove path + let domain = url.split('/').next()?; + + Some(domain.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_domain() { + assert_eq!( + extract_domain_from_url("ws://127.0.0.1:8080"), + Some("127.0.0.1:8080".to_string()) + ); + assert_eq!( + extract_domain_from_url("wss://relay.example.com/path"), + Some("relay.example.com".to_string()) + ); + } } \ No newline at end of file diff --git a/src/sync/filter.rs b/src/sync/filter.rs new file mode 100644 index 0000000..7168f72 --- /dev/null +++ b/src/sync/filter.rs @@ -0,0 +1,391 @@ +//! Filter Service for GRASP-02 Proactive Sync +//! +//! Implements the three-layer filter strategy for comprehensive event syncing: +//! - Layer 1: Announcement discovery (kinds 30617 + 30618) +//! - Layer 2: Repository events (A/a tags pointing to shared repos) +//! - Layer 3: Related events (E/e tags pointing to Layer 2 events) + +use std::collections::HashSet; + +use nostr_sdk::prelude::*; + +use crate::nostr::builder::SharedDatabase; +use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT; + +/// Maximum number of tags per filter to stay within relay limits +const MAX_TAGS_PER_FILTER: usize = 100; + +/// Kind for maintainer metadata (NIP-34) +const KIND_MAINTAINER_LIST: u16 = 30618; + +/// FilterService builds subscription filters for proactive sync +/// +/// Uses a three-layer strategy: +/// 1. Layer 1: Discover new repository announcements and maintainer metadata +/// 2. Layer 2: Sync events directly related to repositories we track +/// 3. Layer 3: Sync discussions and updates related to Layer 2 events +pub struct FilterService { + database: SharedDatabase, + /// Our relay's domain for filtering + relay_domain: String, +} + +impl FilterService { + /// Create a new FilterService + /// + /// # Arguments + /// * `database` - Shared database for querying stored events + /// * `relay_domain` - Our relay's domain (used for filtering shared repos) + pub fn new(database: SharedDatabase, relay_domain: String) -> Self { + Self { + database, + relay_domain, + } + } + + /// Get Layer 1 filters for announcement discovery + /// + /// Returns filters for kinds 30617 (repository announcements) and 30618 (maintainer metadata) + pub fn get_layer1_filters(&self) -> Vec { + vec![Filter::new().kinds(vec![ + Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), + Kind::Custom(KIND_MAINTAINER_LIST), + ])] + } + + /// Get Layer 2 filters for repository-related events + /// + /// Queries the database for kind 30617 events and builds filters for events + /// with `a` tags pointing to repositories that reference both: + /// - Our relay (from clone tags) + /// - Are stored in our database (meaning they're relevant to us) + /// + /// # Arguments + /// * `remote_relay_domain` - The domain of the remote relay we're syncing from + pub async fn get_layer2_filters(&self, remote_relay_domain: &str) -> Vec { + // Query all kind 30617 events from our database + let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)); + + let events = match self.database.query(filter).await { + Ok(events) => events, + Err(e) => { + tracing::warn!("Failed to query announcements for Layer 2 filters: {}", e); + return Vec::new(); + } + }; + + // Build a set of addressable coordinates for repos that list both relays + let mut coords: Vec = Vec::new(); + + for event in events { + // Check if this repo lists our domain in clone tags + let has_our_relay = event.tags.iter().any(|tag| { + let tag_vec = tag.clone().to_vec(); + tag_vec.len() >= 2 + && (tag_vec[0] == "clone" || tag_vec[0] == "relays") + && tag_vec.iter().any(|v| v.contains(&self.relay_domain)) + }); + + // Check if this repo lists the remote relay in clone/relays tags + let has_remote_relay = event.tags.iter().any(|tag| { + let tag_vec = tag.clone().to_vec(); + tag_vec.len() >= 2 + && (tag_vec[0] == "clone" || tag_vec[0] == "relays") + && tag_vec.iter().any(|v| v.contains(remote_relay_domain)) + }); + + if has_our_relay || has_remote_relay { + // Extract the d tag (identifier) + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].clone()) + } else { + None + } + }) { + // Build the addressable coordinate: kind:pubkey:identifier + let coord = format!( + "{}:{}:{}", + KIND_REPOSITORY_ANNOUNCEMENT, + event.pubkey.to_hex(), + identifier + ); + coords.push(coord); + } + } + } + + if coords.is_empty() { + return Vec::new(); + } + + // Batch coordinates into filters + Self::batch_filters_with_a_tags(coords) + } + + /// Get Layer 3 filters for related events + /// + /// Queries the database for events with `a` tags (PRs, Issues, etc.) + /// and builds filters for events that reference them with `e` tags. + pub async fn get_layer3_filters(&self) -> Vec { + // Query events that reference repositories (have 'a' tags with 30617) + // These are typically PRs (1618), Issues (1621), etc. + + // First, get all kind 30617 announcements + let announcement_filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)); + + let announcements = match self.database.query(announcement_filter).await { + Ok(events) => events, + Err(e) => { + tracing::warn!( + "Failed to query announcements for Layer 3 filters: {}", + e + ); + return Vec::new(); + } + }; + + // Build a set of event IDs from PRs, Issues, etc. that reference our repos + let mut event_ids: Vec = Vec::new(); + + // Get the set of valid repository coordinates + let repo_coords: HashSet = announcements + .iter() + .filter_map(|e| { + e.tags.iter().find_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(format!( + "{}:{}:{}", + KIND_REPOSITORY_ANNOUNCEMENT, + e.pubkey.to_hex(), + tag_vec[1] + )) + } else { + None + } + }) + }) + .collect(); + + if repo_coords.is_empty() { + return Vec::new(); + } + + // Query for PR events (1618) and other related events + // that have 'a' tags pointing to our repositories + let related_filter = Filter::new().kinds(vec![ + Kind::Custom(1618), // PR + Kind::Custom(1619), // PR Update + Kind::Custom(1621), // Issue + Kind::Custom(1622), // Reply + Kind::Custom(1630), // Status (open) + Kind::Custom(1631), // Status (applied) + Kind::Custom(1632), // Status (closed) + Kind::Custom(1633), // Status (draft) + ]); + + let related_events = match self.database.query(related_filter).await { + Ok(events) => events, + Err(e) => { + tracing::warn!("Failed to query related events for Layer 3 filters: {}", e); + return Vec::new(); + } + }; + + // Collect event IDs that reference our repositories + for event in related_events { + // Check if this event has an 'a' tag pointing to one of our repos + let references_our_repo = event.tags.iter().any(|tag| { + let tag_vec = tag.clone().to_vec(); + tag_vec.len() >= 2 && tag_vec[0] == "a" && repo_coords.contains(&tag_vec[1]) + }); + + if references_our_repo { + event_ids.push(event.id.to_hex()); + } + } + + if event_ids.is_empty() { + return Vec::new(); + } + + // Batch event IDs into filters with 'e' tags + Self::batch_filters_with_e_tags(event_ids) + } + + /// Batch a list of addressable coordinates into filters with 'a' tags + /// + /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. + fn batch_filters_with_a_tags(coords: Vec) -> Vec { + if coords.is_empty() { + return Vec::new(); + } + + coords + .chunks(MAX_TAGS_PER_FILTER) + .map(|chunk| { + let mut filter = Filter::new(); + for coord in chunk { + filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord.clone()); + } + filter + }) + .collect() + } + + /// Batch a list of event IDs into filters with 'e' tags + /// + /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. + fn batch_filters_with_e_tags(event_ids: Vec) -> Vec { + if event_ids.is_empty() { + return Vec::new(); + } + + event_ids + .chunks(MAX_TAGS_PER_FILTER) + .map(|chunk| { + let mut filter = Filter::new(); + for event_id in chunk { + filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.clone()); + } + filter + }) + .collect() + } + + /// Discover relay URLs from stored kind 30617 announcements + /// + /// Extracts unique relay URLs from `clone` and `relays` tags, + /// excluding our own relay domain. + pub async fn discover_relay_urls(&self) -> Vec { + let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)); + + let events = match self.database.query(filter).await { + Ok(events) => events, + Err(e) => { + tracing::warn!("Failed to query announcements for relay discovery: {}", e); + return Vec::new(); + } + }; + + let mut relay_urls: HashSet = HashSet::new(); + + for event in events { + for tag in event.tags.iter() { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() < 2 { + continue; + } + + // Extract URLs from clone and relays tags + if tag_vec[0] == "clone" || tag_vec[0] == "relays" { + for value in tag_vec.iter().skip(1) { + // Check if it looks like a URL + if value.starts_with("ws://") + || value.starts_with("wss://") + || value.starts_with("http://") + || value.starts_with("https://") + { + // Exclude our own relay + if !value.contains(&self.relay_domain) { + relay_urls.insert(value.clone()); + } + } + } + } + } + } + + relay_urls.into_iter().collect() + } + + /// Extract relay URLs from a specific event's clone tags + /// + /// Returns URLs that are not our own relay. + pub fn extract_relay_urls_from_event(&self, event: &Event) -> Vec { + let mut urls = Vec::new(); + + for tag in event.tags.iter() { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() < 2 { + continue; + } + + if tag_vec[0] == "clone" || tag_vec[0] == "relays" { + for value in tag_vec.iter().skip(1) { + if value.starts_with("ws://") + || value.starts_with("wss://") + || value.starts_with("http://") + || value.starts_with("https://") + { + if !value.contains(&self.relay_domain) { + urls.push(value.clone()); + } + } + } + } + } + + urls + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_batch_filters_with_a_tags_empty() { + let filters = FilterService::batch_filters_with_a_tags(vec![]); + assert!(filters.is_empty()); + } + + #[test] + fn test_batch_filters_with_a_tags_small() { + let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()]; + let filters = FilterService::batch_filters_with_a_tags(coords); + assert_eq!(filters.len(), 1); + } + + #[test] + fn test_batch_filters_with_a_tags_large() { + // Create 250 coordinates to test batching + let coords: Vec = (0..250) + .map(|i| format!("30617:pubkey{}:repo{}", i, i)) + .collect(); + + let filters = FilterService::batch_filters_with_a_tags(coords); + assert_eq!(filters.len(), 3); // 100 + 100 + 50 + } + + #[test] + fn test_batch_filters_with_e_tags_empty() { + let filters = FilterService::batch_filters_with_e_tags(vec![]); + assert!(filters.is_empty()); + } + + #[test] + fn test_batch_filters_with_e_tags_large() { + // Create 150 event IDs to test batching + let event_ids: Vec = (0..150).map(|i| format!("eventid{:064}", i)).collect(); + + let filters = FilterService::batch_filters_with_e_tags(event_ids); + assert_eq!(filters.len(), 2); // 100 + 50 + } + + #[test] + fn test_layer1_filters() { + // Create a mock database - we'll use a memory database for testing + // This test just verifies the filter structure + let filter = Filter::new().kinds(vec![ + Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), + Kind::Custom(KIND_MAINTAINER_LIST), + ]); + + // Verify the filter has the correct kinds + // Note: We can't easily inspect Filter internals, but we can ensure it compiles + assert!(!filter.is_empty()); + } +} \ No newline at end of file diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 8c883f5..8f6a9bd 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -1,19 +1,32 @@ //! SyncManager - Coordinates proactive sync operations //! -//! The SyncManager spawns connections to configured relays, receives events, -//! validates them through the write policy, and stores accepted events. +//! The SyncManager discovers relays from stored announcements, spawns connections +//! to each relay, receives events, validates them through the write policy, +//! and stores accepted events. +//! +//! ## Phase 2 Features +//! +//! - Relay discovery from stored kind 30617 announcements +//! - Multiple simultaneous relay connections +//! - Three-layer filter strategy via FilterService + +use std::collections::HashSet; +use std::sync::Arc; use nostr_relay_builder::prelude::*; use tokio::sync::mpsc; use super::connection::{connect_with_retry, SyncedEvent}; +use super::filter::FilterService; use super::SYNC_SOURCE_ADDR; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; -/// Coordinates proactive sync from configured relays +/// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { - /// URL of the relay to sync from - sync_relay_url: String, + /// Initial relay URL to sync from (from config) + initial_relay_url: Option, + /// Our relay's domain (for filtering) + relay_domain: String, /// Database for storing accepted events database: SharedDatabase, /// Write policy for validating events @@ -22,13 +35,37 @@ pub struct SyncManager { impl SyncManager { /// Create a new SyncManager + /// + /// # Arguments + /// * `initial_relay_url` - Optional initial relay URL from config + /// * `relay_domain` - Our relay's domain (used to exclude self from sync) + /// * `database` - Shared database for storing events and querying announcements + /// * `write_policy` - Write policy for validating synced events pub fn new( + initial_relay_url: Option, + relay_domain: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + ) -> Self { + Self { + initial_relay_url, + relay_domain, + database, + write_policy, + } + } + + /// Create a SyncManager with a single relay URL (Phase 1 compatibility) + pub fn with_single_relay( sync_relay_url: String, database: SharedDatabase, write_policy: Nip34WritePolicy, ) -> Self { + // Extract domain from URL for filtering + let relay_domain = extract_domain_from_url(&sync_relay_url).unwrap_or_default(); Self { - sync_relay_url, + initial_relay_url: Some(sync_relay_url), + relay_domain, database, write_policy, } @@ -36,28 +73,94 @@ impl SyncManager { /// Run the sync manager /// - /// This spawns a connection task and processes incoming events. - /// Runs indefinitely until the task is cancelled. + /// This discovers relays from stored announcements, spawns connection tasks, + /// and processes incoming events. Runs indefinitely until cancelled. pub async fn run(self) { - tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); + tracing::info!( + "Starting SyncManager (domain: {}, initial relay: {:?})", + self.relay_domain, + self.initial_relay_url + ); + + // Create the filter service + let filter_service = Arc::new(FilterService::new( + self.database.clone(), + self.relay_domain.clone(), + )); - // Create channel for receiving events from connection + // Create channel for receiving events from all connections let (tx, mut rx) = mpsc::channel::(100); - // Spawn connection task with auto-reconnect - let url = self.sync_relay_url.clone(); - tokio::spawn(async move { - connect_with_retry(&url, tx).await; - }); + // Track active relay URLs to avoid duplicates + let mut active_relays: HashSet = HashSet::new(); + + // Start with initial relay if configured + if let Some(ref url) = self.initial_relay_url { + if !self.is_own_relay(url) { + tracing::info!("Connecting to initial sync relay: {}", url); + active_relays.insert(url.clone()); + self.spawn_connection(url.clone(), tx.clone(), filter_service.clone()); + } else { + tracing::info!("Skipping initial relay (is our own relay): {}", url); + } + } + + // Discover additional relays from stored announcements + let discovered_urls = filter_service.discover_relay_urls().await; + for url in discovered_urls { + if !active_relays.contains(&url) && !self.is_own_relay(&url) { + tracing::info!("Connecting to discovered relay: {}", url); + active_relays.insert(url.clone()); + self.spawn_connection(url, tx.clone(), filter_service.clone()); + } + } + + if active_relays.is_empty() { + tracing::warn!("No sync relays configured or discovered, SyncManager idle"); + } else { + tracing::info!( + "SyncManager connected to {} relays: {:?}", + active_relays.len(), + active_relays + ); + } - // Process incoming events + // Process incoming events from all connections while let Some(synced_event) = rx.recv().await { + // Check if this event reveals new relays to sync from + let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event); + for url in new_urls { + if !active_relays.contains(&url) && !self.is_own_relay(&url) { + tracing::info!("Discovered new relay from event, connecting: {}", url); + active_relays.insert(url.clone()); + self.spawn_connection(url, tx.clone(), filter_service.clone()); + } + } + self.process_event(synced_event).await; } tracing::warn!("SyncManager event channel closed, shutting down"); } + /// Check if a URL points to our own relay + fn is_own_relay(&self, url: &str) -> bool { + url.contains(&self.relay_domain) + } + + /// Spawn a connection task for a relay + fn spawn_connection( + &self, + url: String, + tx: mpsc::Sender, + filter_service: Arc, + ) { + let domain = self.relay_domain.clone(); + tokio::spawn(async move { + connect_with_retry(&url, tx, filter_service, &domain).await; + }); + } + /// Process a single synced event async fn process_event(&self, synced_event: SyncedEvent) { let event = &synced_event.event; @@ -98,4 +201,56 @@ impl SyncManager { } } } +} + +/// Extract domain from a WebSocket URL +/// +/// Examples: +/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" +/// - "wss://relay.example.com" -> "relay.example.com" +fn extract_domain_from_url(url: &str) -> Option { + let url = url.trim_start_matches("ws://").trim_start_matches("wss://"); + let url = url.trim_start_matches("http://").trim_start_matches("https://"); + + // Remove path + let domain = url.split('/').next()?; + + Some(domain.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_domain_ws() { + assert_eq!( + extract_domain_from_url("ws://127.0.0.1:8080"), + Some("127.0.0.1:8080".to_string()) + ); + } + + #[test] + fn test_extract_domain_wss() { + assert_eq!( + extract_domain_from_url("wss://relay.example.com"), + Some("relay.example.com".to_string()) + ); + } + + #[test] + fn test_extract_domain_with_path() { + assert_eq!( + extract_domain_from_url("ws://example.com/path"), + Some("example.com".to_string()) + ); + } + + #[test] + fn test_extract_domain_http() { + assert_eq!( + extract_domain_from_url("http://example.com:3000"), + Some("example.com:3000".to_string()) + ); + } } \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 279471b..1155eaf 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -3,10 +3,18 @@ //! This module implements proactive synchronization of kind 30617 (repository state) //! events from configured relay(s). Events are validated through the same write policy //! as directly-submitted events. +//! +//! ## Three-Layer Filter Strategy (Phase 2) +//! +//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) +//! - **Layer 2**: Repository events (A/a tags for shared repos) +//! - **Layer 3**: Related events (E/e tags for discussions, reviews) mod connection; +mod filter; mod manager; +pub use filter::FilterService; pub use manager::SyncManager; use std::net::SocketAddr; -- cgit v1.2.3