upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/manager.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:49:05 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:49:05 +0000
commitbf558b0dc17e14f96eea624ea5591315a2909154 (patch)
treef36a9250ad329a933949c842414c3455e4679326 /src/sync/manager.rs
parentb167f1b2ae7edbcab95554b5203d22d9e372c8b5 (diff)
feat(sync): Phase 2 - multi-relay and complete filters
- Add relay discovery from stored announcements - Implement FilterService with three-layer strategy - Support multiple simultaneous relay connections - Filter batching for large tag sets
Diffstat (limited to 'src/sync/manager.rs')
-rw-r--r--src/sync/manager.rs187
1 files changed, 171 insertions, 16 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 8c883f5..8f6a9bd 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -1,19 +1,32 @@
1//! SyncManager - Coordinates proactive sync operations 1//! SyncManager - Coordinates proactive sync operations
2//! 2//!
3//! The SyncManager spawns connections to configured relays, receives events, 3//! The SyncManager discovers relays from stored announcements, spawns connections
4//! validates them through the write policy, and stores accepted events. 4//! to each relay, receives events, validates them through the write policy,
5//! and stores accepted events.
6//!
7//! ## Phase 2 Features
8//!
9//! - Relay discovery from stored kind 30617 announcements
10//! - Multiple simultaneous relay connections
11//! - Three-layer filter strategy via FilterService
12
13use std::collections::HashSet;
14use std::sync::Arc;
5 15
6use nostr_relay_builder::prelude::*; 16use nostr_relay_builder::prelude::*;
7use tokio::sync::mpsc; 17use tokio::sync::mpsc;
8 18
9use super::connection::{connect_with_retry, SyncedEvent}; 19use super::connection::{connect_with_retry, SyncedEvent};
20use super::filter::FilterService;
10use super::SYNC_SOURCE_ADDR; 21use super::SYNC_SOURCE_ADDR;
11use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 22use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
12 23
13/// Coordinates proactive sync from configured relays 24/// Coordinates proactive sync from configured and discovered relays
14pub struct SyncManager { 25pub struct SyncManager {
15 /// URL of the relay to sync from 26 /// Initial relay URL to sync from (from config)
16 sync_relay_url: String, 27 initial_relay_url: Option<String>,
28 /// Our relay's domain (for filtering)
29 relay_domain: String,
17 /// Database for storing accepted events 30 /// Database for storing accepted events
18 database: SharedDatabase, 31 database: SharedDatabase,
19 /// Write policy for validating events 32 /// Write policy for validating events
@@ -22,13 +35,37 @@ pub struct SyncManager {
22 35
23impl SyncManager { 36impl SyncManager {
24 /// Create a new SyncManager 37 /// Create a new SyncManager
38 ///
39 /// # Arguments
40 /// * `initial_relay_url` - Optional initial relay URL from config
41 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
42 /// * `database` - Shared database for storing events and querying announcements
43 /// * `write_policy` - Write policy for validating synced events
25 pub fn new( 44 pub fn new(
45 initial_relay_url: Option<String>,
46 relay_domain: String,
47 database: SharedDatabase,
48 write_policy: Nip34WritePolicy,
49 ) -> Self {
50 Self {
51 initial_relay_url,
52 relay_domain,
53 database,
54 write_policy,
55 }
56 }
57
58 /// Create a SyncManager with a single relay URL (Phase 1 compatibility)
59 pub fn with_single_relay(
26 sync_relay_url: String, 60 sync_relay_url: String,
27 database: SharedDatabase, 61 database: SharedDatabase,
28 write_policy: Nip34WritePolicy, 62 write_policy: Nip34WritePolicy,
29 ) -> Self { 63 ) -> Self {
64 // Extract domain from URL for filtering
65 let relay_domain = extract_domain_from_url(&sync_relay_url).unwrap_or_default();
30 Self { 66 Self {
31 sync_relay_url, 67 initial_relay_url: Some(sync_relay_url),
68 relay_domain,
32 database, 69 database,
33 write_policy, 70 write_policy,
34 } 71 }
@@ -36,28 +73,94 @@ impl SyncManager {
36 73
37 /// Run the sync manager 74 /// Run the sync manager
38 /// 75 ///
39 /// This spawns a connection task and processes incoming events. 76 /// This discovers relays from stored announcements, spawns connection tasks,
40 /// Runs indefinitely until the task is cancelled. 77 /// and processes incoming events. Runs indefinitely until cancelled.
41 pub async fn run(self) { 78 pub async fn run(self) {
42 tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); 79 tracing::info!(
80 "Starting SyncManager (domain: {}, initial relay: {:?})",
81 self.relay_domain,
82 self.initial_relay_url
83 );
84
85 // Create the filter service
86 let filter_service = Arc::new(FilterService::new(
87 self.database.clone(),
88 self.relay_domain.clone(),
89 ));
43 90
44 // Create channel for receiving events from connection 91 // Create channel for receiving events from all connections
45 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); 92 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
46 93
47 // Spawn connection task with auto-reconnect 94 // Track active relay URLs to avoid duplicates
48 let url = self.sync_relay_url.clone(); 95 let mut active_relays: HashSet<String> = HashSet::new();
49 tokio::spawn(async move { 96
50 connect_with_retry(&url, tx).await; 97 // Start with initial relay if configured
51 }); 98 if let Some(ref url) = self.initial_relay_url {
99 if !self.is_own_relay(url) {
100 tracing::info!("Connecting to initial sync relay: {}", url);
101 active_relays.insert(url.clone());
102 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone());
103 } else {
104 tracing::info!("Skipping initial relay (is our own relay): {}", url);
105 }
106 }
107
108 // Discover additional relays from stored announcements
109 let discovered_urls = filter_service.discover_relay_urls().await;
110 for url in discovered_urls {
111 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
112 tracing::info!("Connecting to discovered relay: {}", url);
113 active_relays.insert(url.clone());
114 self.spawn_connection(url, tx.clone(), filter_service.clone());
115 }
116 }
117
118 if active_relays.is_empty() {
119 tracing::warn!("No sync relays configured or discovered, SyncManager idle");
120 } else {
121 tracing::info!(
122 "SyncManager connected to {} relays: {:?}",
123 active_relays.len(),
124 active_relays
125 );
126 }
52 127
53 // Process incoming events 128 // Process incoming events from all connections
54 while let Some(synced_event) = rx.recv().await { 129 while let Some(synced_event) = rx.recv().await {
130 // Check if this event reveals new relays to sync from
131 let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event);
132 for url in new_urls {
133 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
134 tracing::info!("Discovered new relay from event, connecting: {}", url);
135 active_relays.insert(url.clone());
136 self.spawn_connection(url, tx.clone(), filter_service.clone());
137 }
138 }
139
55 self.process_event(synced_event).await; 140 self.process_event(synced_event).await;
56 } 141 }
57 142
58 tracing::warn!("SyncManager event channel closed, shutting down"); 143 tracing::warn!("SyncManager event channel closed, shutting down");
59 } 144 }
60 145
146 /// Check if a URL points to our own relay
147 fn is_own_relay(&self, url: &str) -> bool {
148 url.contains(&self.relay_domain)
149 }
150
151 /// Spawn a connection task for a relay
152 fn spawn_connection(
153 &self,
154 url: String,
155 tx: mpsc::Sender<SyncedEvent>,
156 filter_service: Arc<FilterService>,
157 ) {
158 let domain = self.relay_domain.clone();
159 tokio::spawn(async move {
160 connect_with_retry(&url, tx, filter_service, &domain).await;
161 });
162 }
163
61 /// Process a single synced event 164 /// Process a single synced event
62 async fn process_event(&self, synced_event: SyncedEvent) { 165 async fn process_event(&self, synced_event: SyncedEvent) {
63 let event = &synced_event.event; 166 let event = &synced_event.event;
@@ -98,4 +201,56 @@ impl SyncManager {
98 } 201 }
99 } 202 }
100 } 203 }
204}
205
206/// Extract domain from a WebSocket URL
207///
208/// Examples:
209/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
210/// - "wss://relay.example.com" -> "relay.example.com"
211fn extract_domain_from_url(url: &str) -> Option<String> {
212 let url = url.trim_start_matches("ws://").trim_start_matches("wss://");
213 let url = url.trim_start_matches("http://").trim_start_matches("https://");
214
215 // Remove path
216 let domain = url.split('/').next()?;
217
218 Some(domain.to_string())
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224
225 #[test]
226 fn test_extract_domain_ws() {
227 assert_eq!(
228 extract_domain_from_url("ws://127.0.0.1:8080"),
229 Some("127.0.0.1:8080".to_string())
230 );
231 }
232
233 #[test]
234 fn test_extract_domain_wss() {
235 assert_eq!(
236 extract_domain_from_url("wss://relay.example.com"),
237 Some("relay.example.com".to_string())
238 );
239 }
240
241 #[test]
242 fn test_extract_domain_with_path() {
243 assert_eq!(
244 extract_domain_from_url("ws://example.com/path"),
245 Some("example.com".to_string())
246 );
247 }
248
249 #[test]
250 fn test_extract_domain_http() {
251 assert_eq!(
252 extract_domain_from_url("http://example.com:3000"),
253 Some("example.com:3000".to_string())
254 );
255 }
101} \ No newline at end of file 256} \ No newline at end of file