upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:12 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:18 +0000
commitefaad1e2857914b87307cf78903a957a604697a8 (patch)
treedadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/connection.rs
parent91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff)
basic sync stub
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r--src/sync/connection.rs473
1 files changed, 0 insertions, 473 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
deleted file mode 100644
index 61a33f8..0000000
--- a/src/sync/connection.rs
+++ /dev/null
@@ -1,473 +0,0 @@
1//! WebSocket connection handling for sync
2//!
3//! Manages the connection to a source relay, subscribes to events using
4//! the three-layer filter strategy, and passes them through validation.
5//!
6//! ## Phase 2 Features
7//!
8//! - Three-layer filter subscriptions:
9//! 1. Layer 1: kinds 30617 + 30618 (announcements)
10//! 2. Layer 2: A/a tags for repository events
11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.)
12//!
13//! ## Phase 3 Features
14//!
15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates when new announcements/PRs arrive
22//! - Per-connection subscription tracking
23//! - Filter consolidation when count exceeds threshold (>150)
24//! - Duplicate subscription prevention
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use nostr_sdk::prelude::*;
30use tokio::sync::mpsc;
31
32use super::filter::FilterService;
33use super::health::RelayHealthTracker;
34use super::metrics::{event_source, SyncMetrics};
35use super::subscription::SubscriptionManager;
36
37/// Event received from the sync connection
38#[derive(Debug, Clone)]
39pub struct SyncedEvent {
40 pub event: Event,
41 pub source_url: String,
42}
43
44/// Manages a WebSocket connection to a single relay for syncing
45pub struct SyncConnection {
46 url: String,
47 client: Client,
48 filter_service: Arc<FilterService>,
49 remote_domain: String,
50 subscription_manager: SubscriptionManager,
51 metrics: Option<SyncMetrics>,
52}
53
54impl SyncConnection {
55 /// Create a new sync connection to the given relay URL
56 pub async fn new(
57 url: &str,
58 filter_service: Arc<FilterService>,
59 remote_domain: &str,
60 metrics: Option<SyncMetrics>,
61 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
62 let client = Client::default();
63
64 // Add the relay
65 client.add_relay(url).await?;
66
67 // Connect to the relay
68 client.connect().await;
69
70 tracing::info!("Sync connection established to {}", url);
71
72 // Create subscription manager for this connection
73 let subscription_manager =
74 SubscriptionManager::new(filter_service.clone(), remote_domain.to_string());
75
76 Ok(Self {
77 url: url.to_string(),
78 client,
79 filter_service,
80 remote_domain: remote_domain.to_string(),
81 subscription_manager,
82 metrics,
83 })
84 }
85
86 /// Start receiving events and send them through the channel
87 ///
88 /// This method runs indefinitely, handling events from all three filter layers.
89 /// Dynamic subscription updates are triggered when new announcements or PRs arrive.
90 pub async fn run(mut self, tx: mpsc::Sender<SyncedEvent>) {
91 // Subscribe to all three filter layers
92
93 // Layer 1: Announcement discovery (kinds 30617 + 30618)
94 let layer1_filters = self.filter_service.get_layer1_filters();
95 for filter in &layer1_filters {
96 match self.client.subscribe(filter.clone(), None).await {
97 Ok(output) => {
98 tracing::info!(
99 "Subscribed to Layer 1 (announcements) on {} (subscription: {})",
100 self.url,
101 output.id()
102 );
103 }
104 Err(e) => {
105 tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e);
106 }
107 }
108 }
109
110 // Layer 2: Repository events (A/a tags)
111 let layer2_filters = self
112 .filter_service
113 .get_layer2_filters(&self.remote_domain)
114 .await;
115 for filter in &layer2_filters {
116 match self.client.subscribe(filter.clone(), None).await {
117 Ok(output) => {
118 tracing::info!(
119 "Subscribed to Layer 2 (repo events) on {} (subscription: {})",
120 self.url,
121 output.id()
122 );
123 }
124 Err(e) => {
125 tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e);
126 }
127 }
128 }
129
130 // Layer 3: Related events (E/e tags)
131 let layer3_filters = self.filter_service.get_layer3_filters().await;
132 for filter in &layer3_filters {
133 match self.client.subscribe(filter.clone(), None).await {
134 Ok(output) => {
135 tracing::info!(
136 "Subscribed to Layer 3 (related events) on {} (subscription: {})",
137 self.url,
138 output.id()
139 );
140 }
141 Err(e) => {
142 tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e);
143 }
144 }
145 }
146
147 tracing::info!(
148 "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})",
149 self.url,
150 layer1_filters.len(),
151 layer2_filters.len(),
152 layer3_filters.len()
153 );
154
155 // Handle incoming notifications
156 let url = self.url.clone();
157 let metrics = self.metrics.clone();
158 self.client
159 .handle_notifications(|notification| {
160 let tx = tx.clone();
161 let url = url.clone();
162 let metrics = metrics.clone();
163 async move {
164 match notification {
165 RelayPoolNotification::Event { event, .. } => {
166 tracing::debug!(
167 "Received event {} from {} (kind {})",
168 event.id,
169 url,
170 event.kind.as_u16()
171 );
172
173 // Record live event metric
174 if let Some(ref m) = metrics {
175 m.record_event(event_source::LIVE);
176 }
177
178 // Send the event to the manager for processing
179 let synced = SyncedEvent {
180 event: (*event).clone(),
181 source_url: url.clone(),
182 };
183
184 if let Err(e) = tx.send(synced).await {
185 tracing::warn!("Failed to send synced event: {}", e);
186 return Ok(true); // Stop if channel is closed
187 }
188 }
189 RelayPoolNotification::Shutdown => {
190 tracing::warn!("Relay connection shutdown for {}", url);
191 return Ok(true); // Stop on shutdown
192 }
193 RelayPoolNotification::Message { message, .. } => {
194 tracing::trace!("Received message from {}: {:?}", url, message);
195 }
196 }
197 Ok(false) // Continue processing
198 }
199 })
200 .await
201 .ok();
202 }
203
204 /// Handle dynamic subscription updates based on incoming event kind
205 ///
206 /// - kind 30617/30618: New announcement → add Layer 2 subscription
207 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
208 async fn handle_dynamic_subscription(&mut self, event: &Event) {
209 // Check if this is an announcement kind (triggers Layer 2 subscription)
210 if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
211 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
212 tracing::info!(
213 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
214 event.id.to_hex(),
215 self.url,
216 new_filters.len(),
217 self.subscription_manager.get_filter_count()
218 );
219 self.subscribe_to_filters(new_filters, "Layer 2").await;
220 }
221 }
222
223 // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription)
224 if matches!(
225 event.kind,
226 Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618)
227 ) {
228 if let Some(new_filters) = self.subscription_manager.add_event(event) {
229 tracing::info!(
230 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
231 event.id.to_hex(),
232 self.url,
233 new_filters.len(),
234 self.subscription_manager.get_filter_count()
235 );
236 self.subscribe_to_filters(new_filters, "Layer 3").await;
237 }
238 }
239
240 // Check if we need to consolidate
241 if self.subscription_manager.should_consolidate() {
242 self.consolidate_subscriptions().await;
243 }
244 }
245
246 /// Subscribe to new filters
247 async fn subscribe_to_filters(&self, filters: Vec<Filter>, layer_name: &str) {
248 for filter in filters {
249 match self.client.subscribe(filter, None).await {
250 Ok(output) => {
251 tracing::debug!(
252 "Dynamic {} subscription on {} (subscription: {})",
253 layer_name,
254 self.url,
255 output.id()
256 );
257 }
258 Err(e) => {
259 tracing::warn!(
260 "Failed to add dynamic {} subscription on {}: {}",
261 layer_name,
262 self.url,
263 e
264 );
265 }
266 }
267 }
268 }
269
270 /// Consolidate subscriptions back to Layer 1 only
271 ///
272 /// This is triggered when the filter count exceeds 150.
273 /// All existing subscriptions are closed and only Layer 1 is re-subscribed.
274 async fn consolidate_subscriptions(&mut self) {
275 tracing::warn!(
276 "Filter count {} exceeds threshold, consolidating subscriptions on {}",
277 self.subscription_manager.get_filter_count(),
278 self.url
279 );
280
281 // Get consolidated filters (clears tracking and returns Layer 1 only)
282 let layer1_filters = self.subscription_manager.consolidate();
283
284 // Note: nostr-sdk doesn't provide a way to close all subscriptions easily
285 // The client will manage subscription count internally
286 // We just add the new Layer 1 subscription
287
288 for filter in layer1_filters {
289 match self.client.subscribe(filter, None).await {
290 Ok(output) => {
291 tracing::info!(
292 "Consolidated to Layer 1 subscription on {} (subscription: {})",
293 self.url,
294 output.id()
295 );
296 }
297 Err(e) => {
298 tracing::error!(
299 "Failed to subscribe Layer 1 after consolidation on {}: {}",
300 self.url,
301 e
302 );
303 }
304 }
305 }
306 }
307
308 /// Get the current filter count from the subscription manager
309 pub fn get_filter_count(&self) -> usize {
310 self.subscription_manager.get_filter_count()
311 }
312
313 /// Check if subscriptions have been consolidated
314 pub fn is_consolidated(&self) -> bool {
315 self.subscription_manager.is_consolidated()
316 }
317}
318
319/// Reconnect loop with health-aware exponential backoff
320///
321/// This function manages the connection lifecycle with health tracking:
322/// - Checks health state before attempting connections
323/// - Reports success/failure to the health tracker
324/// - Respects backoff delays from the health tracker
325/// - Handles dead relay detection (24h+ failures)
326///
327/// # Arguments
328/// * `url` - The relay URL to connect to
329/// * `tx` - Channel sender for synced events
330/// * `filter_service` - FilterService for building subscriptions
331/// * `our_domain` - Our relay's domain (used to extract remote domain)
332/// * `health_tracker` - Health tracker for managing connection state
333/// * `metrics` - Optional sync metrics for Prometheus
334pub async fn connect_with_retry(
335 url: &str,
336 tx: mpsc::Sender<SyncedEvent>,
337 filter_service: Arc<FilterService>,
338 _our_domain: &str,
339 health_tracker: Arc<RelayHealthTracker>,
340 metrics: Option<SyncMetrics>,
341) {
342 // Extract remote domain from URL
343 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
344
345 loop {
346 // Check if we should attempt connection based on health state
347 if !health_tracker.should_attempt_connection(url) {
348 // Wait for remaining backoff
349 if let Some(remaining) = health_tracker.get_remaining_backoff(url) {
350 tracing::debug!(
351 "Relay {} in backoff, waiting {:?} before retry",
352 url,
353 remaining
354 );
355 tokio::time::sleep(remaining).await;
356 continue;
357 }
358 }
359
360 // Log current health state for dead relays
361 if health_tracker.is_dead(url) {
362 tracing::info!(
363 "Attempting reconnection to dead relay {} (daily retry)",
364 url
365 );
366 }
367
368 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone())
369 .await
370 {
371 Ok(conn) => {
372 // Record successful connection
373 health_tracker.record_success(url);
374
375 // Record metrics
376 if let Some(ref m) = metrics {
377 m.record_connection_attempt(url, true);
378 m.set_relay_connected(url, true);
379 m.inc_connected_count();
380 m.record_health_state(url, health_tracker.get_state(url));
381 m.record_failure_count(url, 0);
382 }
383
384 tracing::info!("Sync connection established to {}", url);
385
386 // Run the connection (this blocks until disconnection)
387 conn.run(tx.clone()).await;
388
389 // Connection ended - record as failure for reconnection backoff
390 // (The connection ending is considered a failure even if it worked for a while)
391 health_tracker.record_failure(url);
392
393 // Update metrics for disconnection
394 if let Some(ref m) = metrics {
395 m.set_relay_connected(url, false);
396 m.dec_connected_count();
397 m.record_health_state(url, health_tracker.get_state(url));
398 m.record_failure_count(url, health_tracker.get_failure_count(url));
399 }
400
401 tracing::warn!("Sync connection to {} ended, will reconnect", url);
402 }
403 Err(e) => {
404 // Record connection failure
405 health_tracker.record_failure(url);
406
407 let failure_count = health_tracker.get_failure_count(url);
408 let state = health_tracker.get_state(url);
409
410 // Record metrics
411 if let Some(ref m) = metrics {
412 m.record_connection_attempt(url, false);
413 m.set_relay_connected(url, false);
414 m.record_health_state(url, state);
415 m.record_failure_count(url, failure_count);
416
417 // Track dead relays
418 if state == super::health::HealthState::Dead {
419 m.inc_dead_count();
420 }
421 }
422
423 tracing::error!(
424 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}",
425 url,
426 failure_count,
427 state,
428 e
429 );
430 }
431 }
432
433 // Get the backoff duration from health tracker
434 // If the health tracker has no backoff set (shouldn't happen), use a small default
435 let wait_duration = health_tracker
436 .get_remaining_backoff(url)
437 .unwrap_or(Duration::from_secs(5));
438
439 tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url);
440 tokio::time::sleep(wait_duration).await;
441 }
442}
443
444/// Extract domain from a URL
445fn extract_domain_from_url(url: &str) -> Option<String> {
446 let url = url
447 .trim_start_matches("ws://")
448 .trim_start_matches("wss://")
449 .trim_start_matches("http://")
450 .trim_start_matches("https://");
451
452 // Remove path
453 let domain = url.split('/').next()?;
454
455 Some(domain.to_string())
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461
462 #[test]
463 fn test_extract_domain() {
464 assert_eq!(
465 extract_domain_from_url("ws://127.0.0.1:8080"),
466 Some("127.0.0.1:8080".to_string())
467 );
468 assert_eq!(
469 extract_domain_from_url("wss://relay.example.com/path"),
470 Some("relay.example.com".to_string())
471 );
472 }
473}