upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/filter.rs 169
-rw-r--r-- src/sync/manager.rs 28
-rw-r--r-- src/sync/mod.rs 11
3 files changed, 142 insertions, 66 deletions
diff --git a/src/sync/filter.rs b/src/sync/filter.rs
index 56c531f..108c92a 100644
--- a/src/sync/filter.rs
+++ b/src/sync/filter.rs
@@ -95,7 +95,7 @@ impl FilterService {
95 && tag_vec.iter().any(|v| v.contains(remote_relay_domain)) 95 && tag_vec.iter().any(|v| v.contains(remote_relay_domain))
96 }); 96 });
97 97
98 if has_our_relay || has_remote_relay { 98 if has_our_relay && has_remote_relay {
99 // Extract the d tag (identifier) 99 // Extract the d tag (identifier)
100 if let Some(identifier) = event.tags.iter().find_map(|tag| { 100 if let Some(identifier) = event.tags.iter().find_map(|tag| {
101 let tag_vec = tag.clone().to_vec(); 101 let tag_vec = tag.clone().to_vec();
@@ -121,8 +121,8 @@ impl FilterService {
121 return Vec::new(); 121 return Vec::new();
122 } 122 }
123 123
124 // Batch coordinates into filters 124 // Batch coordinates into filters with A/a/q tags
125 Self::batch_filters_with_a_tags(coords) 125 Self::batch_layer2_filters(coords)
126 } 126 }
127 127
128 /// Get Layer 3 filters for related events 128 /// Get Layer 3 filters for related events
@@ -139,10 +139,7 @@ impl FilterService {
139 let announcements = match self.database.query(announcement_filter).await { 139 let announcements = match self.database.query(announcement_filter).await {
140 Ok(events) => events, 140 Ok(events) => events,
141 Err(e) => { 141 Err(e) => {
142 tracing::warn!( 142 tracing::warn!("Failed to query announcements for Layer 3 filters: {}", e);
143 "Failed to query announcements for Layer 3 filters: {}",
144 e
145 );
146 return Vec::new(); 143 return Vec::new();
147 } 144 }
148 }; 145 };
@@ -174,20 +171,13 @@ impl FilterService {
174 return Vec::new(); 171 return Vec::new();
175 } 172 }
176 173
177 // Query for PR events (1618) and other related events 174 // Query for PR and Patch events from our repositories
178 // that have 'a' tags pointing to our repositories 175 let repos_pr_patch_filter = Filter::new().kinds(vec![
179 let related_filter = Filter::new().kinds(vec![ 176 Kind::Custom(1617), // Patch
180 Kind::Custom(1618), // PR 177 Kind::Custom(1618), // PR
181 Kind::Custom(1619), // PR Update
182 Kind::Custom(1621), // Issue
183 Kind::Custom(1622), // Reply
184 Kind::Custom(1630), // Status (open)
185 Kind::Custom(1631), // Status (applied)
186 Kind::Custom(1632), // Status (closed)
187 Kind::Custom(1633), // Status (draft)
188 ]); 178 ]);
189 179
190 let related_events = match self.database.query(related_filter).await { 180 let related_events = match self.database.query(repos_pr_patch_filter).await {
191 Ok(events) => events, 181 Ok(events) => events,
192 Err(e) => { 182 Err(e) => {
193 tracing::warn!("Failed to query related events for Layer 3 filters: {}", e); 183 tracing::warn!("Failed to query related events for Layer 3 filters: {}", e);
@@ -212,52 +202,94 @@ impl FilterService {
212 return Vec::new(); 202 return Vec::new();
213 } 203 }
214 204
215 // Batch event IDs into filters with 'e' tags 205 // Batch event IDs into filters with 'E', 'e', and 'q' tags
216 Self::batch_filters_with_e_tags(event_ids) 206 Self::batch_layer3_filters(event_ids)
217 } 207 }
218 208
219 /// Batch a list of addressable coordinates into filters with 'a' tags 209 /// Batch a list of addressable coordinates into Layer 2 filters with 'A', 'a', and 'q' tags
220 /// 210 ///
221 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. 211 /// Different Nostr clients use different tag conventions for referencing repository
222 fn batch_filters_with_a_tags(coords: Vec<String>) -> Vec<Filter> { 212 /// announcements. This function generates THREE filters per chunk to capture all variants:
213 /// - Uppercase 'A' tags (used by some clients)
214 /// - Lowercase 'a' tags (standard addressable event reference)
215 /// - Lowercase 'q' tags (quote tags, used by some clients)
216 ///
217 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets.
218 fn batch_layer2_filters(coords: Vec<String>) -> Vec<Filter> {
223 if coords.is_empty() { 219 if coords.is_empty() {
224 return Vec::new(); 220 return Vec::new();
225 } 221 }
226 222
227 coords 223 coords
228 .chunks(MAX_TAGS_PER_FILTER) 224 .chunks(MAX_TAGS_PER_FILTER)
229 .map(|chunk| { 225 .flat_map(|chunk| {
230 let mut filter = Filter::new(); 226 // Create THREE filters per chunk - one for each tag type
231 for coord in chunk { 227 vec![
232 filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord.clone()); 228 // Uppercase A tag filter
233 } 229 Filter::new().custom_tags(
234 filter 230 SingleLetterTag::uppercase(Alphabet::A),
231 chunk.iter().cloned(),
232 ),
233 // Lowercase a tag filter
234 Filter::new().custom_tags(
235 SingleLetterTag::lowercase(Alphabet::A),
236 chunk.iter().cloned(),
237 ),
238 // Quote q tag filter
239 Filter::new().custom_tags(
240 SingleLetterTag::lowercase(Alphabet::Q),
241 chunk.iter().cloned(),
242 ),
243 ]
235 }) 244 })
236 .collect() 245 .collect()
237 } 246 }
238 247
239 /// Batch a list of event IDs into filters with 'e' tags 248 /// Batch a list of event IDs into Layer 3 filters with 'E', 'e', and 'q' tags
249 ///
250 /// Different Nostr clients use different tag conventions for referencing events.
251 /// This function generates THREE filters per chunk to capture all variants:
252 /// - Uppercase 'E' tags (used by some clients)
253 /// - Lowercase 'e' tags (standard event reference)
254 /// - Lowercase 'q' tags (quote tags, used by some clients)
240 /// 255 ///
241 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters. 256 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets.
242 fn batch_filters_with_e_tags(event_ids: Vec<String>) -> Vec<Filter> { 257 fn batch_layer3_filters(event_ids: Vec<String>) -> Vec<Filter> {
243 if event_ids.is_empty() { 258 if event_ids.is_empty() {
244 return Vec::new(); 259 return Vec::new();
245 } 260 }
246 261
247 event_ids 262 event_ids
248 .chunks(MAX_TAGS_PER_FILTER) 263 .chunks(MAX_TAGS_PER_FILTER)
249 .map(|chunk| { 264 .flat_map(|chunk| {
250 let mut filter = Filter::new(); 265 // Create THREE filters per chunk - one for each tag type
251 for event_id in chunk { 266 vec![
252 filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.clone()); 267 // Uppercase E tag filter
253 } 268 Filter::new().custom_tags(
254 filter 269 SingleLetterTag::uppercase(Alphabet::E),
270 chunk.iter().cloned(),
271 ),
272 // Lowercase e tag filter
273 Filter::new().custom_tags(
274 SingleLetterTag::lowercase(Alphabet::E),
275 chunk.iter().cloned(),
276 ),
277 // Quote q tag filter
278 Filter::new().custom_tags(
279 SingleLetterTag::lowercase(Alphabet::Q),
280 chunk.iter().cloned(),
281 ),
282 ]
255 }) 283 })
256 .collect() 284 .collect()
257 } 285 }
258 286
259 /// Discover relay URLs from stored kind 30617 announcements 287 /// Discover relay URLs from stored kind 30617 announcements
260 /// 288 ///
289 /// Only returns relay URLs from repositories that list **our** relay.
290 /// This ensures we only connect to relays where we have shared repos,
291 /// avoiding wasted connections with empty Layer 2 filters.
292 ///
261 /// Extracts unique relay URLs from `clone` and `relays` tags, 293 /// Extracts unique relay URLs from `clone` and `relays` tags,
262 /// excluding our own relay domain. 294 /// excluding our own relay domain.
263 pub async fn discover_relay_urls(&self) -> Vec<String> { 295 pub async fn discover_relay_urls(&self) -> Vec<String> {
@@ -274,6 +306,22 @@ impl FilterService {
274 let mut relay_urls: HashSet<String> = HashSet::new(); 306 let mut relay_urls: HashSet<String> = HashSet::new();
275 307
276 for event in events { 308 for event in events {
309 // First check: Does this repo list our relay?
310 // Only process repos that reference us - otherwise we'd connect to relays
311 // where we have no shared repos, resulting in empty Layer 2 filters.
312 let has_our_relay = event.tags.iter().any(|tag| {
313 let tag_vec = tag.clone().to_vec();
314 tag_vec.len() >= 2
315 && (tag_vec[0] == "clone" || tag_vec[0] == "relays")
316 && tag_vec.iter().any(|v| v.contains(&self.relay_domain))
317 });
318
319 if !has_our_relay {
320 // Skip repos that don't list our relay - no shared repos possible
321 continue;
322 }
323
324 // Extract relay URLs from repos that list us
277 for tag in event.tags.iter() { 325 for tag in event.tags.iter() {
278 let tag_vec = tag.clone().to_vec(); 326 let tag_vec = tag.clone().to_vec();
279 if tag_vec.len() < 2 { 327 if tag_vec.len() < 2 {
@@ -338,42 +386,53 @@ mod tests {
338 use super::*; 386 use super::*;
339 387
340 #[test] 388 #[test]
341 fn test_batch_filters_with_a_tags_empty() { 389 fn test_batch_layer2_filters_empty() {
342 let filters = FilterService::batch_filters_with_a_tags(vec![]); 390 let filters = FilterService::batch_layer2_filters(vec![]);
343 assert!(filters.is_empty()); 391 assert!(filters.is_empty());
344 } 392 }
345 393
346 #[test] 394 #[test]
347 fn test_batch_filters_with_a_tags_small() { 395 fn test_batch_layer2_filters_small() {
348 let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()]; 396 let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()];
349 let filters = FilterService::batch_filters_with_a_tags(coords); 397 let filters = FilterService::batch_layer2_filters(coords);
350 assert_eq!(filters.len(), 1); 398 // 1 chunk × 3 tag types (A, a, q) = 3 filters
399 assert_eq!(filters.len(), 3);
351 } 400 }
352 401
353 #[test] 402 #[test]
354 fn test_batch_filters_with_a_tags_large() { 403 fn test_batch_layer2_filters_large() {
355 // Create 250 coordinates to test batching 404 // Create 250 coordinates to test batching
356 let coords: Vec<String> = (0..250) 405 let coords: Vec<String> = (0..250)
357 .map(|i| format!("30617:pubkey{}:repo{}", i, i)) 406 .map(|i| format!("30617:pubkey{}:repo{}", i, i))
358 .collect(); 407 .collect();
359 408
360 let filters = FilterService::batch_filters_with_a_tags(coords); 409 let filters = FilterService::batch_layer2_filters(coords);
361 assert_eq!(filters.len(), 3); // 100 + 100 + 50 410 // 3 chunks (100 + 100 + 50) × 3 tag types (A, a, q) = 9 filters
411 assert_eq!(filters.len(), 9);
362 } 412 }
363 413
364 #[test] 414 #[test]
365 fn test_batch_filters_with_e_tags_empty() { 415 fn test_batch_layer3_filters_empty() {
366 let filters = FilterService::batch_filters_with_e_tags(vec![]); 416 let filters = FilterService::batch_layer3_filters(vec![]);
367 assert!(filters.is_empty()); 417 assert!(filters.is_empty());
368 } 418 }
369 419
370 #[test] 420 #[test]
371 fn test_batch_filters_with_e_tags_large() { 421 fn test_batch_layer3_filters_small() {
372 // Create 150 event IDs to test batching 422 let event_ids = vec!["eventid1".to_string(), "eventid2".to_string()];
373 let event_ids: Vec<String> = (0..150).map(|i| format!("eventid{:064}", i)).collect(); 423 let filters = FilterService::batch_layer3_filters(event_ids);
424 // 1 chunk × 3 tag types (E, e, q) = 3 filters
425 assert_eq!(filters.len(), 3);
426 }
427
428 #[test]
429 fn test_batch_layer3_filters_large() {
430 // Create 250 event IDs to test batching
431 let event_ids: Vec<String> = (0..250).map(|i| format!("eventid{:064}", i)).collect();
374 432
375 let filters = FilterService::batch_filters_with_e_tags(event_ids); 433 let filters = FilterService::batch_layer3_filters(event_ids);
376 assert_eq!(filters.len(), 2); // 100 + 50 434 // 3 chunks (100 + 100 + 50) × 3 tag types (E, e, q) = 9 filters
435 assert_eq!(filters.len(), 9);
377 } 436 }
378 437
379 #[test] 438 #[test]
@@ -389,4 +448,4 @@ mod tests {
389 // Note: We can't easily inspect Filter internals, but we can ensure it compiles 448 // Note: We can't easily inspect Filter internals, but we can ensure it compiles
390 assert!(!filter.is_empty()); 449 assert!(!filter.is_empty());
391 } 450 }
392} \ No newline at end of file 451}
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 3bc190d..97ea81a 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -25,6 +25,7 @@
25//! - Consolidation when filter count exceeds 150 25//! - Consolidation when filter count exceeds 150
26 26
27use std::collections::HashSet; 27use std::collections::HashSet;
28use std::net::{IpAddr, Ipv4Addr, SocketAddr};
28use std::sync::Arc; 29use std::sync::Arc;
29use std::time::Duration; 30use std::time::Duration;
30 31
@@ -36,13 +37,29 @@ use super::connection::{connect_with_retry, SyncedEvent};
36use super::filter::FilterService; 37use super::filter::FilterService;
37use super::health::RelayHealthTracker; 38use super::health::RelayHealthTracker;
38use super::metrics::SyncMetrics; 39use super::metrics::SyncMetrics;
39use super::SYNC_SOURCE_ADDR;
40use crate::config::Config; 40use crate::config::Config;
41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
42 42
43/// Maximum startup jitter in milliseconds (10 seconds) 43/// Maximum startup jitter in milliseconds (10 seconds)
44const MAX_STARTUP_JITTER_MS: u64 = 10_000; 44const MAX_STARTUP_JITTER_MS: u64 = 10_000;
45 45
46/// Default fallback address for sync source when bind_address cannot be parsed
47///
48/// This distinguishes synced events from directly-submitted events in logs and metrics.
49/// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker.
50pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr =
51 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
52
53/// Derive sync source address from config bind_address
54///
55/// Parses the bind_address string and returns a SocketAddr.
56/// Falls back to 127.0.0.1:8080 if parsing fails.
57fn get_sync_source_addr(bind_address: &str) -> SocketAddr {
58 bind_address
59 .parse()
60 .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR)
61}
62
46/// Coordinates proactive sync from configured and discovered relays 63/// Coordinates proactive sync from configured and discovered relays
47pub struct SyncManager { 64pub struct SyncManager {
48 /// Initial relay URL to sync from (from config) 65 /// Initial relay URL to sync from (from config)
@@ -57,6 +74,8 @@ pub struct SyncManager {
57 health_tracker: Arc<RelayHealthTracker>, 74 health_tracker: Arc<RelayHealthTracker>,
58 /// Sync metrics for Prometheus 75 /// Sync metrics for Prometheus
59 metrics: Option<SyncMetrics>, 76 metrics: Option<SyncMetrics>,
77 /// Source address for synced events (derived from config.bind_address)
78 sync_source_addr: SocketAddr,
60} 79}
61 80
62impl SyncManager { 81impl SyncManager {
@@ -82,6 +101,7 @@ impl SyncManager {
82 write_policy, 101 write_policy,
83 health_tracker: Arc::new(RelayHealthTracker::new(config)), 102 health_tracker: Arc::new(RelayHealthTracker::new(config)),
84 metrics: None, 103 metrics: None,
104 sync_source_addr: get_sync_source_addr(&config.bind_address),
85 } 105 }
86 } 106 }
87 107
@@ -109,6 +129,7 @@ impl SyncManager {
109 write_policy, 129 write_policy,
110 health_tracker: Arc::new(RelayHealthTracker::new(config)), 130 health_tracker: Arc::new(RelayHealthTracker::new(config)),
111 metrics: Some(metrics), 131 metrics: Some(metrics),
132 sync_source_addr: get_sync_source_addr(&config.bind_address),
112 } 133 }
113 } 134 }
114 135
@@ -127,6 +148,7 @@ impl SyncManager {
127 write_policy, 148 write_policy,
128 health_tracker: Arc::new(RelayHealthTracker::with_defaults()), 149 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
129 metrics: None, 150 metrics: None,
151 sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR,
130 } 152 }
131 } 153 }
132 154
@@ -320,8 +342,8 @@ impl SyncManager {
320 _ => {} 342 _ => {}
321 } 343 }
322 344
323 // Validate through write policy using SYNC_SOURCE_ADDR 345 // Validate through write policy using sync_source_addr derived from config
324 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; 346 let result = self.write_policy.admit_event(event, &self.sync_source_addr).await;
325 347
326 match result { 348 match result {
327 PolicyResult::Accept => { 349 PolicyResult::Accept => {
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 67d389e..17418d0 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -32,14 +32,9 @@ pub use metrics::SyncMetrics;
32pub use negentropy::NegentropyService; 32pub use negentropy::NegentropyService;
33pub use subscription::SubscriptionManager; 33pub use subscription::SubscriptionManager;
34 34
35use std::net::SocketAddr; 35// Re-export default sync source address for backward compatibility with modules like negentropy.rs
36 36// Manager.rs derives sync_source_addr from config.bind_address at runtime
37/// Synthetic source address used for synced events 37pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR;
38///
39/// This distinguishes synced events from directly-submitted events in logs and metrics.
40/// Uses 127.0.0.2:0 as a recognizable "synced event" marker.
41pub const SYNC_SOURCE_ADDR: SocketAddr =
42 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0);
43 38
44/// Kind for repository state events (NIP-34) 39/// Kind for repository state events (NIP-34)
45pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file 40pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file