upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:49:05 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:49:05 +0000
commitbf558b0dc17e14f96eea624ea5591315a2909154 (patch)
treef36a9250ad329a933949c842414c3455e4679326 /src/sync/connection.rs
parentb167f1b2ae7edbcab95554b5203d22d9e372c8b5 (diff)
feat(sync): Phase 2 - multi-relay and complete filters
- Add relay discovery from stored announcements - Implement FilterService with three-layer strategy - Support multiple simultaneous relay connections - Filter batching for large tag sets
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r--src/sync/connection.rs148
1 files changed, 125 insertions, 23 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 4a79128..76cc8e8 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -1,14 +1,22 @@
1//! WebSocket connection handling for sync 1//! WebSocket connection handling for sync
2//! 2//!
3//! Manages the connection to a source relay, subscribes to kind 30617 events, 3//! Manages the connection to a source relay, subscribes to events using
4//! and passes them through validation. 4//! the three-layer filter strategy, and passes them through validation.
5//!
6//! ## Phase 2 Features
7//!
8//! - Three-layer filter subscriptions:
9//! 1. Layer 1: kinds 30617 + 30618 (announcements)
10//! 2. Layer 2: A/a tags for repository events
11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.)
5 12
13use std::sync::Arc;
6use std::time::Duration; 14use std::time::Duration;
7 15
8use nostr_sdk::prelude::*; 16use nostr_sdk::prelude::*;
9use tokio::sync::mpsc; 17use tokio::sync::mpsc;
10 18
11use super::KIND_REPOSITORY_STATE; 19use super::filter::FilterService;
12 20
13/// Event received from the sync connection 21/// Event received from the sync connection
14#[derive(Debug, Clone)] 22#[derive(Debug, Clone)]
@@ -21,11 +29,17 @@ pub struct SyncedEvent {
21pub struct SyncConnection { 29pub struct SyncConnection {
22 url: String, 30 url: String,
23 client: Client, 31 client: Client,
32 filter_service: Arc<FilterService>,
33 remote_domain: String,
24} 34}
25 35
26impl SyncConnection { 36impl SyncConnection {
27 /// Create a new sync connection to the given relay URL 37 /// Create a new sync connection to the given relay URL
28 pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { 38 pub async fn new(
39 url: &str,
40 filter_service: Arc<FilterService>,
41 remote_domain: &str,
42 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
29 let client = Client::default(); 43 let client = Client::default();
30 44
31 // Add the relay 45 // Add the relay
@@ -39,31 +53,78 @@ impl SyncConnection {
39 Ok(Self { 53 Ok(Self {
40 url: url.to_string(), 54 url: url.to_string(),
41 client, 55 client,
56 filter_service,
57 remote_domain: remote_domain.to_string(),
42 }) 58 })
43 } 59 }
44 60
45 /// Start receiving events and send them through the channel 61 /// Start receiving events and send them through the channel
46 /// 62 ///
47 /// This method runs indefinitely, reconnecting as needed. 63 /// This method runs indefinitely, handling events from all three filter layers.
48 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { 64 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) {
49 // Create filter for kind 30617 (repository state) events 65 // Subscribe to all three filter layers
50 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); 66
51 67 // Layer 1: Announcement discovery (kinds 30617 + 30618)
52 // Subscribe to events 68 let layer1_filters = self.filter_service.get_layer1_filters();
53 match self.client.subscribe(filter, None).await { 69 for filter in &layer1_filters {
54 Ok(output) => { 70 match self.client.subscribe(filter.clone(), None).await {
55 tracing::info!( 71 Ok(output) => {
56 "Subscribed to kind {} events on {} (subscription: {})", 72 tracing::info!(
57 KIND_REPOSITORY_STATE, 73 "Subscribed to Layer 1 (announcements) on {} (subscription: {})",
58 self.url, 74 self.url,
59 output.id() 75 output.id()
60 ); 76 );
77 }
78 Err(e) => {
79 tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e);
80 }
61 } 81 }
62 Err(e) => { 82 }
63 tracing::error!("Failed to subscribe on {}: {}", self.url, e); 83
64 return; 84 // Layer 2: Repository events (A/a tags)
85 let layer2_filters = self
86 .filter_service
87 .get_layer2_filters(&self.remote_domain)
88 .await;
89 for filter in &layer2_filters {
90 match self.client.subscribe(filter.clone(), None).await {
91 Ok(output) => {
92 tracing::info!(
93 "Subscribed to Layer 2 (repo events) on {} (subscription: {})",
94 self.url,
95 output.id()
96 );
97 }
98 Err(e) => {
99 tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e);
100 }
101 }
102 }
103
104 // Layer 3: Related events (E/e tags)
105 let layer3_filters = self.filter_service.get_layer3_filters().await;
106 for filter in &layer3_filters {
107 match self.client.subscribe(filter.clone(), None).await {
108 Ok(output) => {
109 tracing::info!(
110 "Subscribed to Layer 3 (related events) on {} (subscription: {})",
111 self.url,
112 output.id()
113 );
114 }
115 Err(e) => {
116 tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e);
117 }
65 } 118 }
66 }; 119 }
120
121 tracing::info!(
122 "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})",
123 self.url,
124 layer1_filters.len(),
125 layer2_filters.len(),
126 layer3_filters.len()
127 );
67 128
68 // Handle incoming notifications 129 // Handle incoming notifications
69 let url = self.url.clone(); 130 let url = self.url.clone();
@@ -106,19 +167,29 @@ impl SyncConnection {
106 .await 167 .await
107 .ok(); 168 .ok();
108 } 169 }
109
110} 170}
111 171
112/// Reconnect loop with exponential backoff 172/// Reconnect loop with exponential backoff
173///
174/// # Arguments
175/// * `url` - The relay URL to connect to
176/// * `tx` - Channel sender for synced events
177/// * `filter_service` - FilterService for building subscriptions
178/// * `our_domain` - Our relay's domain (used to extract remote domain)
113pub async fn connect_with_retry( 179pub async fn connect_with_retry(
114 url: &str, 180 url: &str,
115 tx: mpsc::Sender<SyncedEvent>, 181 tx: mpsc::Sender<SyncedEvent>,
182 filter_service: Arc<FilterService>,
183 _our_domain: &str,
116) { 184) {
117 let mut backoff = Duration::from_secs(1); 185 let mut backoff = Duration::from_secs(1);
118 let max_backoff = Duration::from_secs(60); 186 let max_backoff = Duration::from_secs(60);
119 187
188 // Extract remote domain from URL
189 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
190
120 loop { 191 loop {
121 match SyncConnection::new(url).await { 192 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await {
122 Ok(conn) => { 193 Ok(conn) => {
123 backoff = Duration::from_secs(1); // Reset backoff on successful connection 194 backoff = Duration::from_secs(1); // Reset backoff on successful connection
124 conn.run(tx.clone()).await; 195 conn.run(tx.clone()).await;
@@ -140,4 +211,35 @@ pub async fn connect_with_retry(
140 // Exponential backoff 211 // Exponential backoff
141 backoff = std::cmp::min(backoff * 2, max_backoff); 212 backoff = std::cmp::min(backoff * 2, max_backoff);
142 } 213 }
214}
215
216/// Extract domain from a URL
217fn extract_domain_from_url(url: &str) -> Option<String> {
218 let url = url
219 .trim_start_matches("ws://")
220 .trim_start_matches("wss://")
221 .trim_start_matches("http://")
222 .trim_start_matches("https://");
223
224 // Remove path
225 let domain = url.split('/').next()?;
226
227 Some(domain.to_string())
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 #[test]
235 fn test_extract_domain() {
236 assert_eq!(
237 extract_domain_from_url("ws://127.0.0.1:8080"),
238 Some("127.0.0.1:8080".to_string())
239 );
240 assert_eq!(
241 extract_domain_from_url("wss://relay.example.com/path"),
242 Some("relay.example.com".to_string())
243 );
244 }
143} \ No newline at end of file 245} \ No newline at end of file