From 83ede29fb2ce563fe53ee4dc62334c03c67026cb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 5 Dec 2025 08:57:57 +0000 Subject: sync fixes --- src/nostr/builder.rs | 41 +------------ src/sync/filter.rs | 169 ++++++++++++++++++++++++++++++++++----------------- src/sync/manager.rs | 28 ++++++++- src/sync/mod.rs | 11 +--- 4 files changed, 145 insertions(+), 104 deletions(-) (limited to 'src') diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 732fd9d..2284c18 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -19,7 +19,6 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, RelatedEventPolicy, ReferenceResult, StatePolicy, StateResult, }; -use crate::sync::SYNC_SOURCE_ADDR; /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -69,39 +68,9 @@ impl Nip34WritePolicy { } /// Handle repository announcement event - /// - /// # Arguments - /// * `event` - The announcement event to validate - /// * `from_sync` - Whether this event came from GRASP-02 sync (bypasses domain validation) - async fn handle_announcement(&self, event: &Event, from_sync: bool) -> PolicyResult { + async fn handle_announcement(&self, event: &Event) -> PolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); - // GRASP-02: Accept Layer 1 events from sync without domain validation - // This enables relay discovery chain - synced announcements are stored - // for relay URL extraction even if they don't list our domain - if from_sync { - // Still validate basic structure - match RepositoryAnnouncement::from_event(event.clone()) { - Ok(_announcement) => { - tracing::debug!( - "Accepted synced repository announcement: {} (domain validation bypassed)", - event_id_str - ); - // Don't create bare repository for external announcements - return PolicyResult::Accept; - } - Err(e) => { - tracing::warn!( - "Rejected malformed synced announcement {}: {}", - event_id_str, - e - ); - return PolicyResult::Reject(format!("Failed to parse announcement: {}", e)); - } - } - } - - // Normal validation path - requires domain to be listed match self.announcement_policy.validate(event).await { AnnouncementResult::Accept => { // Parse announcement to get repository details @@ -287,15 +256,11 @@ impl WritePolicy for Nip34WritePolicy { fn admit_event<'a>( &'a self, event: &'a nostr_relay_builder::prelude::Event, - addr: &'a SocketAddr, + _addr: &'a SocketAddr, ) -> BoxedFuture<'a, PolicyResult> { Box::pin(async move { - // GRASP-02: Detect sync source for Layer 1 domain validation bypass - // Synced events use SYNC_SOURCE_ADDR (127.0.0.2:0) to identify them - let from_sync = *addr == SYNC_SOURCE_ADDR; - match event.kind.as_u16() { - KIND_REPOSITORY_ANNOUNCEMENT => self.handle_announcement(event, from_sync).await, + KIND_REPOSITORY_ANNOUNCEMENT => self.handle_announcement(event).await, KIND_REPOSITORY_STATE => self.handle_state(event).await, KIND_PR | KIND_PR_UPDATE => self.handle_pr_event(event).await, _ => self.handle_related_event(event, "Event").await, diff --git a/src/sync/filter.rs b/src/sync/filter.rs index 56c531f..108c92a 100644 --- a/src/sync/filter.rs +++ b/src/sync/filter.rs @@ -95,7 +95,7 @@ impl FilterService { && tag_vec.iter().any(|v| v.contains(remote_relay_domain)) }); - if has_our_relay || has_remote_relay { + if has_our_relay && has_remote_relay { // Extract the d tag (identifier) if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); @@ -121,8 +121,8 @@ impl FilterService { return Vec::new(); } - // Batch coordinates into filters - Self::batch_filters_with_a_tags(coords) + // Batch coordinates into filters with A/a/q tags + Self::batch_layer2_filters(coords) } /// Get Layer 3 filters for related events @@ -139,10 +139,7 @@ impl FilterService { let announcements = match self.database.query(announcement_filter).await { Ok(events) => events, Err(e) => { - tracing::warn!( - "Failed to query announcements for Layer 3 filters: {}", - e - ); + tracing::warn!("Failed to query announcements for Layer 3 filters: {}", e); return Vec::new(); } }; @@ -174,20 +171,13 @@ impl FilterService { return Vec::new(); } - // Query for PR events (1618) and other related events - // that have 'a' tags pointing to our repositories - let related_filter = Filter::new().kinds(vec![ - Kind::Custom(1618), // PR - Kind::Custom(1619), // PR Update - Kind::Custom(1621), // Issue - Kind::Custom(1622), // Reply - Kind::Custom(1630), // Status (open) - Kind::Custom(1631), // Status (applied) - Kind::Custom(1632), // Status (closed) - Kind::Custom(1633), // Status (draft) + // Query for PR and Patch events from our repositories + let repos_pr_patch_filter = Filter::new().kinds(vec![ + Kind::Custom(1617), // Patch + Kind::Custom(1618), // PR ]); - let related_events = match self.database.query(related_filter).await { + let related_events = match self.database.query(repos_pr_patch_filter).await { Ok(events) => events, Err(e) => { tracing::warn!("Failed to query related events for Layer 3 filters: {}", e); @@ -212,52 +202,94 @@ impl FilterService { return Vec::new(); } - // Batch event IDs into filters with 'e' tags - Self::batch_filters_with_e_tags(event_ids) + // Batch event IDs into filters with 'E', 'e', and 'q' tags + Self::batch_layer3_filters(event_ids) } - /// Batch a list of addressable coordinates into filters with 'a' tags + /// Batch a list of addressable coordinates into Layer 2 filters with 'A', 'a', and 'q' tags /// - /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. - fn batch_filters_with_a_tags(coords: Vec) -> Vec { + /// Different Nostr clients use different tag conventions for referencing repository + /// announcements. This function generates THREE filters per chunk to capture all variants: + /// - Uppercase 'A' tags (used by some clients) + /// - Lowercase 'a' tags (standard addressable event reference) + /// - Lowercase 'q' tags (quote tags, used by some clients) + /// + /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets. + fn batch_layer2_filters(coords: Vec) -> Vec { if coords.is_empty() { return Vec::new(); } coords .chunks(MAX_TAGS_PER_FILTER) - .map(|chunk| { - let mut filter = Filter::new(); - for coord in chunk { - filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord.clone()); - } - filter + .flat_map(|chunk| { + // Create THREE filters per chunk - one for each tag type + vec![ + // Uppercase A tag filter + Filter::new().custom_tags( + SingleLetterTag::uppercase(Alphabet::A), + chunk.iter().cloned(), + ), + // Lowercase a tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::A), + chunk.iter().cloned(), + ), + // Quote q tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::Q), + chunk.iter().cloned(), + ), + ] }) .collect() } - /// Batch a list of event IDs into filters with 'e' tags + /// Batch a list of event IDs into Layer 3 filters with 'E', 'e', and 'q' tags + /// + /// Different Nostr clients use different tag conventions for referencing events. + /// This function generates THREE filters per chunk to capture all variants: + /// - Uppercase 'E' tags (used by some clients) + /// - Lowercase 'e' tags (standard event reference) + /// - Lowercase 'q' tags (quote tags, used by some clients) /// - /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. - fn batch_filters_with_e_tags(event_ids: Vec) -> Vec { + /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets. + fn batch_layer3_filters(event_ids: Vec) -> Vec { if event_ids.is_empty() { return Vec::new(); } event_ids .chunks(MAX_TAGS_PER_FILTER) - .map(|chunk| { - let mut filter = Filter::new(); - for event_id in chunk { - filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.clone()); - } - filter + .flat_map(|chunk| { + // Create THREE filters per chunk - one for each tag type + vec![ + // Uppercase E tag filter + Filter::new().custom_tags( + SingleLetterTag::uppercase(Alphabet::E), + chunk.iter().cloned(), + ), + // Lowercase e tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::E), + chunk.iter().cloned(), + ), + // Quote q tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::Q), + chunk.iter().cloned(), + ), + ] }) .collect() } /// Discover relay URLs from stored kind 30617 announcements /// + /// Only returns relay URLs from repositories that list **our** relay. + /// This ensures we only connect to relays where we have shared repos, + /// avoiding wasted connections with empty Layer 2 filters. + /// /// Extracts unique relay URLs from `clone` and `relays` tags, /// excluding our own relay domain. pub async fn discover_relay_urls(&self) -> Vec { @@ -274,6 +306,22 @@ impl FilterService { let mut relay_urls: HashSet = HashSet::new(); for event in events { + // First check: Does this repo list our relay? + // Only process repos that reference us - otherwise we'd connect to relays + // where we have no shared repos, resulting in empty Layer 2 filters. + let has_our_relay = event.tags.iter().any(|tag| { + let tag_vec = tag.clone().to_vec(); + tag_vec.len() >= 2 + && (tag_vec[0] == "clone" || tag_vec[0] == "relays") + && tag_vec.iter().any(|v| v.contains(&self.relay_domain)) + }); + + if !has_our_relay { + // Skip repos that don't list our relay - no shared repos possible + continue; + } + + // Extract relay URLs from repos that list us for tag in event.tags.iter() { let tag_vec = tag.clone().to_vec(); if tag_vec.len() < 2 { @@ -338,42 +386,53 @@ mod tests { use super::*; #[test] - fn test_batch_filters_with_a_tags_empty() { - let filters = FilterService::batch_filters_with_a_tags(vec![]); + fn test_batch_layer2_filters_empty() { + let filters = FilterService::batch_layer2_filters(vec![]); assert!(filters.is_empty()); } #[test] - fn test_batch_filters_with_a_tags_small() { + fn test_batch_layer2_filters_small() { let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()]; - let filters = FilterService::batch_filters_with_a_tags(coords); - assert_eq!(filters.len(), 1); + let filters = FilterService::batch_layer2_filters(coords); + // 1 chunk × 3 tag types (A, a, q) = 3 filters + assert_eq!(filters.len(), 3); } #[test] - fn test_batch_filters_with_a_tags_large() { + fn test_batch_layer2_filters_large() { // Create 250 coordinates to test batching let coords: Vec = (0..250) .map(|i| format!("30617:pubkey{}:repo{}", i, i)) .collect(); - let filters = FilterService::batch_filters_with_a_tags(coords); - assert_eq!(filters.len(), 3); // 100 + 100 + 50 + let filters = FilterService::batch_layer2_filters(coords); + // 3 chunks (100 + 100 + 50) × 3 tag types (A, a, q) = 9 filters + assert_eq!(filters.len(), 9); } #[test] - fn test_batch_filters_with_e_tags_empty() { - let filters = FilterService::batch_filters_with_e_tags(vec![]); + fn test_batch_layer3_filters_empty() { + let filters = FilterService::batch_layer3_filters(vec![]); assert!(filters.is_empty()); } #[test] - fn test_batch_filters_with_e_tags_large() { - // Create 150 event IDs to test batching - let event_ids: Vec = (0..150).map(|i| format!("eventid{:064}", i)).collect(); + fn test_batch_layer3_filters_small() { + let event_ids = vec!["eventid1".to_string(), "eventid2".to_string()]; + let filters = FilterService::batch_layer3_filters(event_ids); + // 1 chunk × 3 tag types (E, e, q) = 3 filters + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_batch_layer3_filters_large() { + // Create 250 event IDs to test batching + let event_ids: Vec = (0..250).map(|i| format!("eventid{:064}", i)).collect(); - let filters = FilterService::batch_filters_with_e_tags(event_ids); - assert_eq!(filters.len(), 2); // 100 + 50 + let filters = FilterService::batch_layer3_filters(event_ids); + // 3 chunks (100 + 100 + 50) × 3 tag types (E, e, q) = 9 filters + assert_eq!(filters.len(), 9); } #[test] @@ -389,4 +448,4 @@ mod tests { // Note: We can't easily inspect Filter internals, but we can ensure it compiles assert!(!filter.is_empty()); } -} \ No newline at end of file +} diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 3bc190d..97ea81a 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -25,6 +25,7 @@ //! - Consolidation when filter count exceeds 150 use std::collections::HashSet; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; @@ -36,13 +37,29 @@ use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; use super::health::RelayHealthTracker; use super::metrics::SyncMetrics; -use super::SYNC_SOURCE_ADDR; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; /// Maximum startup jitter in milliseconds (10 seconds) const MAX_STARTUP_JITTER_MS: u64 = 10_000; +/// Default fallback address for sync source when bind_address cannot be parsed +/// +/// This distinguishes synced events from directly-submitted events in logs and metrics. +/// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker. +pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + +/// Derive sync source address from config bind_address +/// +/// Parses the bind_address string and returns a SocketAddr. +/// Falls back to 127.0.0.1:8080 if parsing fails. +fn get_sync_source_addr(bind_address: &str) -> SocketAddr { + bind_address + .parse() + .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) +} + /// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { /// Initial relay URL to sync from (from config) @@ -57,6 +74,8 @@ pub struct SyncManager { health_tracker: Arc, /// Sync metrics for Prometheus metrics: Option, + /// Source address for synced events (derived from config.bind_address) + sync_source_addr: SocketAddr, } impl SyncManager { @@ -82,6 +101,7 @@ impl SyncManager { write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), metrics: None, + sync_source_addr: get_sync_source_addr(&config.bind_address), } } @@ -109,6 +129,7 @@ impl SyncManager { write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), metrics: Some(metrics), + sync_source_addr: get_sync_source_addr(&config.bind_address), } } @@ -127,6 +148,7 @@ impl SyncManager { write_policy, health_tracker: Arc::new(RelayHealthTracker::with_defaults()), metrics: None, + sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, } } @@ -320,8 +342,8 @@ impl SyncManager { _ => {} } - // Validate through write policy using SYNC_SOURCE_ADDR - let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; + // Validate through write policy using sync_source_addr derived from config + let result = self.write_policy.admit_event(event, &self.sync_source_addr).await; match result { PolicyResult::Accept => { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 67d389e..17418d0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -32,14 +32,9 @@ pub use metrics::SyncMetrics; pub use negentropy::NegentropyService; pub use subscription::SubscriptionManager; -use std::net::SocketAddr; - -/// Synthetic source address used for synced events -/// -/// This distinguishes synced events from directly-submitted events in logs and metrics. -/// Uses 127.0.0.2:0 as a recognizable "synced event" marker. -pub const SYNC_SOURCE_ADDR: SocketAddr = - SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0); +// Re-export default sync source address for backward compatibility with modules like negentropy.rs +// Manager.rs derives sync_source_addr from config.bind_address at runtime +pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR; /// Kind for repository state events (NIP-34) pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file -- cgit v1.2.3