diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:49:05 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:49:05 +0000 |
| commit | bf558b0dc17e14f96eea624ea5591315a2909154 (patch) | |
| tree | f36a9250ad329a933949c842414c3455e4679326 /src/sync/manager.rs | |
| parent | b167f1b2ae7edbcab95554b5203d22d9e372c8b5 (diff) | |
feat(sync): Phase 2 - multi-relay and complete filters
- Add relay discovery from stored announcements
- Implement FilterService with three-layer strategy
- Support multiple simultaneous relay connections
- Filter batching for large tag sets
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 187 |
1 files changed, 171 insertions, 16 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 8c883f5..8f6a9bd 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -1,19 +1,32 @@ | |||
| 1 | //! SyncManager - Coordinates proactive sync operations | 1 | //! SyncManager - Coordinates proactive sync operations |
| 2 | //! | 2 | //! |
| 3 | //! The SyncManager spawns connections to configured relays, receives events, | 3 | //! The SyncManager discovers relays from stored announcements, spawns connections |
| 4 | //! validates them through the write policy, and stores accepted events. | 4 | //! to each relay, receives events, validates them through the write policy, |
| 5 | //! and stores accepted events. | ||
| 6 | //! | ||
| 7 | //! ## Phase 2 Features | ||
| 8 | //! | ||
| 9 | //! - Relay discovery from stored kind 30617 announcements | ||
| 10 | //! - Multiple simultaneous relay connections | ||
| 11 | //! - Three-layer filter strategy via FilterService | ||
| 12 | |||
| 13 | use std::collections::HashSet; | ||
| 14 | use std::sync::Arc; | ||
| 5 | 15 | ||
| 6 | use nostr_relay_builder::prelude::*; | 16 | use nostr_relay_builder::prelude::*; |
| 7 | use tokio::sync::mpsc; | 17 | use tokio::sync::mpsc; |
| 8 | 18 | ||
| 9 | use super::connection::{connect_with_retry, SyncedEvent}; | 19 | use super::connection::{connect_with_retry, SyncedEvent}; |
| 20 | use super::filter::FilterService; | ||
| 10 | use super::SYNC_SOURCE_ADDR; | 21 | use super::SYNC_SOURCE_ADDR; |
| 11 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 22 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 12 | 23 | ||
| 13 | /// Coordinates proactive sync from configured relays | 24 | /// Coordinates proactive sync from configured and discovered relays |
| 14 | pub struct SyncManager { | 25 | pub struct SyncManager { |
| 15 | /// URL of the relay to sync from | 26 | /// Initial relay URL to sync from (from config) |
| 16 | sync_relay_url: String, | 27 | initial_relay_url: Option<String>, |
| 28 | /// Our relay's domain (for filtering) | ||
| 29 | relay_domain: String, | ||
| 17 | /// Database for storing accepted events | 30 | /// Database for storing accepted events |
| 18 | database: SharedDatabase, | 31 | database: SharedDatabase, |
| 19 | /// Write policy for validating events | 32 | /// Write policy for validating events |
| @@ -22,13 +35,37 @@ pub struct SyncManager { | |||
| 22 | 35 | ||
| 23 | impl SyncManager { | 36 | impl SyncManager { |
| 24 | /// Create a new SyncManager | 37 | /// Create a new SyncManager |
| 38 | /// | ||
| 39 | /// # Arguments | ||
| 40 | /// * `initial_relay_url` - Optional initial relay URL from config | ||
| 41 | /// * `relay_domain` - Our relay's domain (used to exclude self from sync) | ||
| 42 | /// * `database` - Shared database for storing events and querying announcements | ||
| 43 | /// * `write_policy` - Write policy for validating synced events | ||
| 25 | pub fn new( | 44 | pub fn new( |
| 45 | initial_relay_url: Option<String>, | ||
| 46 | relay_domain: String, | ||
| 47 | database: SharedDatabase, | ||
| 48 | write_policy: Nip34WritePolicy, | ||
| 49 | ) -> Self { | ||
| 50 | Self { | ||
| 51 | initial_relay_url, | ||
| 52 | relay_domain, | ||
| 53 | database, | ||
| 54 | write_policy, | ||
| 55 | } | ||
| 56 | } | ||
| 57 | |||
| 58 | /// Create a SyncManager with a single relay URL (Phase 1 compatibility) | ||
| 59 | pub fn with_single_relay( | ||
| 26 | sync_relay_url: String, | 60 | sync_relay_url: String, |
| 27 | database: SharedDatabase, | 61 | database: SharedDatabase, |
| 28 | write_policy: Nip34WritePolicy, | 62 | write_policy: Nip34WritePolicy, |
| 29 | ) -> Self { | 63 | ) -> Self { |
| 64 | // Extract domain from URL for filtering | ||
| 65 | let relay_domain = extract_domain_from_url(&sync_relay_url).unwrap_or_default(); | ||
| 30 | Self { | 66 | Self { |
| 31 | sync_relay_url, | 67 | initial_relay_url: Some(sync_relay_url), |
| 68 | relay_domain, | ||
| 32 | database, | 69 | database, |
| 33 | write_policy, | 70 | write_policy, |
| 34 | } | 71 | } |
| @@ -36,28 +73,94 @@ impl SyncManager { | |||
| 36 | 73 | ||
| 37 | /// Run the sync manager | 74 | /// Run the sync manager |
| 38 | /// | 75 | /// |
| 39 | /// This spawns a connection task and processes incoming events. | 76 | /// This discovers relays from stored announcements, spawns connection tasks, |
| 40 | /// Runs indefinitely until the task is cancelled. | 77 | /// and processes incoming events. Runs indefinitely until cancelled. |
| 41 | pub async fn run(self) { | 78 | pub async fn run(self) { |
| 42 | tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); | 79 | tracing::info!( |
| 80 | "Starting SyncManager (domain: {}, initial relay: {:?})", | ||
| 81 | self.relay_domain, | ||
| 82 | self.initial_relay_url | ||
| 83 | ); | ||
| 84 | |||
| 85 | // Create the filter service | ||
| 86 | let filter_service = Arc::new(FilterService::new( | ||
| 87 | self.database.clone(), | ||
| 88 | self.relay_domain.clone(), | ||
| 89 | )); | ||
| 43 | 90 | ||
| 44 | // Create channel for receiving events from connection | 91 | // Create channel for receiving events from all connections |
| 45 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); | 92 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); |
| 46 | 93 | ||
| 47 | // Spawn connection task with auto-reconnect | 94 | // Track active relay URLs to avoid duplicates |
| 48 | let url = self.sync_relay_url.clone(); | 95 | let mut active_relays: HashSet<String> = HashSet::new(); |
| 49 | tokio::spawn(async move { | 96 | |
| 50 | connect_with_retry(&url, tx).await; | 97 | // Start with initial relay if configured |
| 51 | }); | 98 | if let Some(ref url) = self.initial_relay_url { |
| 99 | if !self.is_own_relay(url) { | ||
| 100 | tracing::info!("Connecting to initial sync relay: {}", url); | ||
| 101 | active_relays.insert(url.clone()); | ||
| 102 | self.spawn_connection(url.clone(), tx.clone(), filter_service.clone()); | ||
| 103 | } else { | ||
| 104 | tracing::info!("Skipping initial relay (is our own relay): {}", url); | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | // Discover additional relays from stored announcements | ||
| 109 | let discovered_urls = filter_service.discover_relay_urls().await; | ||
| 110 | for url in discovered_urls { | ||
| 111 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | ||
| 112 | tracing::info!("Connecting to discovered relay: {}", url); | ||
| 113 | active_relays.insert(url.clone()); | ||
| 114 | self.spawn_connection(url, tx.clone(), filter_service.clone()); | ||
| 115 | } | ||
| 116 | } | ||
| 117 | |||
| 118 | if active_relays.is_empty() { | ||
| 119 | tracing::warn!("No sync relays configured or discovered, SyncManager idle"); | ||
| 120 | } else { | ||
| 121 | tracing::info!( | ||
| 122 | "SyncManager connected to {} relays: {:?}", | ||
| 123 | active_relays.len(), | ||
| 124 | active_relays | ||
| 125 | ); | ||
| 126 | } | ||
| 52 | 127 | ||
| 53 | // Process incoming events | 128 | // Process incoming events from all connections |
| 54 | while let Some(synced_event) = rx.recv().await { | 129 | while let Some(synced_event) = rx.recv().await { |
| 130 | // Check if this event reveals new relays to sync from | ||
| 131 | let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event); | ||
| 132 | for url in new_urls { | ||
| 133 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | ||
| 134 | tracing::info!("Discovered new relay from event, connecting: {}", url); | ||
| 135 | active_relays.insert(url.clone()); | ||
| 136 | self.spawn_connection(url, tx.clone(), filter_service.clone()); | ||
| 137 | } | ||
| 138 | } | ||
| 139 | |||
| 55 | self.process_event(synced_event).await; | 140 | self.process_event(synced_event).await; |
| 56 | } | 141 | } |
| 57 | 142 | ||
| 58 | tracing::warn!("SyncManager event channel closed, shutting down"); | 143 | tracing::warn!("SyncManager event channel closed, shutting down"); |
| 59 | } | 144 | } |
| 60 | 145 | ||
| 146 | /// Check if a URL points to our own relay | ||
| 147 | fn is_own_relay(&self, url: &str) -> bool { | ||
| 148 | url.contains(&self.relay_domain) | ||
| 149 | } | ||
| 150 | |||
| 151 | /// Spawn a connection task for a relay | ||
| 152 | fn spawn_connection( | ||
| 153 | &self, | ||
| 154 | url: String, | ||
| 155 | tx: mpsc::Sender<SyncedEvent>, | ||
| 156 | filter_service: Arc<FilterService>, | ||
| 157 | ) { | ||
| 158 | let domain = self.relay_domain.clone(); | ||
| 159 | tokio::spawn(async move { | ||
| 160 | connect_with_retry(&url, tx, filter_service, &domain).await; | ||
| 161 | }); | ||
| 162 | } | ||
| 163 | |||
| 61 | /// Process a single synced event | 164 | /// Process a single synced event |
| 62 | async fn process_event(&self, synced_event: SyncedEvent) { | 165 | async fn process_event(&self, synced_event: SyncedEvent) { |
| 63 | let event = &synced_event.event; | 166 | let event = &synced_event.event; |
| @@ -98,4 +201,56 @@ impl SyncManager { | |||
| 98 | } | 201 | } |
| 99 | } | 202 | } |
| 100 | } | 203 | } |
| 204 | } | ||
| 205 | |||
| 206 | /// Extract domain from a WebSocket URL | ||
| 207 | /// | ||
| 208 | /// Examples: | ||
| 209 | /// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" | ||
| 210 | /// - "wss://relay.example.com" -> "relay.example.com" | ||
| 211 | fn extract_domain_from_url(url: &str) -> Option<String> { | ||
| 212 | let url = url.trim_start_matches("ws://").trim_start_matches("wss://"); | ||
| 213 | let url = url.trim_start_matches("http://").trim_start_matches("https://"); | ||
| 214 | |||
| 215 | // Remove path | ||
| 216 | let domain = url.split('/').next()?; | ||
| 217 | |||
| 218 | Some(domain.to_string()) | ||
| 219 | } | ||
| 220 | |||
| 221 | #[cfg(test)] | ||
| 222 | mod tests { | ||
| 223 | use super::*; | ||
| 224 | |||
| 225 | #[test] | ||
| 226 | fn test_extract_domain_ws() { | ||
| 227 | assert_eq!( | ||
| 228 | extract_domain_from_url("ws://127.0.0.1:8080"), | ||
| 229 | Some("127.0.0.1:8080".to_string()) | ||
| 230 | ); | ||
| 231 | } | ||
| 232 | |||
| 233 | #[test] | ||
| 234 | fn test_extract_domain_wss() { | ||
| 235 | assert_eq!( | ||
| 236 | extract_domain_from_url("wss://relay.example.com"), | ||
| 237 | Some("relay.example.com".to_string()) | ||
| 238 | ); | ||
| 239 | } | ||
| 240 | |||
| 241 | #[test] | ||
| 242 | fn test_extract_domain_with_path() { | ||
| 243 | assert_eq!( | ||
| 244 | extract_domain_from_url("ws://example.com/path"), | ||
| 245 | Some("example.com".to_string()) | ||
| 246 | ); | ||
| 247 | } | ||
| 248 | |||
| 249 | #[test] | ||
| 250 | fn test_extract_domain_http() { | ||
| 251 | assert_eq!( | ||
| 252 | extract_domain_from_url("http://example.com:3000"), | ||
| 253 | Some("example.com:3000".to_string()) | ||
| 254 | ); | ||
| 255 | } | ||
| 101 | } \ No newline at end of file | 256 | } \ No newline at end of file |