upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:43:49 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:43:49 +0000
commitdd403b17e7c74db9443d0891a9de1f0f0f9f89eb (patch)
tree177dd9f664dde3565492c1d11016dabfeda28bbc /src/sync/connection.rs
parent950c2e4e68448d2abcad90a31bfffaca6d7bc47e (diff)
feat(sync): Phase 6 - observability and production readiness
- Add SyncMetrics with full Prometheus integration - Track sync gaps via catchup events - Update Grafana dashboard with sync panels - Document all sync configuration options - Update design doc with implementation notes
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r--src/sync/connection.rs47
1 files changed, 46 insertions, 1 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index cd7a603..e921185 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -31,6 +31,7 @@ use tokio::sync::mpsc;
31 31
32use super::filter::FilterService; 32use super::filter::FilterService;
33use super::health::RelayHealthTracker; 33use super::health::RelayHealthTracker;
34use super::metrics::{event_source, SyncMetrics};
34use super::subscription::SubscriptionManager; 35use super::subscription::SubscriptionManager;
35 36
36/// Event received from the sync connection 37/// Event received from the sync connection
@@ -47,6 +48,7 @@ pub struct SyncConnection {
47 filter_service: Arc<FilterService>, 48 filter_service: Arc<FilterService>,
48 remote_domain: String, 49 remote_domain: String,
49 subscription_manager: SubscriptionManager, 50 subscription_manager: SubscriptionManager,
51 metrics: Option<SyncMetrics>,
50} 52}
51 53
52impl SyncConnection { 54impl SyncConnection {
@@ -55,6 +57,7 @@ impl SyncConnection {
55 url: &str, 57 url: &str,
56 filter_service: Arc<FilterService>, 58 filter_service: Arc<FilterService>,
57 remote_domain: &str, 59 remote_domain: &str,
60 metrics: Option<SyncMetrics>,
58 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { 61 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
59 let client = Client::default(); 62 let client = Client::default();
60 63
@@ -78,6 +81,7 @@ impl SyncConnection {
78 filter_service, 81 filter_service,
79 remote_domain: remote_domain.to_string(), 82 remote_domain: remote_domain.to_string(),
80 subscription_manager, 83 subscription_manager,
84 metrics,
81 }) 85 })
82 } 86 }
83 87
@@ -152,10 +156,12 @@ impl SyncConnection {
152 156
153 // Handle incoming notifications 157 // Handle incoming notifications
154 let url = self.url.clone(); 158 let url = self.url.clone();
159 let metrics = self.metrics.clone();
155 self.client 160 self.client
156 .handle_notifications(|notification| { 161 .handle_notifications(|notification| {
157 let tx = tx.clone(); 162 let tx = tx.clone();
158 let url = url.clone(); 163 let url = url.clone();
164 let metrics = metrics.clone();
159 async move { 165 async move {
160 match notification { 166 match notification {
161 RelayPoolNotification::Event { event, .. } => { 167 RelayPoolNotification::Event { event, .. } => {
@@ -166,6 +172,11 @@ impl SyncConnection {
166 event.kind.as_u16() 172 event.kind.as_u16()
167 ); 173 );
168 174
175 // Record live event metric
176 if let Some(ref m) = metrics {
177 m.record_event(event_source::LIVE);
178 }
179
169 // Send the event to the manager for processing 180 // Send the event to the manager for processing
170 let synced = SyncedEvent { 181 let synced = SyncedEvent {
171 event: (*event).clone(), 182 event: (*event).clone(),
@@ -320,12 +331,14 @@ impl SyncConnection {
320/// * `filter_service` - FilterService for building subscriptions 331/// * `filter_service` - FilterService for building subscriptions
321/// * `our_domain` - Our relay's domain (used to extract remote domain) 332/// * `our_domain` - Our relay's domain (used to extract remote domain)
322/// * `health_tracker` - Health tracker for managing connection state 333/// * `health_tracker` - Health tracker for managing connection state
334/// * `metrics` - Optional sync metrics for Prometheus
323pub async fn connect_with_retry( 335pub async fn connect_with_retry(
324 url: &str, 336 url: &str,
325 tx: mpsc::Sender<SyncedEvent>, 337 tx: mpsc::Sender<SyncedEvent>,
326 filter_service: Arc<FilterService>, 338 filter_service: Arc<FilterService>,
327 _our_domain: &str, 339 _our_domain: &str,
328 health_tracker: Arc<RelayHealthTracker>, 340 health_tracker: Arc<RelayHealthTracker>,
341 metrics: Option<SyncMetrics>,
329) { 342) {
330 // Extract remote domain from URL 343 // Extract remote domain from URL
331 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); 344 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
@@ -353,10 +366,20 @@ pub async fn connect_with_retry(
353 ); 366 );
354 } 367 }
355 368
356 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { 369 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await {
357 Ok(conn) => { 370 Ok(conn) => {
358 // Record successful connection 371 // Record successful connection
359 health_tracker.record_success(url); 372 health_tracker.record_success(url);
373
374 // Record metrics
375 if let Some(ref m) = metrics {
376 m.record_connection_attempt(url, true);
377 m.set_relay_connected(url, true);
378 m.inc_connected_count();
379 m.record_health_state(url, health_tracker.get_state(url));
380 m.record_failure_count(url, 0);
381 }
382
360 tracing::info!("Sync connection established to {}", url); 383 tracing::info!("Sync connection established to {}", url);
361 384
362 // Run the connection (this blocks until disconnection) 385 // Run the connection (this blocks until disconnection)
@@ -365,6 +388,15 @@ pub async fn connect_with_retry(
365 // Connection ended - record as failure for reconnection backoff 388 // Connection ended - record as failure for reconnection backoff
366 // (The connection ending is considered a failure even if it worked for a while) 389 // (The connection ending is considered a failure even if it worked for a while)
367 health_tracker.record_failure(url); 390 health_tracker.record_failure(url);
391
392 // Update metrics for disconnection
393 if let Some(ref m) = metrics {
394 m.set_relay_connected(url, false);
395 m.dec_connected_count();
396 m.record_health_state(url, health_tracker.get_state(url));
397 m.record_failure_count(url, health_tracker.get_failure_count(url));
398 }
399
368 tracing::warn!("Sync connection to {} ended, will reconnect", url); 400 tracing::warn!("Sync connection to {} ended, will reconnect", url);
369 } 401 }
370 Err(e) => { 402 Err(e) => {
@@ -373,6 +405,19 @@ pub async fn connect_with_retry(
373 405
374 let failure_count = health_tracker.get_failure_count(url); 406 let failure_count = health_tracker.get_failure_count(url);
375 let state = health_tracker.get_state(url); 407 let state = health_tracker.get_state(url);
408
409 // Record metrics
410 if let Some(ref m) = metrics {
411 m.record_connection_attempt(url, false);
412 m.set_relay_connected(url, false);
413 m.record_health_state(url, state);
414 m.record_failure_count(url, failure_count);
415
416 // Track dead relays
417 if state == super::health::HealthState::Dead {
418 m.inc_dead_count();
419 }
420 }
376 421
377 tracing::error!( 422 tracing::error!(
378 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", 423 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}",