From dd403b17e7c74db9443d0891a9de1f0f0f9f89eb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:43:49 +0000 Subject: feat(sync): Phase 6 - observability and production readiness - Add SyncMetrics with full Prometheus integration - Track sync gaps via catchup events - Update Grafana dashboard with sync panels - Document all sync configuration options - Update design doc with implementation notes --- src/sync/connection.rs | 47 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) (limited to 'src/sync/connection.rs') diff --git a/src/sync/connection.rs b/src/sync/connection.rs index cd7a603..e921185 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -31,6 +31,7 @@ use tokio::sync::mpsc; use super::filter::FilterService; use super::health::RelayHealthTracker; +use super::metrics::{event_source, SyncMetrics}; use super::subscription::SubscriptionManager; /// Event received from the sync connection @@ -47,6 +48,7 @@ pub struct SyncConnection { filter_service: Arc, remote_domain: String, subscription_manager: SubscriptionManager, + metrics: Option, } impl SyncConnection { @@ -55,6 +57,7 @@ impl SyncConnection { url: &str, filter_service: Arc, remote_domain: &str, + metrics: Option, ) -> Result> { let client = Client::default(); @@ -78,6 +81,7 @@ impl SyncConnection { filter_service, remote_domain: remote_domain.to_string(), subscription_manager, + metrics, }) } @@ -152,10 +156,12 @@ impl SyncConnection { // Handle incoming notifications let url = self.url.clone(); + let metrics = self.metrics.clone(); self.client .handle_notifications(|notification| { let tx = tx.clone(); let url = url.clone(); + let metrics = metrics.clone(); async move { match notification { RelayPoolNotification::Event { event, .. } => { @@ -166,6 +172,11 @@ impl SyncConnection { event.kind.as_u16() ); + // Record live event metric + if let Some(ref m) = metrics { + m.record_event(event_source::LIVE); + } + // Send the event to the manager for processing let synced = SyncedEvent { event: (*event).clone(), @@ -320,12 +331,14 @@ impl SyncConnection { /// * `filter_service` - FilterService for building subscriptions /// * `our_domain` - Our relay's domain (used to extract remote domain) /// * `health_tracker` - Health tracker for managing connection state +/// * `metrics` - Optional sync metrics for Prometheus pub async fn connect_with_retry( url: &str, tx: mpsc::Sender, filter_service: Arc, _our_domain: &str, health_tracker: Arc, + metrics: Option, ) { // Extract remote domain from URL let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); @@ -353,10 +366,20 @@ pub async fn connect_with_retry( ); } - match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { + match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { Ok(conn) => { // Record successful connection health_tracker.record_success(url); + + // Record metrics + if let Some(ref m) = metrics { + m.record_connection_attempt(url, true); + m.set_relay_connected(url, true); + m.inc_connected_count(); + m.record_health_state(url, health_tracker.get_state(url)); + m.record_failure_count(url, 0); + } + tracing::info!("Sync connection established to {}", url); // Run the connection (this blocks until disconnection) @@ -365,6 +388,15 @@ pub async fn connect_with_retry( // Connection ended - record as failure for reconnection backoff // (The connection ending is considered a failure even if it worked for a while) health_tracker.record_failure(url); + + // Update metrics for disconnection + if let Some(ref m) = metrics { + m.set_relay_connected(url, false); + m.dec_connected_count(); + m.record_health_state(url, health_tracker.get_state(url)); + m.record_failure_count(url, health_tracker.get_failure_count(url)); + } + tracing::warn!("Sync connection to {} ended, will reconnect", url); } Err(e) => { @@ -373,6 +405,19 @@ pub async fn connect_with_retry( let failure_count = health_tracker.get_failure_count(url); let state = health_tracker.get_state(url); + + // Record metrics + if let Some(ref m) = metrics { + m.record_connection_attempt(url, false); + m.set_relay_connected(url, false); + m.record_health_state(url, state); + m.record_failure_count(url, failure_count); + + // Track dead relays + if state == super::health::HealthState::Dead { + m.inc_dead_count(); + } + } tracing::error!( "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", -- cgit v1.2.3