From dd403b17e7c74db9443d0891a9de1f0f0f9f89eb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:43:49 +0000 Subject: feat(sync): Phase 6 - observability and production readiness - Add SyncMetrics with full Prometheus integration - Track sync gaps via catchup events - Update Grafana dashboard with sync panels - Document all sync configuration options - Update design doc with implementation notes --- src/sync/manager.rs | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) (limited to 'src/sync/manager.rs') diff --git a/src/sync/manager.rs b/src/sync/manager.rs index f594454..3bc190d 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -35,6 +35,7 @@ use tokio::sync::mpsc; use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; use super::health::RelayHealthTracker; +use super::metrics::SyncMetrics; use super::SYNC_SOURCE_ADDR; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; @@ -54,6 +55,8 @@ pub struct SyncManager { write_policy: Nip34WritePolicy, /// Health tracker for relay connections health_tracker: Arc, + /// Sync metrics for Prometheus + metrics: Option, } impl SyncManager { @@ -78,6 +81,34 @@ impl SyncManager { database, write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), + metrics: None, + } + } + + /// Create a new SyncManager with metrics + /// + /// # Arguments + /// * `initial_relay_url` - Optional initial relay URL from config + /// * `relay_domain` - Our relay's domain (used to exclude self from sync) + /// * `database` - Shared database for storing events and querying announcements + /// * `write_policy` - Write policy for validating synced events + /// * `config` - Configuration for health tracking settings + /// * `metrics` - Sync metrics for Prometheus + pub fn with_metrics( + initial_relay_url: Option, + relay_domain: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + config: &Config, + metrics: SyncMetrics, + ) -> Self { + Self { + initial_relay_url, + relay_domain, + database, + write_policy, + health_tracker: Arc::new(RelayHealthTracker::new(config)), + metrics: Some(metrics), } } @@ -95,9 +126,20 @@ impl SyncManager { database, write_policy, health_tracker: Arc::new(RelayHealthTracker::with_defaults()), + metrics: None, } } + /// Set metrics for the sync manager + pub fn set_metrics(&mut self, metrics: SyncMetrics) { + self.metrics = Some(metrics); + } + + /// Get a reference to the metrics + pub fn metrics(&self) -> Option<&SyncMetrics> { + self.metrics.as_ref() + } + /// Get a reference to the health tracker pub fn health_tracker(&self) -> Arc { self.health_tracker.clone() @@ -148,6 +190,11 @@ impl SyncManager { } } + // Record initial tracked relay count + if let Some(ref metrics) = self.metrics { + metrics.set_tracked_count(active_relays.len() as i64); + } + // Spawn connections with startup jitter to prevent thundering herd for url in relays_to_connect { tracing::info!("Scheduling connection to sync relay: {}", url); @@ -172,6 +219,12 @@ impl SyncManager { if !active_relays.contains(&url) && !self.is_own_relay(&url) { tracing::info!("Discovered new relay from event, connecting: {}", url); active_relays.insert(url.clone()); + + // Update tracked relay count + if let Some(ref metrics) = self.metrics { + metrics.inc_tracked_count(); + } + // New relays discovered during runtime don't need jitter self.spawn_connection(url, tx.clone(), filter_service.clone()); } @@ -200,6 +253,7 @@ impl SyncManager { ) { let domain = self.relay_domain.clone(); let health_tracker = self.health_tracker.clone(); + let metrics = self.metrics.clone(); tokio::spawn(async move { // Apply startup jitter @@ -211,7 +265,7 @@ impl SyncManager { ); tokio::time::sleep(Duration::from_millis(jitter_ms)).await; - connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; }); } @@ -226,9 +280,10 @@ impl SyncManager { ) { let domain = self.relay_domain.clone(); let health_tracker = self.health_tracker.clone(); + let metrics = self.metrics.clone(); tokio::spawn(async move { - connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; }); } -- cgit v1.2.3