From dd403b17e7c74db9443d0891a9de1f0f0f9f89eb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:43:49 +0000 Subject: feat(sync): Phase 6 - observability and production readiness - Add SyncMetrics with full Prometheus integration - Track sync gaps via catchup events - Update Grafana dashboard with sync panels - Document all sync configuration options - Update design doc with implementation notes --- src/sync/connection.rs | 47 ++++++- src/sync/manager.rs | 59 ++++++++- src/sync/metrics.rs | 348 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 2 + 4 files changed, 453 insertions(+), 3 deletions(-) create mode 100644 src/sync/metrics.rs (limited to 'src/sync') diff --git a/src/sync/connection.rs b/src/sync/connection.rs index cd7a603..e921185 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -31,6 +31,7 @@ use tokio::sync::mpsc; use super::filter::FilterService; use super::health::RelayHealthTracker; +use super::metrics::{event_source, SyncMetrics}; use super::subscription::SubscriptionManager; /// Event received from the sync connection @@ -47,6 +48,7 @@ pub struct SyncConnection { filter_service: Arc<FilterService>, remote_domain: String, subscription_manager: SubscriptionManager, + metrics: Option<SyncMetrics>, } impl SyncConnection { @@ -55,6 +57,7 @@ impl SyncConnection { url: &str, filter_service: Arc<FilterService>, remote_domain: &str, + metrics: Option<SyncMetrics>, ) -> Result> { let client = Client::default(); @@ -78,6 +81,7 @@ impl SyncConnection { filter_service, remote_domain: remote_domain.to_string(), subscription_manager, + metrics, }) } @@ -152,10 +156,12 @@ impl SyncConnection { // Handle incoming notifications let url = self.url.clone(); + let metrics = self.metrics.clone(); self.client .handle_notifications(|notification| { let tx = tx.clone(); let url = url.clone(); + let metrics = metrics.clone(); async move { match notification { RelayPoolNotification::Event { event, .. 
} => { @@ -166,6 +172,11 @@ impl SyncConnection { event.kind.as_u16() ); + // Record live event metric + if let Some(ref m) = metrics { + m.record_event(event_source::LIVE); + } + // Send the event to the manager for processing let synced = SyncedEvent { event: (*event).clone(), @@ -320,12 +331,14 @@ impl SyncConnection { /// * `filter_service` - FilterService for building subscriptions /// * `our_domain` - Our relay's domain (used to extract remote domain) /// * `health_tracker` - Health tracker for managing connection state +/// * `metrics` - Optional sync metrics for Prometheus pub async fn connect_with_retry( url: &str, tx: mpsc::Sender<SyncedEvent>, filter_service: Arc<FilterService>, _our_domain: &str, health_tracker: Arc<RelayHealthTracker>, + metrics: Option<SyncMetrics>, ) { // Extract remote domain from URL let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); @@ -353,10 +366,20 @@ pub async fn connect_with_retry( ); } - match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { + match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { Ok(conn) => { // Record successful connection health_tracker.record_success(url); + + // Record metrics + if let Some(ref m) = metrics { + m.record_connection_attempt(url, true); + m.set_relay_connected(url, true); + m.inc_connected_count(); + m.record_health_state(url, health_tracker.get_state(url)); + m.record_failure_count(url, 0); + } + tracing::info!("Sync connection established to {}", url); // Run the connection (this blocks until disconnection) @@ -365,6 +388,15 @@ pub async fn connect_with_retry( // Connection ended - record as failure for reconnection backoff // (The connection ending is considered a failure even if it worked for a while) health_tracker.record_failure(url); + + // Update metrics for disconnection + if let Some(ref m) = metrics { + m.set_relay_connected(url, false); + m.dec_connected_count(); + m.record_health_state(url, health_tracker.get_state(url)); + 
m.record_failure_count(url, health_tracker.get_failure_count(url)); + } + tracing::warn!("Sync connection to {} ended, will reconnect", url); } Err(e) => { @@ -373,6 +405,19 @@ pub async fn connect_with_retry( let failure_count = health_tracker.get_failure_count(url); let state = health_tracker.get_state(url); + + // Record metrics + if let Some(ref m) = metrics { + m.record_connection_attempt(url, false); + m.set_relay_connected(url, false); + m.record_health_state(url, state); + m.record_failure_count(url, failure_count); + + // Track dead relays + if state == super::health::HealthState::Dead { + m.inc_dead_count(); + } + } tracing::error!( "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", diff --git a/src/sync/manager.rs b/src/sync/manager.rs index f594454..3bc190d 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -35,6 +35,7 @@ use tokio::sync::mpsc; use super::connection::{connect_with_retry, SyncedEvent}; use super::filter::FilterService; use super::health::RelayHealthTracker; +use super::metrics::SyncMetrics; use super::SYNC_SOURCE_ADDR; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; @@ -54,6 +55,8 @@ pub struct SyncManager { write_policy: Nip34WritePolicy, /// Health tracker for relay connections health_tracker: Arc<RelayHealthTracker>, + /// Sync metrics for Prometheus + metrics: Option<SyncMetrics>, } impl SyncManager { @@ -78,6 +81,34 @@ impl SyncManager { database, write_policy, health_tracker: Arc::new(RelayHealthTracker::new(config)), + metrics: None, + } + } + + /// Create a new SyncManager with metrics + /// + /// # Arguments + /// * `initial_relay_url` - Optional initial relay URL from config + /// * `relay_domain` - Our relay's domain (used to exclude self from sync) + /// * `database` - Shared database for storing events and querying announcements + /// * `write_policy` - Write policy for validating synced events + /// * `config` - Configuration for health tracking settings + /// * `metrics` - Sync metrics 
for Prometheus + pub fn with_metrics( + initial_relay_url: Option, + relay_domain: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + config: &Config, + metrics: SyncMetrics, + ) -> Self { + Self { + initial_relay_url, + relay_domain, + database, + write_policy, + health_tracker: Arc::new(RelayHealthTracker::new(config)), + metrics: Some(metrics), } } @@ -95,9 +126,20 @@ impl SyncManager { database, write_policy, health_tracker: Arc::new(RelayHealthTracker::with_defaults()), + metrics: None, } } + /// Set metrics for the sync manager + pub fn set_metrics(&mut self, metrics: SyncMetrics) { + self.metrics = Some(metrics); + } + + /// Get a reference to the metrics + pub fn metrics(&self) -> Option<&SyncMetrics> { + self.metrics.as_ref() + } + /// Get a reference to the health tracker pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { self.health_tracker.clone() @@ -148,6 +190,11 @@ impl SyncManager { } } + // Record initial tracked relay count + if let Some(ref metrics) = self.metrics { + metrics.set_tracked_count(active_relays.len() as i64); + } + // Spawn connections with startup jitter to prevent thundering herd for url in relays_to_connect { tracing::info!("Scheduling connection to sync relay: {}", url); @@ -172,6 +219,12 @@ impl SyncManager { if !active_relays.contains(&url) && !self.is_own_relay(&url) { tracing::info!("Discovered new relay from event, connecting: {}", url); active_relays.insert(url.clone()); + + // Update tracked relay count + if let Some(ref metrics) = self.metrics { + metrics.inc_tracked_count(); + } + // New relays discovered during runtime don't need jitter self.spawn_connection(url, tx.clone(), filter_service.clone()); } @@ -200,6 +253,7 @@ impl SyncManager { ) { let domain = self.relay_domain.clone(); let health_tracker = self.health_tracker.clone(); + let metrics = self.metrics.clone(); tokio::spawn(async move { // Apply startup jitter @@ -211,7 +265,7 @@ impl SyncManager { ); 
tokio::time::sleep(Duration::from_millis(jitter_ms)).await; - connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; }); } @@ -226,9 +280,10 @@ impl SyncManager { ) { let domain = self.relay_domain.clone(); let health_tracker = self.health_tracker.clone(); + let metrics = self.metrics.clone(); tokio::spawn(async move { - connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; + connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; }); } diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs new file mode 100644 index 0000000..c93e583 --- /dev/null +++ b/src/sync/metrics.rs @@ -0,0 +1,348 @@ +//! Prometheus Metrics for Proactive Sync (GRASP-02 Phase 6) +//! +//! This module provides comprehensive sync monitoring metrics including: +//! - Connection status and attempts per relay +//! - Health state tracking (Healthy/Degraded/Dead) +//! - Event sync tracking by source (live/startup/reconnect/daily catchup) +//! - Gap events filled during catchup operations +//! +//! All metrics follow the `ngit_sync_` prefix convention. 
+ +use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; + +use super::health::HealthState; + +/// Prometheus metrics for the proactive sync system +#[derive(Clone)] +pub struct SyncMetrics { + // === Connection metrics === + /// Per-relay connection status (1=connected, 0=disconnected) + relay_connected: IntGaugeVec, + /// Connection attempts by relay and result (success/failure) + connection_attempts_total: IntCounterVec, + + // === Health metrics === + /// Per-relay health status (healthy=1, degraded=2, dead=3) + relay_status: IntGaugeVec, + /// Per-relay consecutive failure count + relay_failures: IntGaugeVec, + + // === Event metrics === + /// Events synced by source (live/startup/reconnect/daily) + events_total: IntCounterVec, + /// Gap events filled during catchup, by relay + gap_events_total: IntCounterVec, + + // === Summary metrics === + /// Total relays discovered and tracked + relays_tracked_total: IntGauge, + /// Currently connected relay count + relays_connected_total: IntGauge, + /// Relays marked as dead + relays_dead_total: IntGauge, +} + +impl SyncMetrics { + /// Register all sync metrics with the provided Prometheus registry + pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> { + // Connection metrics + let relay_connected = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_connected", + "Relay connection status (1=connected, 0=disconnected)", + ), + &["relay"], + )?; + registry.register(Box::new(relay_connected.clone()))?; + + let connection_attempts_total = IntCounterVec::new( + Opts::new( + "ngit_sync_connection_attempts_total", + "Total connection attempts by relay and result", + ), + &["relay", "result"], + )?; + registry.register(Box::new(connection_attempts_total.clone()))?; + + // Health metrics + let relay_status = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_status", + "Relay health status (1=healthy, 2=degraded, 3=dead)", + ), + &["relay"], + )?; + registry.register(Box::new(relay_status.clone()))?; + + let 
relay_failures = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_failures", + "Consecutive failure count per relay", + ), + &["relay"], + )?; + registry.register(Box::new(relay_failures.clone()))?; + + // Event metrics + let events_total = IntCounterVec::new( + Opts::new( + "ngit_sync_events_total", + "Total events synced by source type", + ), + &["source"], + )?; + registry.register(Box::new(events_total.clone()))?; + + let gap_events_total = IntCounterVec::new( + Opts::new( + "ngit_sync_gap_events_total", + "Gap events filled during catchup by relay", + ), + &["relay"], + )?; + registry.register(Box::new(gap_events_total.clone()))?; + + // Summary metrics + let relays_tracked_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_tracked_total", + "Total number of relays discovered and tracked", + ))?; + registry.register(Box::new(relays_tracked_total.clone()))?; + + let relays_connected_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_connected_total", + "Number of currently connected relays", + ))?; + registry.register(Box::new(relays_connected_total.clone()))?; + + let relays_dead_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_dead_total", + "Number of relays marked as dead", + ))?; + registry.register(Box::new(relays_dead_total.clone()))?; + + Ok(Self { + relay_connected, + connection_attempts_total, + relay_status, + relay_failures, + events_total, + gap_events_total, + relays_tracked_total, + relays_connected_total, + relays_dead_total, + }) + } + + // === Connection Recording Methods === + + /// Record a connection attempt (success or failure) + pub fn record_connection_attempt(&self, relay: &str, success: bool) { + let result = if success { "success" } else { "failure" }; + self.connection_attempts_total + .with_label_values(&[relay, result]) + .inc(); + } + + /// Set relay connection status + pub fn set_relay_connected(&self, relay: &str, connected: bool) { + self.relay_connected + .with_label_values(&[relay]) + .set(if 
connected { 1 } else { 0 }); + + // Update connected count based on all relay values + // This is handled by update_connected_count() for accuracy + } + + /// Update the total connected relay count + pub fn update_connected_count(&self, count: i64) { + self.relays_connected_total.set(count); + } + + /// Increment connected count + pub fn inc_connected_count(&self) { + self.relays_connected_total.inc(); + } + + /// Decrement connected count + pub fn dec_connected_count(&self) { + self.relays_connected_total.dec(); + } + + // === Health Recording Methods === + + /// Record relay health state change + pub fn record_health_state(&self, relay: &str, state: HealthState) { + let state_value = match state { + HealthState::Healthy => 1, + HealthState::Degraded => 2, + HealthState::Dead => 3, + }; + self.relay_status.with_label_values(&[relay]).set(state_value); + } + + /// Record relay failure count + pub fn record_failure_count(&self, relay: &str, count: u32) { + self.relay_failures + .with_label_values(&[relay]) + .set(count as i64); + } + + /// Update dead relay count + pub fn update_dead_count(&self, count: i64) { + self.relays_dead_total.set(count); + } + + /// Increment dead relay count + pub fn inc_dead_count(&self) { + self.relays_dead_total.inc(); + } + + /// Decrement dead relay count + pub fn dec_dead_count(&self) { + self.relays_dead_total.dec(); + } + + // === Event Recording Methods === + + /// Record a synced event by source type + /// + /// Source types: + /// - "live" - Real-time subscription events + /// - "startup" - Events from startup catchup + /// - "reconnect" - Events from reconnection catchup + /// - "daily" - Events from daily catchup + pub fn record_event(&self, source: &str) { + self.events_total.with_label_values(&[source]).inc(); + } + + /// Record multiple events synced by source type + pub fn record_events(&self, source: &str, count: u64) { + self.events_total + .with_label_values(&[source]) + .inc_by(count); + } + + /// Record a gap event 
filled during catchup + pub fn record_gap_event(&self, relay: &str) { + self.gap_events_total.with_label_values(&[relay]).inc(); + } + + /// Record multiple gap events filled during catchup + pub fn record_gap_events(&self, relay: &str, count: u64) { + self.gap_events_total + .with_label_values(&[relay]) + .inc_by(count); + } + + // === Summary Recording Methods === + + /// Set the total tracked relay count + pub fn set_tracked_count(&self, count: i64) { + self.relays_tracked_total.set(count); + } + + /// Increment tracked relay count + pub fn inc_tracked_count(&self) { + self.relays_tracked_total.inc(); + } + + /// Get current tracked relay count + pub fn get_tracked_count(&self) -> i64 { + self.relays_tracked_total.get() + } + + /// Get current connected relay count + pub fn get_connected_count(&self) -> i64 { + self.relays_connected_total.get() + } + + /// Get current dead relay count + pub fn get_dead_count(&self) -> i64 { + self.relays_dead_total.get() + } +} + +/// Event source types for metrics tracking +pub mod event_source { + /// Real-time subscription events + pub const LIVE: &str = "live"; + /// Events from startup catchup + pub const STARTUP: &str = "startup"; + /// Events from reconnection catchup + pub const RECONNECT: &str = "reconnect"; + /// Events from daily catchup + pub const DAILY: &str = "daily"; +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_test_registry() -> Registry { + Registry::new() + } + + #[test] + fn test_metrics_registration() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(&registry); + assert!(metrics.is_ok()); + } + + #[test] + fn test_connection_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(&registry).unwrap(); + + metrics.record_connection_attempt("wss://relay1.example.com", true); + metrics.record_connection_attempt("wss://relay1.example.com", false); + metrics.record_connection_attempt("wss://relay2.example.com", true); + + 
metrics.set_relay_connected("wss://relay1.example.com", true); + metrics.inc_connected_count(); + + assert_eq!(metrics.get_connected_count(), 1); + } + + #[test] + fn test_health_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(&registry).unwrap(); + + metrics.record_health_state("wss://relay1.example.com", HealthState::Healthy); + metrics.record_health_state("wss://relay2.example.com", HealthState::Degraded); + metrics.record_health_state("wss://relay3.example.com", HealthState::Dead); + + metrics.record_failure_count("wss://relay2.example.com", 5); + metrics.update_dead_count(1); + + assert_eq!(metrics.get_dead_count(), 1); + } + + #[test] + fn test_event_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(&registry).unwrap(); + + metrics.record_event(event_source::LIVE); + metrics.record_events(event_source::STARTUP, 10); + metrics.record_gap_event("wss://relay1.example.com"); + metrics.record_gap_events("wss://relay2.example.com", 5); + } + + #[test] + fn test_summary_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(&registry).unwrap(); + + metrics.set_tracked_count(5); + assert_eq!(metrics.get_tracked_count(), 5); + + metrics.inc_tracked_count(); + assert_eq!(metrics.get_tracked_count(), 6); + + metrics.update_connected_count(3); + assert_eq!(metrics.get_connected_count(), 3); + } +} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index dc11812..67d389e 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -21,12 +21,14 @@ mod connection; mod filter; pub mod health; mod manager; +pub mod metrics; pub mod negentropy; mod subscription; pub use filter::FilterService; pub use health::{HealthState, RelayHealth, RelayHealthTracker}; pub use manager::SyncManager; +pub use metrics::SyncMetrics; pub use negentropy::NegentropyService; pub use subscription::SubscriptionManager; -- cgit v1.2.3