upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:49:05 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:49:05 +0000
commitbf558b0dc17e14f96eea624ea5591315a2909154 (patch)
treef36a9250ad329a933949c842414c3455e4679326 /src/sync
parentb167f1b2ae7edbcab95554b5203d22d9e372c8b5 (diff)
feat(sync): Phase 2 - multi-relay and complete filters
- Add relay discovery from stored announcements
- Implement FilterService with three-layer strategy
- Support multiple simultaneous relay connections
- Filter batching for large tag sets
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/connection.rs148
-rw-r--r--src/sync/filter.rs391
-rw-r--r--src/sync/manager.rs187
-rw-r--r--src/sync/mod.rs8
4 files changed, 695 insertions, 39 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 4a79128..76cc8e8 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -1,14 +1,22 @@
1//! WebSocket connection handling for sync 1//! WebSocket connection handling for sync
2//! 2//!
3//! Manages the connection to a source relay, subscribes to kind 30617 events, 3//! Manages the connection to a source relay, subscribes to events using
4//! and passes them through validation. 4//! the three-layer filter strategy, and passes them through validation.
5//!
6//! ## Phase 2 Features
7//!
8//! - Three-layer filter subscriptions:
9//! 1. Layer 1: kinds 30617 + 30618 (announcements)
10//! 2. Layer 2: A/a tags for repository events
11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.)
5 12
13use std::sync::Arc;
6use std::time::Duration; 14use std::time::Duration;
7 15
8use nostr_sdk::prelude::*; 16use nostr_sdk::prelude::*;
9use tokio::sync::mpsc; 17use tokio::sync::mpsc;
10 18
11use super::KIND_REPOSITORY_STATE; 19use super::filter::FilterService;
12 20
13/// Event received from the sync connection 21/// Event received from the sync connection
14#[derive(Debug, Clone)] 22#[derive(Debug, Clone)]
@@ -21,11 +29,17 @@ pub struct SyncedEvent {
21pub struct SyncConnection { 29pub struct SyncConnection {
22 url: String, 30 url: String,
23 client: Client, 31 client: Client,
32 filter_service: Arc<FilterService>,
33 remote_domain: String,
24} 34}
25 35
26impl SyncConnection { 36impl SyncConnection {
27 /// Create a new sync connection to the given relay URL 37 /// Create a new sync connection to the given relay URL
28 pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { 38 pub async fn new(
39 url: &str,
40 filter_service: Arc<FilterService>,
41 remote_domain: &str,
42 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
29 let client = Client::default(); 43 let client = Client::default();
30 44
31 // Add the relay 45 // Add the relay
@@ -39,31 +53,78 @@ impl SyncConnection {
39 Ok(Self { 53 Ok(Self {
40 url: url.to_string(), 54 url: url.to_string(),
41 client, 55 client,
56 filter_service,
57 remote_domain: remote_domain.to_string(),
42 }) 58 })
43 } 59 }
44 60
45 /// Start receiving events and send them through the channel 61 /// Start receiving events and send them through the channel
46 /// 62 ///
47 /// This method runs indefinitely, reconnecting as needed. 63 /// This method runs indefinitely, handling events from all three filter layers.
48 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { 64 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) {
49 // Create filter for kind 30617 (repository state) events 65 // Subscribe to all three filter layers
50 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); 66
51 67 // Layer 1: Announcement discovery (kinds 30617 + 30618)
52 // Subscribe to events 68 let layer1_filters = self.filter_service.get_layer1_filters();
53 match self.client.subscribe(filter, None).await { 69 for filter in &layer1_filters {
54 Ok(output) => { 70 match self.client.subscribe(filter.clone(), None).await {
55 tracing::info!( 71 Ok(output) => {
56 "Subscribed to kind {} events on {} (subscription: {})", 72 tracing::info!(
57 KIND_REPOSITORY_STATE, 73 "Subscribed to Layer 1 (announcements) on {} (subscription: {})",
58 self.url, 74 self.url,
59 output.id() 75 output.id()
60 ); 76 );
77 }
78 Err(e) => {
79 tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e);
80 }
61 } 81 }
62 Err(e) => { 82 }
63 tracing::error!("Failed to subscribe on {}: {}", self.url, e); 83
64 return; 84 // Layer 2: Repository events (A/a tags)
85 let layer2_filters = self
86 .filter_service
87 .get_layer2_filters(&self.remote_domain)
88 .await;
89 for filter in &layer2_filters {
90 match self.client.subscribe(filter.clone(), None).await {
91 Ok(output) => {
92 tracing::info!(
93 "Subscribed to Layer 2 (repo events) on {} (subscription: {})",
94 self.url,
95 output.id()
96 );
97 }
98 Err(e) => {
99 tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e);
100 }
101 }
102 }
103
104 // Layer 3: Related events (E/e tags)
105 let layer3_filters = self.filter_service.get_layer3_filters().await;
106 for filter in &layer3_filters {
107 match self.client.subscribe(filter.clone(), None).await {
108 Ok(output) => {
109 tracing::info!(
110 "Subscribed to Layer 3 (related events) on {} (subscription: {})",
111 self.url,
112 output.id()
113 );
114 }
115 Err(e) => {
116 tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e);
117 }
65 } 118 }
66 }; 119 }
120
121 tracing::info!(
122 "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})",
123 self.url,
124 layer1_filters.len(),
125 layer2_filters.len(),
126 layer3_filters.len()
127 );
67 128
68 // Handle incoming notifications 129 // Handle incoming notifications
69 let url = self.url.clone(); 130 let url = self.url.clone();
@@ -106,19 +167,29 @@ impl SyncConnection {
106 .await 167 .await
107 .ok(); 168 .ok();
108 } 169 }
109
110} 170}
111 171
112/// Reconnect loop with exponential backoff 172/// Reconnect loop with exponential backoff
173///
174/// # Arguments
175/// * `url` - The relay URL to connect to
176/// * `tx` - Channel sender for synced events
177/// * `filter_service` - FilterService for building subscriptions
178/// * `our_domain` - Our relay's domain (used to extract remote domain)
113pub async fn connect_with_retry( 179pub async fn connect_with_retry(
114 url: &str, 180 url: &str,
115 tx: mpsc::Sender<SyncedEvent>, 181 tx: mpsc::Sender<SyncedEvent>,
182 filter_service: Arc<FilterService>,
183 _our_domain: &str,
116) { 184) {
117 let mut backoff = Duration::from_secs(1); 185 let mut backoff = Duration::from_secs(1);
118 let max_backoff = Duration::from_secs(60); 186 let max_backoff = Duration::from_secs(60);
119 187
188 // Extract remote domain from URL
189 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
190
120 loop { 191 loop {
121 match SyncConnection::new(url).await { 192 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await {
122 Ok(conn) => { 193 Ok(conn) => {
123 backoff = Duration::from_secs(1); // Reset backoff on successful connection 194 backoff = Duration::from_secs(1); // Reset backoff on successful connection
124 conn.run(tx.clone()).await; 195 conn.run(tx.clone()).await;
@@ -140,4 +211,35 @@ pub async fn connect_with_retry(
140 // Exponential backoff 211 // Exponential backoff
141 backoff = std::cmp::min(backoff * 2, max_backoff); 212 backoff = std::cmp::min(backoff * 2, max_backoff);
142 } 213 }
214}
215
/// Extract the `host[:port]` part of a URL.
///
/// Exactly one leading `ws://`, `wss://`, `http://` or `https://` scheme is removed
/// (if present) and the remainder is cut at the first `/`, `?` or `#`, so paths,
/// query strings and fragments never end up in the returned domain.
///
/// Returns `None` when no host is left (for example `""` or `"wss://"`), which lets
/// callers apply their own fallback instead of silently working with an empty domain.
fn extract_domain_from_url(url: &str) -> Option<String> {
    const SCHEMES: [&str; 4] = ["wss://", "ws://", "https://", "http://"];

    // `strip_prefix` removes a prefix at most once; input without a known scheme
    // is used unchanged.
    let without_scheme = SCHEMES
        .iter()
        .find_map(|scheme| url.strip_prefix(*scheme))
        .unwrap_or(url);

    // Everything before the first path, query or fragment delimiter.
    let authority = without_scheme.split(['/', '?', '#']).next()?;

    if authority.is_empty() {
        None
    } else {
        Some(authority.to_string())
    }
}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 #[test]
235 fn test_extract_domain() {
236 assert_eq!(
237 extract_domain_from_url("ws://127.0.0.1:8080"),
238 Some("127.0.0.1:8080".to_string())
239 );
240 assert_eq!(
241 extract_domain_from_url("wss://relay.example.com/path"),
242 Some("relay.example.com".to_string())
243 );
244 }
143} \ No newline at end of file 245} \ No newline at end of file
diff --git a/src/sync/filter.rs b/src/sync/filter.rs
new file mode 100644
index 0000000..7168f72
--- /dev/null
+++ b/src/sync/filter.rs
@@ -0,0 +1,391 @@
1//! Filter Service for GRASP-02 Proactive Sync
2//!
3//! Implements the three-layer filter strategy for comprehensive event syncing:
4//! - Layer 1: Announcement discovery (kinds 30617 + 30618)
5//! - Layer 2: Repository events (A/a tags pointing to shared repos)
6//! - Layer 3: Related events (E/e tags pointing to Layer 2 events)
7
8use std::collections::HashSet;
9
10use nostr_sdk::prelude::*;
11
12use crate::nostr::builder::SharedDatabase;
13use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT;
14
15/// Maximum number of tags per filter to stay within relay limits
16const MAX_TAGS_PER_FILTER: usize = 100;
17
18/// Kind 30618: the NIP-34 repository state announcement (branch/tag refs published by maintainers)
19const KIND_MAINTAINER_LIST: u16 = 30618;
20
21/// FilterService builds subscription filters for proactive sync
22///
23/// Uses a three-layer strategy:
24/// 1. Layer 1: Discover new repository announcements and maintainer metadata
25/// 2. Layer 2: Sync events directly related to repositories we track
26/// 3. Layer 3: Sync discussions and updates related to Layer 2 events
27pub struct FilterService {
28 database: SharedDatabase,
29 /// Our relay's domain for filtering
30 relay_domain: String,
31}
32
33impl FilterService {
34 /// Create a new FilterService
35 ///
36 /// # Arguments
37 /// * `database` - Shared database for querying stored events
38 /// * `relay_domain` - Our relay's domain (used for filtering shared repos)
39 pub fn new(database: SharedDatabase, relay_domain: String) -> Self {
40 Self {
41 database,
42 relay_domain,
43 }
44 }
45
46 /// Get Layer 1 filters for announcement discovery
47 ///
48 /// Returns filters for kinds 30617 (repository announcements) and 30618 (maintainer metadata)
49 pub fn get_layer1_filters(&self) -> Vec<Filter> {
50 vec![Filter::new().kinds(vec![
51 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT),
52 Kind::Custom(KIND_MAINTAINER_LIST),
53 ])]
54 }
55
56 /// Get Layer 2 filters for repository-related events
57 ///
58 /// Queries the database for kind 30617 events and builds filters for events
59 /// with `a` tags pointing to repositories that reference both:
60 /// - Our relay (from clone tags)
61 /// - Are stored in our database (meaning they're relevant to us)
62 ///
63 /// # Arguments
64 /// * `remote_relay_domain` - The domain of the remote relay we're syncing from
65 pub async fn get_layer2_filters(&self, remote_relay_domain: &str) -> Vec<Filter> {
66 // Query all kind 30617 events from our database
67 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT));
68
69 let events = match self.database.query(filter).await {
70 Ok(events) => events,
71 Err(e) => {
72 tracing::warn!("Failed to query announcements for Layer 2 filters: {}", e);
73 return Vec::new();
74 }
75 };
76
77 // Build a set of addressable coordinates for repos that list both relays
78 let mut coords: Vec<String> = Vec::new();
79
80 for event in events {
81 // Check if this repo lists our domain in clone tags
82 let has_our_relay = event.tags.iter().any(|tag| {
83 let tag_vec = tag.clone().to_vec();
84 tag_vec.len() >= 2
85 && (tag_vec[0] == "clone" || tag_vec[0] == "relays")
86 && tag_vec.iter().any(|v| v.contains(&self.relay_domain))
87 });
88
89 // Check if this repo lists the remote relay in clone/relays tags
90 let has_remote_relay = event.tags.iter().any(|tag| {
91 let tag_vec = tag.clone().to_vec();
92 tag_vec.len() >= 2
93 && (tag_vec[0] == "clone" || tag_vec[0] == "relays")
94 && tag_vec.iter().any(|v| v.contains(remote_relay_domain))
95 });
96
97 if has_our_relay || has_remote_relay {
98 // Extract the d tag (identifier)
99 if let Some(identifier) = event.tags.iter().find_map(|tag| {
100 let tag_vec = tag.clone().to_vec();
101 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
102 Some(tag_vec[1].clone())
103 } else {
104 None
105 }
106 }) {
107 // Build the addressable coordinate: kind:pubkey:identifier
108 let coord = format!(
109 "{}:{}:{}",
110 KIND_REPOSITORY_ANNOUNCEMENT,
111 event.pubkey.to_hex(),
112 identifier
113 );
114 coords.push(coord);
115 }
116 }
117 }
118
119 if coords.is_empty() {
120 return Vec::new();
121 }
122
123 // Batch coordinates into filters
124 Self::batch_filters_with_a_tags(coords)
125 }
126
127 /// Get Layer 3 filters for related events
128 ///
129 /// Queries the database for events with `a` tags (PRs, Issues, etc.)
130 /// and builds filters for events that reference them with `e` tags.
131 pub async fn get_layer3_filters(&self) -> Vec<Filter> {
132 // Query events that reference repositories (have 'a' tags with 30617)
133 // These are typically PRs (1618), Issues (1621), etc.
134
135 // First, get all kind 30617 announcements
136 let announcement_filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT));
137
138 let announcements = match self.database.query(announcement_filter).await {
139 Ok(events) => events,
140 Err(e) => {
141 tracing::warn!(
142 "Failed to query announcements for Layer 3 filters: {}",
143 e
144 );
145 return Vec::new();
146 }
147 };
148
149 // Build a set of event IDs from PRs, Issues, etc. that reference our repos
150 let mut event_ids: Vec<String> = Vec::new();
151
152 // Get the set of valid repository coordinates
153 let repo_coords: HashSet<String> = announcements
154 .iter()
155 .filter_map(|e| {
156 e.tags.iter().find_map(|tag| {
157 let tag_vec = tag.clone().to_vec();
158 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
159 Some(format!(
160 "{}:{}:{}",
161 KIND_REPOSITORY_ANNOUNCEMENT,
162 e.pubkey.to_hex(),
163 tag_vec[1]
164 ))
165 } else {
166 None
167 }
168 })
169 })
170 .collect();
171
172 if repo_coords.is_empty() {
173 return Vec::new();
174 }
175
176 // Query for PR events (1618) and other related events
177 // that have 'a' tags pointing to our repositories
178 let related_filter = Filter::new().kinds(vec![
179 Kind::Custom(1618), // PR
180 Kind::Custom(1619), // PR Update
181 Kind::Custom(1621), // Issue
182 Kind::Custom(1622), // Reply
183 Kind::Custom(1630), // Status (open)
184 Kind::Custom(1631), // Status (applied)
185 Kind::Custom(1632), // Status (closed)
186 Kind::Custom(1633), // Status (draft)
187 ]);
188
189 let related_events = match self.database.query(related_filter).await {
190 Ok(events) => events,
191 Err(e) => {
192 tracing::warn!("Failed to query related events for Layer 3 filters: {}", e);
193 return Vec::new();
194 }
195 };
196
197 // Collect event IDs that reference our repositories
198 for event in related_events {
199 // Check if this event has an 'a' tag pointing to one of our repos
200 let references_our_repo = event.tags.iter().any(|tag| {
201 let tag_vec = tag.clone().to_vec();
202 tag_vec.len() >= 2 && tag_vec[0] == "a" && repo_coords.contains(&tag_vec[1])
203 });
204
205 if references_our_repo {
206 event_ids.push(event.id.to_hex());
207 }
208 }
209
210 if event_ids.is_empty() {
211 return Vec::new();
212 }
213
214 // Batch event IDs into filters with 'e' tags
215 Self::batch_filters_with_e_tags(event_ids)
216 }
217
218 /// Batch a list of addressable coordinates into filters with 'a' tags
219 ///
220 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters.
221 fn batch_filters_with_a_tags(coords: Vec<String>) -> Vec<Filter> {
222 if coords.is_empty() {
223 return Vec::new();
224 }
225
226 coords
227 .chunks(MAX_TAGS_PER_FILTER)
228 .map(|chunk| {
229 let mut filter = Filter::new();
230 for coord in chunk {
231 filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord.clone());
232 }
233 filter
234 })
235 .collect()
236 }
237
238 /// Batch a list of event IDs into filters with 'e' tags
239 ///
240 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filters.
241 fn batch_filters_with_e_tags(event_ids: Vec<String>) -> Vec<Filter> {
242 if event_ids.is_empty() {
243 return Vec::new();
244 }
245
246 event_ids
247 .chunks(MAX_TAGS_PER_FILTER)
248 .map(|chunk| {
249 let mut filter = Filter::new();
250 for event_id in chunk {
251 filter = filter.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.clone());
252 }
253 filter
254 })
255 .collect()
256 }
257
258 /// Discover relay URLs from stored kind 30617 announcements
259 ///
260 /// Extracts unique relay URLs from `clone` and `relays` tags,
261 /// excluding our own relay domain.
262 pub async fn discover_relay_urls(&self) -> Vec<String> {
263 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT));
264
265 let events = match self.database.query(filter).await {
266 Ok(events) => events,
267 Err(e) => {
268 tracing::warn!("Failed to query announcements for relay discovery: {}", e);
269 return Vec::new();
270 }
271 };
272
273 let mut relay_urls: HashSet<String> = HashSet::new();
274
275 for event in events {
276 for tag in event.tags.iter() {
277 let tag_vec = tag.clone().to_vec();
278 if tag_vec.len() < 2 {
279 continue;
280 }
281
282 // Extract URLs from clone and relays tags
283 if tag_vec[0] == "clone" || tag_vec[0] == "relays" {
284 for value in tag_vec.iter().skip(1) {
285 // Check if it looks like a URL
286 if value.starts_with("ws://")
287 || value.starts_with("wss://")
288 || value.starts_with("http://")
289 || value.starts_with("https://")
290 {
291 // Exclude our own relay
292 if !value.contains(&self.relay_domain) {
293 relay_urls.insert(value.clone());
294 }
295 }
296 }
297 }
298 }
299 }
300
301 relay_urls.into_iter().collect()
302 }
303
304 /// Extract relay URLs from a specific event's clone tags
305 ///
306 /// Returns URLs that are not our own relay.
307 pub fn extract_relay_urls_from_event(&self, event: &Event) -> Vec<String> {
308 let mut urls = Vec::new();
309
310 for tag in event.tags.iter() {
311 let tag_vec = tag.clone().to_vec();
312 if tag_vec.len() < 2 {
313 continue;
314 }
315
316 if tag_vec[0] == "clone" || tag_vec[0] == "relays" {
317 for value in tag_vec.iter().skip(1) {
318 if value.starts_with("ws://")
319 || value.starts_with("wss://")
320 || value.starts_with("http://")
321 || value.starts_with("https://")
322 {
323 if !value.contains(&self.relay_domain) {
324 urls.push(value.clone());
325 }
326 }
327 }
328 }
329 }
330
331 urls
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// `count` distinct placeholder repository coordinates.
    fn sample_coords(count: usize) -> Vec<String> {
        (0..count)
            .map(|i| format!("30617:pubkey{}:repo{}", i, i))
            .collect()
    }

    /// `count` distinct placeholder event IDs.
    fn sample_event_ids(count: usize) -> Vec<String> {
        (0..count).map(|i| format!("eventid{:064}", i)).collect()
    }

    #[test]
    fn test_batch_filters_with_a_tags_empty() {
        assert!(FilterService::batch_filters_with_a_tags(Vec::new()).is_empty());
    }

    #[test]
    fn test_batch_filters_with_a_tags_small() {
        let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()];
        assert_eq!(FilterService::batch_filters_with_a_tags(coords).len(), 1);
    }

    #[test]
    fn test_batch_filters_with_a_tags_large() {
        // 250 coordinates -> batches of 100 + 100 + 50
        let filters = FilterService::batch_filters_with_a_tags(sample_coords(250));
        assert_eq!(filters.len(), 3);
    }

    #[test]
    fn test_batch_filters_with_e_tags_empty() {
        assert!(FilterService::batch_filters_with_e_tags(Vec::new()).is_empty());
    }

    #[test]
    fn test_batch_filters_with_e_tags_large() {
        // 150 event IDs -> batches of 100 + 50
        let filters = FilterService::batch_filters_with_e_tags(sample_event_ids(150));
        assert_eq!(filters.len(), 2);
    }

    #[test]
    fn test_layer1_filters() {
        // Builds the same filter shape as `get_layer1_filters` without needing a
        // database; only checks that the resulting filter is not empty.
        let kinds = vec![
            Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT),
            Kind::Custom(KIND_MAINTAINER_LIST),
        ];
        let filter = Filter::new().kinds(kinds);

        assert!(!filter.is_empty());
    }
}
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 8c883f5..8f6a9bd 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -1,19 +1,32 @@
1//! SyncManager - Coordinates proactive sync operations 1//! SyncManager - Coordinates proactive sync operations
2//! 2//!
3//! The SyncManager spawns connections to configured relays, receives events, 3//! The SyncManager discovers relays from stored announcements, spawns connections
4//! validates them through the write policy, and stores accepted events. 4//! to each relay, receives events, validates them through the write policy,
5//! and stores accepted events.
6//!
7//! ## Phase 2 Features
8//!
9//! - Relay discovery from stored kind 30617 announcements
10//! - Multiple simultaneous relay connections
11//! - Three-layer filter strategy via FilterService
12
13use std::collections::HashSet;
14use std::sync::Arc;
5 15
6use nostr_relay_builder::prelude::*; 16use nostr_relay_builder::prelude::*;
7use tokio::sync::mpsc; 17use tokio::sync::mpsc;
8 18
9use super::connection::{connect_with_retry, SyncedEvent}; 19use super::connection::{connect_with_retry, SyncedEvent};
20use super::filter::FilterService;
10use super::SYNC_SOURCE_ADDR; 21use super::SYNC_SOURCE_ADDR;
11use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 22use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
12 23
13/// Coordinates proactive sync from configured relays 24/// Coordinates proactive sync from configured and discovered relays
14pub struct SyncManager { 25pub struct SyncManager {
15 /// URL of the relay to sync from 26 /// Initial relay URL to sync from (from config)
16 sync_relay_url: String, 27 initial_relay_url: Option<String>,
28 /// Our relay's domain (for filtering)
29 relay_domain: String,
17 /// Database for storing accepted events 30 /// Database for storing accepted events
18 database: SharedDatabase, 31 database: SharedDatabase,
19 /// Write policy for validating events 32 /// Write policy for validating events
@@ -22,13 +35,37 @@ pub struct SyncManager {
22 35
23impl SyncManager { 36impl SyncManager {
24 /// Create a new SyncManager 37 /// Create a new SyncManager
38 ///
39 /// # Arguments
40 /// * `initial_relay_url` - Optional initial relay URL from config
41 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
42 /// * `database` - Shared database for storing events and querying announcements
43 /// * `write_policy` - Write policy for validating synced events
25 pub fn new( 44 pub fn new(
45 initial_relay_url: Option<String>,
46 relay_domain: String,
47 database: SharedDatabase,
48 write_policy: Nip34WritePolicy,
49 ) -> Self {
50 Self {
51 initial_relay_url,
52 relay_domain,
53 database,
54 write_policy,
55 }
56 }
57
58 /// Create a SyncManager with a single relay URL (Phase 1 compatibility)
59 pub fn with_single_relay(
26 sync_relay_url: String, 60 sync_relay_url: String,
27 database: SharedDatabase, 61 database: SharedDatabase,
28 write_policy: Nip34WritePolicy, 62 write_policy: Nip34WritePolicy,
29 ) -> Self { 63 ) -> Self {
64 // Extract domain from URL for filtering
65 let relay_domain = extract_domain_from_url(&sync_relay_url).unwrap_or_default();
30 Self { 66 Self {
31 sync_relay_url, 67 initial_relay_url: Some(sync_relay_url),
68 relay_domain,
32 database, 69 database,
33 write_policy, 70 write_policy,
34 } 71 }
@@ -36,28 +73,94 @@ impl SyncManager {
36 73
37 /// Run the sync manager 74 /// Run the sync manager
38 /// 75 ///
39 /// This spawns a connection task and processes incoming events. 76 /// This discovers relays from stored announcements, spawns connection tasks,
40 /// Runs indefinitely until the task is cancelled. 77 /// and processes incoming events. Runs indefinitely until cancelled.
41 pub async fn run(self) { 78 pub async fn run(self) {
42 tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); 79 tracing::info!(
80 "Starting SyncManager (domain: {}, initial relay: {:?})",
81 self.relay_domain,
82 self.initial_relay_url
83 );
84
85 // Create the filter service
86 let filter_service = Arc::new(FilterService::new(
87 self.database.clone(),
88 self.relay_domain.clone(),
89 ));
43 90
44 // Create channel for receiving events from connection 91 // Create channel for receiving events from all connections
45 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); 92 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
46 93
47 // Spawn connection task with auto-reconnect 94 // Track active relay URLs to avoid duplicates
48 let url = self.sync_relay_url.clone(); 95 let mut active_relays: HashSet<String> = HashSet::new();
49 tokio::spawn(async move { 96
50 connect_with_retry(&url, tx).await; 97 // Start with initial relay if configured
51 }); 98 if let Some(ref url) = self.initial_relay_url {
99 if !self.is_own_relay(url) {
100 tracing::info!("Connecting to initial sync relay: {}", url);
101 active_relays.insert(url.clone());
102 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone());
103 } else {
104 tracing::info!("Skipping initial relay (is our own relay): {}", url);
105 }
106 }
107
108 // Discover additional relays from stored announcements
109 let discovered_urls = filter_service.discover_relay_urls().await;
110 for url in discovered_urls {
111 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
112 tracing::info!("Connecting to discovered relay: {}", url);
113 active_relays.insert(url.clone());
114 self.spawn_connection(url, tx.clone(), filter_service.clone());
115 }
116 }
117
118 if active_relays.is_empty() {
119 tracing::warn!("No sync relays configured or discovered, SyncManager idle");
120 } else {
121 tracing::info!(
122 "SyncManager connected to {} relays: {:?}",
123 active_relays.len(),
124 active_relays
125 );
126 }
52 127
53 // Process incoming events 128 // Process incoming events from all connections
54 while let Some(synced_event) = rx.recv().await { 129 while let Some(synced_event) = rx.recv().await {
130 // Check if this event reveals new relays to sync from
131 let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event);
132 for url in new_urls {
133 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
134 tracing::info!("Discovered new relay from event, connecting: {}", url);
135 active_relays.insert(url.clone());
136 self.spawn_connection(url, tx.clone(), filter_service.clone());
137 }
138 }
139
55 self.process_event(synced_event).await; 140 self.process_event(synced_event).await;
56 } 141 }
57 142
58 tracing::warn!("SyncManager event channel closed, shutting down"); 143 tracing::warn!("SyncManager event channel closed, shutting down");
59 } 144 }
60 145
146 /// Check if a URL points to our own relay
147 fn is_own_relay(&self, url: &str) -> bool {
148 url.contains(&self.relay_domain)
149 }
150
151 /// Spawn a connection task for a relay
152 fn spawn_connection(
153 &self,
154 url: String,
155 tx: mpsc::Sender<SyncedEvent>,
156 filter_service: Arc<FilterService>,
157 ) {
158 let domain = self.relay_domain.clone();
159 tokio::spawn(async move {
160 connect_with_retry(&url, tx, filter_service, &domain).await;
161 });
162 }
163
61 /// Process a single synced event 164 /// Process a single synced event
62 async fn process_event(&self, synced_event: SyncedEvent) { 165 async fn process_event(&self, synced_event: SyncedEvent) {
63 let event = &synced_event.event; 166 let event = &synced_event.event;
@@ -98,4 +201,56 @@ impl SyncManager {
98 } 201 }
99 } 202 }
100 } 203 }
204}
205
/// Extract the `host[:port]` part of a WebSocket or HTTP URL.
///
/// Exactly one leading `ws://`, `wss://`, `http://` or `https://` scheme is removed
/// (if present) and the remainder is cut at the first `/`, `?` or `#`.
///
/// Returns `None` when no host is left (for example `""` or `"wss://"`).
///
/// Examples:
/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
/// - "wss://relay.example.com" -> "relay.example.com"
/// - "wss://relay.example.com/path?x=1" -> "relay.example.com"
fn extract_domain_from_url(url: &str) -> Option<String> {
    const SCHEMES: [&str; 4] = ["wss://", "ws://", "https://", "http://"];

    // `strip_prefix` removes a prefix at most once; input without a known scheme
    // is used unchanged.
    let without_scheme = SCHEMES
        .iter()
        .find_map(|scheme| url.strip_prefix(*scheme))
        .unwrap_or(url);

    // Everything before the first path, query or fragment delimiter.
    let authority = without_scheme.split(['/', '?', '#']).next()?;

    if authority.is_empty() {
        None
    } else {
        Some(authority.to_string())
    }
}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224
225 #[test]
226 fn test_extract_domain_ws() {
227 assert_eq!(
228 extract_domain_from_url("ws://127.0.0.1:8080"),
229 Some("127.0.0.1:8080".to_string())
230 );
231 }
232
233 #[test]
234 fn test_extract_domain_wss() {
235 assert_eq!(
236 extract_domain_from_url("wss://relay.example.com"),
237 Some("relay.example.com".to_string())
238 );
239 }
240
241 #[test]
242 fn test_extract_domain_with_path() {
243 assert_eq!(
244 extract_domain_from_url("ws://example.com/path"),
245 Some("example.com".to_string())
246 );
247 }
248
249 #[test]
250 fn test_extract_domain_http() {
251 assert_eq!(
252 extract_domain_from_url("http://example.com:3000"),
253 Some("example.com:3000".to_string())
254 );
255 }
101} \ No newline at end of file 256} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 279471b..1155eaf 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -3,10 +3,18 @@
3//! This module implements proactive synchronization of kind 30617 (repository state) 3//! This module implements proactive synchronization of kind 30617 (repository state)
4//! events from configured relay(s). Events are validated through the same write policy 4//! events from configured relay(s). Events are validated through the same write policy
5//! as directly-submitted events. 5//! as directly-submitted events.
6//!
7//! ## Three-Layer Filter Strategy (Phase 2)
8//!
9//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618)
10//! - **Layer 2**: Repository events (A/a tags for shared repos)
11//! - **Layer 3**: Related events (E/e tags for discussions, reviews)
6 12
7mod connection; 13mod connection;
14mod filter;
8mod manager; 15mod manager;
9 16
17pub use filter::FilterService;
10pub use manager::SyncManager; 18pub use manager::SyncManager;
11 19
12use std::net::SocketAddr; 20use std::net::SocketAddr;