diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:49:05 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:49:05 +0000 |
| commit | bf558b0dc17e14f96eea624ea5591315a2909154 (patch) | |
| tree | f36a9250ad329a933949c842414c3455e4679326 /src/sync/connection.rs | |
| parent | b167f1b2ae7edbcab95554b5203d22d9e372c8b5 (diff) | |
feat(sync): Phase 2 - multi-relay and complete filters
- Add relay discovery from stored announcements
- Implement FilterService with three-layer strategy
- Support multiple simultaneous relay connections
- Filter batching for large tag sets
Diffstat (limited to 'src/sync/connection.rs')
| -rw-r--r-- | src/sync/connection.rs | 148 |
1 files changed, 125 insertions, 23 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 4a79128..76cc8e8 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs | |||
| @@ -1,14 +1,22 @@ | |||
| 1 | //! WebSocket connection handling for sync | 1 | //! WebSocket connection handling for sync |
| 2 | //! | 2 | //! |
| 3 | //! Manages the connection to a source relay, subscribes to kind 30617 events, | 3 | //! Manages the connection to a source relay, subscribes to events using |
| 4 | //! and passes them through validation. | 4 | //! the three-layer filter strategy, and passes them through validation. |
| 5 | //! | ||
| 6 | //! ## Phase 2 Features | ||
| 7 | //! | ||
| 8 | //! - Three-layer filter subscriptions: | ||
| 9 | //! 1. Layer 1: kinds 30617 + 30618 (announcements) | ||
| 10 | //! 2. Layer 2: A/a tags for repository events | ||
| 11 | //! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) | ||
| 5 | 12 | ||
| 13 | use std::sync::Arc; | ||
| 6 | use std::time::Duration; | 14 | use std::time::Duration; |
| 7 | 15 | ||
| 8 | use nostr_sdk::prelude::*; | 16 | use nostr_sdk::prelude::*; |
| 9 | use tokio::sync::mpsc; | 17 | use tokio::sync::mpsc; |
| 10 | 18 | ||
| 11 | use super::KIND_REPOSITORY_STATE; | 19 | use super::filter::FilterService; |
| 12 | 20 | ||
| 13 | /// Event received from the sync connection | 21 | /// Event received from the sync connection |
| 14 | #[derive(Debug, Clone)] | 22 | #[derive(Debug, Clone)] |
| @@ -21,11 +29,17 @@ pub struct SyncedEvent { | |||
| 21 | pub struct SyncConnection { | 29 | pub struct SyncConnection { |
| 22 | url: String, | 30 | url: String, |
| 23 | client: Client, | 31 | client: Client, |
| 32 | filter_service: Arc<FilterService>, | ||
| 33 | remote_domain: String, | ||
| 24 | } | 34 | } |
| 25 | 35 | ||
| 26 | impl SyncConnection { | 36 | impl SyncConnection { |
| 27 | /// Create a new sync connection to the given relay URL | 37 | /// Create a new sync connection to the given relay URL |
| 28 | pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { | 38 | pub async fn new( |
| 39 | url: &str, | ||
| 40 | filter_service: Arc<FilterService>, | ||
| 41 | remote_domain: &str, | ||
| 42 | ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { | ||
| 29 | let client = Client::default(); | 43 | let client = Client::default(); |
| 30 | 44 | ||
| 31 | // Add the relay | 45 | // Add the relay |
| @@ -39,31 +53,78 @@ impl SyncConnection { | |||
| 39 | Ok(Self { | 53 | Ok(Self { |
| 40 | url: url.to_string(), | 54 | url: url.to_string(), |
| 41 | client, | 55 | client, |
| 56 | filter_service, | ||
| 57 | remote_domain: remote_domain.to_string(), | ||
| 42 | }) | 58 | }) |
| 43 | } | 59 | } |
| 44 | 60 | ||
| 45 | /// Start receiving events and send them through the channel | 61 | /// Start receiving events and send them through the channel |
| 46 | /// | 62 | /// |
| 47 | /// This method runs indefinitely, reconnecting as needed. | 63 | /// This method runs indefinitely, handling events from all three filter layers. |
| 48 | pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { | 64 | pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { |
| 49 | // Create filter for kind 30617 (repository state) events | 65 | // Subscribe to all three filter layers |
| 50 | let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); | 66 | |
| 51 | 67 | // Layer 1: Announcement discovery (kinds 30617 + 30618) | |
| 52 | // Subscribe to events | 68 | let layer1_filters = self.filter_service.get_layer1_filters(); |
| 53 | match self.client.subscribe(filter, None).await { | 69 | for filter in &layer1_filters { |
| 54 | Ok(output) => { | 70 | match self.client.subscribe(filter.clone(), None).await { |
| 55 | tracing::info!( | 71 | Ok(output) => { |
| 56 | "Subscribed to kind {} events on {} (subscription: {})", | 72 | tracing::info!( |
| 57 | KIND_REPOSITORY_STATE, | 73 | "Subscribed to Layer 1 (announcements) on {} (subscription: {})", |
| 58 | self.url, | 74 | self.url, |
| 59 | output.id() | 75 | output.id() |
| 60 | ); | 76 | ); |
| 77 | } | ||
| 78 | Err(e) => { | ||
| 79 | tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e); | ||
| 80 | } | ||
| 61 | } | 81 | } |
| 62 | Err(e) => { | 82 | } |
| 63 | tracing::error!("Failed to subscribe on {}: {}", self.url, e); | 83 | |
| 64 | return; | 84 | // Layer 2: Repository events (A/a tags) |
| 85 | let layer2_filters = self | ||
| 86 | .filter_service | ||
| 87 | .get_layer2_filters(&self.remote_domain) | ||
| 88 | .await; | ||
| 89 | for filter in &layer2_filters { | ||
| 90 | match self.client.subscribe(filter.clone(), None).await { | ||
| 91 | Ok(output) => { | ||
| 92 | tracing::info!( | ||
| 93 | "Subscribed to Layer 2 (repo events) on {} (subscription: {})", | ||
| 94 | self.url, | ||
| 95 | output.id() | ||
| 96 | ); | ||
| 97 | } | ||
| 98 | Err(e) => { | ||
| 99 | tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e); | ||
| 100 | } | ||
| 101 | } | ||
| 102 | } | ||
| 103 | |||
| 104 | // Layer 3: Related events (E/e tags) | ||
| 105 | let layer3_filters = self.filter_service.get_layer3_filters().await; | ||
| 106 | for filter in &layer3_filters { | ||
| 107 | match self.client.subscribe(filter.clone(), None).await { | ||
| 108 | Ok(output) => { | ||
| 109 | tracing::info!( | ||
| 110 | "Subscribed to Layer 3 (related events) on {} (subscription: {})", | ||
| 111 | self.url, | ||
| 112 | output.id() | ||
| 113 | ); | ||
| 114 | } | ||
| 115 | Err(e) => { | ||
| 116 | tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e); | ||
| 117 | } | ||
| 65 | } | 118 | } |
| 66 | }; | 119 | } |
| 120 | |||
| 121 | tracing::info!( | ||
| 122 | "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})", | ||
| 123 | self.url, | ||
| 124 | layer1_filters.len(), | ||
| 125 | layer2_filters.len(), | ||
| 126 | layer3_filters.len() | ||
| 127 | ); | ||
| 67 | 128 | ||
| 68 | // Handle incoming notifications | 129 | // Handle incoming notifications |
| 69 | let url = self.url.clone(); | 130 | let url = self.url.clone(); |
| @@ -106,19 +167,29 @@ impl SyncConnection { | |||
| 106 | .await | 167 | .await |
| 107 | .ok(); | 168 | .ok(); |
| 108 | } | 169 | } |
| 109 | |||
| 110 | } | 170 | } |
| 111 | 171 | ||
| 112 | /// Reconnect loop with exponential backoff | 172 | /// Reconnect loop with exponential backoff |
| 173 | /// | ||
| 174 | /// # Arguments | ||
| 175 | /// * `url` - The relay URL to connect to | ||
| 176 | /// * `tx` - Channel sender for synced events | ||
| 177 | /// * `filter_service` - FilterService for building subscriptions | ||
| 178 | /// * `our_domain` - Our relay's domain (used to extract remote domain) | ||
| 113 | pub async fn connect_with_retry( | 179 | pub async fn connect_with_retry( |
| 114 | url: &str, | 180 | url: &str, |
| 115 | tx: mpsc::Sender<SyncedEvent>, | 181 | tx: mpsc::Sender<SyncedEvent>, |
| 182 | filter_service: Arc<FilterService>, | ||
| 183 | _our_domain: &str, | ||
| 116 | ) { | 184 | ) { |
| 117 | let mut backoff = Duration::from_secs(1); | 185 | let mut backoff = Duration::from_secs(1); |
| 118 | let max_backoff = Duration::from_secs(60); | 186 | let max_backoff = Duration::from_secs(60); |
| 119 | 187 | ||
| 188 | // Extract remote domain from URL | ||
| 189 | let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); | ||
| 190 | |||
| 120 | loop { | 191 | loop { |
| 121 | match SyncConnection::new(url).await { | 192 | match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { |
| 122 | Ok(conn) => { | 193 | Ok(conn) => { |
| 123 | backoff = Duration::from_secs(1); // Reset backoff on successful connection | 194 | backoff = Duration::from_secs(1); // Reset backoff on successful connection |
| 124 | conn.run(tx.clone()).await; | 195 | conn.run(tx.clone()).await; |
| @@ -140,4 +211,35 @@ pub async fn connect_with_retry( | |||
| 140 | // Exponential backoff | 211 | // Exponential backoff |
| 141 | backoff = std::cmp::min(backoff * 2, max_backoff); | 212 | backoff = std::cmp::min(backoff * 2, max_backoff); |
| 142 | } | 213 | } |
| 214 | } | ||
| 215 | |||
| 216 | /// Extract domain from a URL | ||
| 217 | fn extract_domain_from_url(url: &str) -> Option<String> { | ||
| 218 | let url = url | ||
| 219 | .trim_start_matches("ws://") | ||
| 220 | .trim_start_matches("wss://") | ||
| 221 | .trim_start_matches("http://") | ||
| 222 | .trim_start_matches("https://"); | ||
| 223 | |||
| 224 | // Remove path | ||
| 225 | let domain = url.split('/').next()?; | ||
| 226 | |||
| 227 | Some(domain.to_string()) | ||
| 228 | } | ||
| 229 | |||
| 230 | #[cfg(test)] | ||
| 231 | mod tests { | ||
| 232 | use super::*; | ||
| 233 | |||
| 234 | #[test] | ||
| 235 | fn test_extract_domain() { | ||
| 236 | assert_eq!( | ||
| 237 | extract_domain_from_url("ws://127.0.0.1:8080"), | ||
| 238 | Some("127.0.0.1:8080".to_string()) | ||
| 239 | ); | ||
| 240 | assert_eq!( | ||
| 241 | extract_domain_from_url("wss://relay.example.com/path"), | ||
| 242 | Some("relay.example.com".to_string()) | ||
| 243 | ); | ||
| 244 | } | ||
| 143 | } \ No newline at end of file | 245 | } \ No newline at end of file |