From 83ede29fb2ce563fe53ee4dc62334c03c67026cb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 5 Dec 2025 08:57:57 +0000 Subject: sync fixes --- src/sync/filter.rs | 169 +++++++++++++++++++++++++++++++++++----------------- src/sync/manager.rs | 28 ++++++++- src/sync/mod.rs | 11 +--- 3 files changed, 142 insertions(+), 66 deletions(-) (limited to 'src/sync') diff --git a/src/sync/filter.rs b/src/sync/filter.rs index 56c531f..108c92a 100644 --- a/src/sync/filter.rs +++ b/src/sync/filter.rs @@ -95,7 +95,7 @@ impl FilterService { && tag_vec.iter().any(|v| v.contains(remote_relay_domain)) }); - if has_our_relay || has_remote_relay { + if has_our_relay && has_remote_relay { // Extract the d tag (identifier) if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); @@ -121,8 +121,8 @@ impl FilterService { return Vec::new(); } - // Batch coordinates into filters - Self::batch_filters_with_a_tags(coords) + // Batch coordinates into filters with A/a/q tags + Self::batch_layer2_filters(coords) } /// Get Layer 3 filters for related events @@ -139,10 +139,7 @@ impl FilterService { let announcements = match self.database.query(announcement_filter).await { Ok(events) => events, Err(e) => { - tracing::warn!( - "Failed to query announcements for Layer 3 filters: {}", - e - ); + tracing::warn!("Failed to query announcements for Layer 3 filters: {}", e); return Vec::new(); } }; @@ -174,20 +171,13 @@ impl FilterService { return Vec::new(); } - // Query for PR events (1618) and other related events - // that have 'a' tags pointing to our repositories - let related_filter = Filter::new().kinds(vec![ - Kind::Custom(1618), // PR - Kind::Custom(1619), // PR Update - Kind::Custom(1621), // Issue - Kind::Custom(1622), // Reply - Kind::Custom(1630), // Status (open) - Kind::Custom(1631), // Status (applied) - Kind::Custom(1632), // Status (closed) - Kind::Custom(1633), // Status (draft) + // Query for PR and Patch events from our repositories + let repos_pr_patch_filter = Filter::new().kinds(vec![ + Kind::Custom(1617), // Patch + Kind::Custom(1618), // PR ]); - let related_events = match self.database.query(related_filter).await { + let related_events = match self.database.query(repos_pr_patch_filter).await { Ok(events) => events, Err(e) => { tracing::warn!("Failed to query related events for Layer 3 filters: {}", e); @@ -212,52 +202,94 @@ impl FilterService { return Vec::new(); } - // Batch event IDs into filters with 'e' tags - Self::batch_filters_with_e_tags(event_ids) + // Batch event IDs into filters with 'E', 'e', and 'q' tags + Self::batch_layer3_filters(event_ids) } - /// Batch a list of addressable coordinates into filters with 'a' tags + /// Batch a list of addressable coordinates into Layer 2 filters with 'A', 'a', and 'q' tags /// - /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. - fn batch_filters_with_a_tags(coords: Vec) -> Vec { + /// Different Nostr clients use different tag conventions for referencing repository + /// announcements. This function generates THREE filters per chunk to capture all variants: + /// - Uppercase 'A' tags (used by some clients) + /// - Lowercase 'a' tags (standard addressable event reference) + /// - Lowercase 'q' tags (quote tags, used by some clients) + /// + /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets. + fn batch_layer2_filters(coords: Vec) -> Vec { if coords.is_empty() { return Vec::new(); } coords .chunks(MAX_TAGS_PER_FILTER) - .map(|chunk| { - let mut filter = Filter::new(); - for coord in chunk { - filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord.clone()); - } - filter + .flat_map(|chunk| { + // Create THREE filters per chunk - one for each tag type + vec![ + // Uppercase A tag filter + Filter::new().custom_tags( + SingleLetterTag::uppercase(Alphabet::A), + chunk.iter().cloned(), + ), + // Lowercase a tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::A), + chunk.iter().cloned(), + ), + // Quote q tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::Q), + chunk.iter().cloned(), + ), + ] }) .collect() } - /// Batch a list of event IDs into filters with 'e' tags + /// Batch a list of event IDs into Layer 3 filters with 'E', 'e', and 'q' tags + /// + /// Different Nostr clients use different tag conventions for referencing events. + /// This function generates THREE filters per chunk to capture all variants: + /// - Uppercase 'E' tags (used by some clients) + /// - Lowercase 'e' tags (standard event reference) + /// - Lowercase 'q' tags (quote tags, used by some clients) /// - /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. - fn batch_filters_with_e_tags(event_ids: Vec) -> Vec { + /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets. + fn batch_layer3_filters(event_ids: Vec) -> Vec { if event_ids.is_empty() { return Vec::new(); } event_ids .chunks(MAX_TAGS_PER_FILTER) - .map(|chunk| { - let mut filter = Filter::new(); - for event_id in chunk { - filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.clone()); - } - filter + .flat_map(|chunk| { + // Create THREE filters per chunk - one for each tag type + vec![ + // Uppercase E tag filter + Filter::new().custom_tags( + SingleLetterTag::uppercase(Alphabet::E), + chunk.iter().cloned(), + ), + // Lowercase e tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::E), + chunk.iter().cloned(), + ), + // Quote q tag filter + Filter::new().custom_tags( + SingleLetterTag::lowercase(Alphabet::Q), + chunk.iter().cloned(), + ), + ] }) .collect() } /// Discover relay URLs from stored kind 30617 announcements /// + /// Only returns relay URLs from repositories that list **our** relay. + /// This ensures we only connect to relays where we have shared repos, + /// avoiding wasted connections with empty Layer 2 filters. + /// /// Extracts unique relay URLs from `clone` and `relays` tags, /// excluding our own relay domain. pub async fn discover_relay_urls(&self) -> Vec { @@ -274,6 +306,22 @@ impl FilterService { let mut relay_urls: HashSet = HashSet::new(); for event in events { + // First check: Does this repo list our relay? + // Only process repos that reference us - otherwise we'd connect to relays + // where we have no shared repos, resulting in empty Layer 2 filters. + let has_our_relay = event.tags.iter().any(|tag| { + let tag_vec = tag.clone().to_vec(); + tag_vec.len() >= 2 + && (tag_vec[0] == "clone" || tag_vec[0] == "relays") + && tag_vec.iter().any(|v| v.contains(&self.relay_domain)) + }); + + if !has_our_relay { + // Skip repos that don't list our relay - no shared repos possible + continue; + } + + // Extract relay URLs from repos that list us for tag in event.tags.iter() { let tag_vec = tag.clone().to_vec(); if tag_vec.len() < 2 { @@ -338,42 +386,53 @@ mod tests { use super::*; #[test] - fn test_batch_filters_with_a_tags_empty() { - let filters = FilterService::batch_filters_with_a_tags(vec![]); + fn test_batch_layer2_filters_empty() { + let filters = FilterService::batch_layer2_filters(vec![]); assert!(filters.is_empty()); } #[test] - fn test_batch_filters_with_a_tags_small() { + fn test_batch_layer2_filters_small() { let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()]; - let filters = FilterService::batch_filters_with_a_tags(coords); - assert_eq!(filters.len(), 1); + let filters = FilterService::batch_layer2_filters(coords); + // 1 chunk × 3 tag types (A, a, q) = 3 filters + assert_eq!(filters.len(), 3); } #[test] - fn test_batch_filters_with_a_tags_large() { + fn test_batch_layer2_filters_large() { // Create 250 coordinates to test batching let coords: Vec = (0..250) .map(|i| format!("30617:pubkey{}:repo{}", i, i)) .collect(); - let filters = FilterService::batch_filters_with_a_tags(coords); - assert_eq!(filters.len(), 3); // 100 + 100 + 50 + let filters = FilterService::batch_layer2_filters(coords); + // 3 chunks (100 + 100 + 50) × 3 tag types (A, a, q) = 9 filters + assert_eq!(filters.len(), 9); } #[test] - fn test_batch_filters_with_e_tags_empty() { - let filters = FilterService::batch_filters_with_e_tags(vec![]); + fn test_batch_layer3_filters_empty() { + let filters = FilterService::batch_layer3_filters(vec![]); assert!(filters.is_empty()); } #[test] - fn test_batch_filters_with_e_tags_large() { - // Create 150 event IDs to test batching - let event_ids: Vec = (0..150).map(|i| format!("eventid{:064}", i)).collect(); + fn test_batch_layer3_filters_small() { + let event_ids = vec!["eventid1".to_string(), "eventid2".to_string()]; + let filters = FilterService::batch_layer3_filters(event_ids); + // 1 chunk × 3 tag types (E, e, q) = 3 filters + assert_eq!(filters.len(), 3); + } + + #[test] + fn test_batch_layer3_filters_large() { + // Create 250 event IDs to test batching + let event_ids: Vec = (0..250).map(|i| format!("eventid{:064}", i)).collect(); - let filters = FilterService::batch_filters_with_e_tags(event_ids); - assert_eq!(filters.len(), 2); // 100 + 50 + let filters = FilterService::batch_layer3_filters(event_ids); + // 3 chunks (100 + 100 + 50) × 3 tag types (E, e, q) = 9 filters + assert_eq!(filters.len(), 9); } #[test] @@ -389,4 +448,4 @@ mod tests { // Note: We can't easily inspect Filter internals, but we can ensure it compiles assert!(!filter.is_empty()); } -} \ No newline at end of file +} diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 3bc190d..97ea81a 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -25,6 +25,7 @@ //! - Consolidation when filter count exceeds 150 use std::collections::HashSet; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; @@ -36,13 +37,29 @@ use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; use super::health::RelayHealthTracker; use super::metrics::SyncMetrics; -use super::SYNC_SOURCE_ADDR; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; /// Maximum startup jitter in milliseconds (10 seconds) const MAX_STARTUP_JITTER_MS: u64 = 10_000; +/// Default fallback address for sync source when bind_address cannot be parsed +/// +/// This distinguishes synced events from directly-submitted events in logs and metrics. +/// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker. +pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + +/// Derive sync source address from config bind_address +/// +/// Parses the bind_address string and returns a SocketAddr. +/// Falls back to 127.0.0.1:8080 if parsing fails. +fn get_sync_source_addr(bind_address: &str) -> SocketAddr { + bind_address + .parse() + .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) +} + /// Coordinates proactive sync from configured and discovered relays pub struct SyncManager { /// Initial relay URL to sync from (from config) @@ -57,6 +74,8 @@ pub struct SyncManager { health_tracker: Arc, /// Sync metrics for Prometheus metrics: Option, + /// Source address for synced events (derived from config.bind_address) + sync_source_addr: SocketAddr, } impl SyncManager { @@ -82,6 +101,7 @@ impl SyncManager { write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), metrics: None, + sync_source_addr: get_sync_source_addr(&config.bind_address), } } @@ -109,6 +129,7 @@ impl SyncManager { write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), metrics: Some(metrics), + sync_source_addr: get_sync_source_addr(&config.bind_address), } } @@ -127,6 +148,7 @@ impl SyncManager { write_policy, health_tracker: Arc::new(RelayHealthTracker::with_defaults()), metrics: None, + sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, } } @@ -320,8 +342,8 @@ impl SyncManager { _ => {} } - // Validate through write policy using SYNC_SOURCE_ADDR - let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; + // Validate through write policy using sync_source_addr derived from config + let result = self.write_policy.admit_event(event, &self.sync_source_addr).await; match result { PolicyResult::Accept => { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 67d389e..17418d0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -32,14 +32,9 @@ pub use metrics::SyncMetrics; pub use negentropy::NegentropyService; pub use subscription::SubscriptionManager; -use std::net::SocketAddr; - -/// Synthetic source address used for synced events -/// -/// This distinguishes synced events from directly-submitted events in logs and metrics. -/// Uses 127.0.0.2:0 as a recognizable "synced event" marker. -pub const SYNC_SOURCE_ADDR: SocketAddr = - SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0); +// Re-export default sync source address for backward compatibility with modules like negentropy.rs +// Manager.rs derives sync_source_addr from config.bind_address at runtime +pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR; /// Kind for repository state events (NIP-34) pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file -- cgit v1.2.3