From 82cc74ade1524edc096608795b4e13c3cb19c5eb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 22:26:42 +0000 Subject: feat: create sync metrics module (Phase 1) --- src/sync/metrics.rs | 454 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 167 +------------------ 2 files changed, 458 insertions(+), 163 deletions(-) create mode 100644 src/sync/metrics.rs (limited to 'src/sync') diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs new file mode 100644 index 0000000..411ff63 --- /dev/null +++ b/src/sync/metrics.rs @@ -0,0 +1,454 @@ +//! Prometheus Metrics for Proactive Sync (GRASP-02) +//! +//! This module provides comprehensive sync monitoring metrics including: +//! - Connection status and attempts per relay +//! - Health state tracking (Healthy/Degraded/Dead) +//! - Event sync tracking by source (live/startup/reconnect/daily catchup) +//! - Gap events filled during catchup operations +//! +//! All metrics follow the `ngit_sync_` prefix convention. + +use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; + +use super::health::HealthState; + +/// Prometheus metrics for the proactive sync system. +/// +/// Tracks relay connections, sync progress, health states, and operational statistics. +/// Designed for comprehensive monitoring of GRASP-02 proactive sync operations. +#[derive(Clone)] +pub struct SyncMetrics { + // === Connection metrics === + /// Per-relay connection status (1=connected, 0=disconnected) + relay_connected: IntGaugeVec, + /// Connection attempts by relay and result (success/failure) + connection_attempts_total: IntCounterVec, + + // === Health metrics === + /// Per-relay health status (healthy=1, degraded=2, dead=3) + relay_status: IntGaugeVec, + /// Per-relay consecutive failure count + relay_failures: IntGaugeVec, + + // === Event metrics === + /// Events synced by source (live/startup/reconnect/daily) + events_total: IntCounterVec, + /// Gap events filled during catchup, by relay + gap_events_total: IntCounterVec, + + // === Summary metrics === + /// Total relays discovered and tracked + relays_tracked_total: IntGauge, + /// Currently connected relay count + relays_connected_total: IntGauge, + /// Relays marked as dead + relays_dead_total: IntGauge, +} + +impl SyncMetrics { + /// Register all sync metrics with the provided Prometheus registry. + /// + /// # Errors + /// + /// Returns an error if metrics are already registered (e.g., in tests). + pub fn register(registry: &Registry) -> Result { + // Connection metrics + let relay_connected = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_connected", + "Relay connection status (1=connected, 0=disconnected)", + ), + &["relay"], + )?; + registry.register(Box::new(relay_connected.clone()))?; + + let connection_attempts_total = IntCounterVec::new( + Opts::new( + "ngit_sync_connection_attempts_total", + "Total connection attempts by relay and result", + ), + &["relay", "result"], + )?; + registry.register(Box::new(connection_attempts_total.clone()))?; + + // Health metrics + let relay_status = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_status", + "Relay health status (1=healthy, 2=degraded, 3=dead)", + ), + &["relay"], + )?; + registry.register(Box::new(relay_status.clone()))?; + + let relay_failures = IntGaugeVec::new( + Opts::new( + "ngit_sync_relay_failures", + "Consecutive failure count per relay", + ), + &["relay"], + )?; + registry.register(Box::new(relay_failures.clone()))?; + + // Event metrics + let events_total = IntCounterVec::new( + Opts::new( + "ngit_sync_events_total", + "Total events synced by source type", + ), + &["source"], + )?; + registry.register(Box::new(events_total.clone()))?; + + let gap_events_total = IntCounterVec::new( + Opts::new( + "ngit_sync_gap_events_total", + "Gap events filled during catchup by relay", + ), + &["relay"], + )?; + registry.register(Box::new(gap_events_total.clone()))?; + + // Summary metrics + let relays_tracked_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_tracked_total", + "Total number of relays discovered and tracked", + ))?; + registry.register(Box::new(relays_tracked_total.clone()))?; + + let relays_connected_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_connected_total", + "Number of currently connected relays", + ))?; + registry.register(Box::new(relays_connected_total.clone()))?; + + let relays_dead_total = IntGauge::with_opts(Opts::new( + "ngit_sync_relays_dead_total", + "Number of relays marked as dead", + ))?; + registry.register(Box::new(relays_dead_total.clone()))?; + + Ok(Self { + relay_connected, + connection_attempts_total, + relay_status, + relay_failures, + events_total, + gap_events_total, + relays_tracked_total, + relays_connected_total, + relays_dead_total, + }) + } + + // === Connection Recording Methods === + + /// Record a connection attempt (success or failure). + /// + /// # Arguments + /// + /// * `relay` - The relay URL + /// * `success` - Whether the connection attempt succeeded + pub fn record_connection_attempt(&self, relay: &str, success: bool) { + let result = if success { "success" } else { "failure" }; + self.connection_attempts_total + .with_label_values(&[relay, result]) + .inc(); + } + + /// Set relay connection status. + /// + /// # Arguments + /// + /// * `relay` - The relay URL + /// * `connected` - Whether the relay is currently connected + pub fn set_relay_connected(&self, relay: &str, connected: bool) { + self.relay_connected + .with_label_values(&[relay]) + .set(if connected { 1 } else { 0 }); + + // Note: Connected count should be updated via update_connected_count() for accuracy + } + + /// Update the total connected relay count. + /// + /// This directly sets the count rather than deriving it from individual relay states, + /// which is more accurate when relay connection states are managed elsewhere. + pub fn update_connected_count(&self, count: i64) { + self.relays_connected_total.set(count); + } + + /// Increment connected count by one. + pub fn inc_connected_count(&self) { + self.relays_connected_total.inc(); + } + + /// Decrement connected count by one. + pub fn dec_connected_count(&self) { + self.relays_connected_total.dec(); + } + + // === Health Recording Methods === + + /// Record relay health state change. + /// + /// Maps health states to numeric values for Prometheus: + /// - Healthy = 1 + /// - Degraded = 2 + /// - Dead = 3 + /// + /// # Arguments + /// + /// * `relay` - The relay URL + /// * `state` - The current health state + pub fn record_health_state(&self, relay: &str, state: HealthState) { + let state_value = match state { + HealthState::Healthy => 1, + HealthState::Degraded => 2, + HealthState::Dead => 3, + }; + self.relay_status.with_label_values(&[relay]).set(state_value); + } + + /// Record relay failure count. + /// + /// # Arguments + /// + /// * `relay` - The relay URL + /// * `count` - The number of consecutive failures + pub fn record_failure_count(&self, relay: &str, count: u32) { + self.relay_failures + .with_label_values(&[relay]) + .set(count as i64); + } + + /// Update dead relay count. + pub fn update_dead_count(&self, count: i64) { + self.relays_dead_total.set(count); + } + + /// Increment dead relay count by one. + pub fn inc_dead_count(&self) { + self.relays_dead_total.inc(); + } + + /// Decrement dead relay count by one. + pub fn dec_dead_count(&self) { + self.relays_dead_total.dec(); + } + + // === Event Recording Methods === + + /// Record a synced event by source type. + /// + /// # Arguments + /// + /// * `source` - The event source type. Use constants from [`event_source`]: + /// - [`event_source::LIVE`] - Real-time subscription events + /// - [`event_source::STARTUP`] - Events from startup catchup + /// - [`event_source::RECONNECT`] - Events from reconnection catchup + /// - [`event_source::DAILY`] - Events from daily catchup + pub fn record_event(&self, source: &str) { + self.events_total.with_label_values(&[source]).inc(); + } + + /// Record multiple events synced by source type. + /// + /// # Arguments + /// + /// * `source` - The event source type (see [`record_event`](Self::record_event)) + /// * `count` - Number of events to record + pub fn record_events(&self, source: &str, count: u64) { + self.events_total + .with_label_values(&[source]) + .inc_by(count); + } + + /// Record a gap event filled during catchup. + /// + /// Gap events are historical events discovered during catchup that weren't + /// received during live sync. + /// + /// # Arguments + /// + /// * `relay` - The relay URL from which the gap event was received + pub fn record_gap_event(&self, relay: &str) { + self.gap_events_total.with_label_values(&[relay]).inc(); + } + + /// Record multiple gap events filled during catchup. + /// + /// # Arguments + /// + /// * `relay` - The relay URL from which the gap events were received + /// * `count` - Number of gap events to record + pub fn record_gap_events(&self, relay: &str, count: u64) { + self.gap_events_total + .with_label_values(&[relay]) + .inc_by(count); + } + + // === Summary Recording Methods === + + /// Set the total tracked relay count. + pub fn set_tracked_count(&self, count: i64) { + self.relays_tracked_total.set(count); + } + + /// Increment tracked relay count by one. + pub fn inc_tracked_count(&self) { + self.relays_tracked_total.inc(); + } + + /// Get current tracked relay count. + pub fn get_tracked_count(&self) -> i64 { + self.relays_tracked_total.get() + } + + /// Get current connected relay count. + pub fn get_connected_count(&self) -> i64 { + self.relays_connected_total.get() + } + + /// Get current dead relay count. + pub fn get_dead_count(&self) -> i64 { + self.relays_dead_total.get() + } +} + +/// Event source types for metrics tracking. +/// +/// These constants are used as labels for the `ngit_sync_events_total` metric +/// to categorize events by how they were discovered. +pub mod event_source { + /// Real-time subscription events received during live sync. + pub const LIVE: &str = "live"; + + /// Events from startup catchup when the relay first starts. + pub const STARTUP: &str = "startup"; + + /// Events from reconnection catchup after a relay reconnects. + pub const RECONNECT: &str = "reconnect"; + + /// Events from daily catchup for drift detection. + pub const DAILY: &str = "daily"; +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_test_registry() -> Registry { + Registry::new() + } + + #[test] + fn test_metrics_registration() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(®istry); + assert!(metrics.is_ok()); + } + + #[test] + fn test_connection_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(®istry).unwrap(); + + // Record connection attempts + metrics.record_connection_attempt("wss://relay1.example.com", true); + metrics.record_connection_attempt("wss://relay1.example.com", false); + metrics.record_connection_attempt("wss://relay2.example.com", true); + + // Set relay connection status + metrics.set_relay_connected("wss://relay1.example.com", true); + metrics.inc_connected_count(); + + assert_eq!(metrics.get_connected_count(), 1); + + // Test decrement + metrics.dec_connected_count(); + assert_eq!(metrics.get_connected_count(), 0); + } + + #[test] + fn test_health_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(®istry).unwrap(); + + // Record health states + metrics.record_health_state("wss://relay1.example.com", HealthState::Healthy); + metrics.record_health_state("wss://relay2.example.com", HealthState::Degraded); + metrics.record_health_state("wss://relay3.example.com", HealthState::Dead); + + // Record failure count + metrics.record_failure_count("wss://relay2.example.com", 5); + + // Test dead count tracking + metrics.update_dead_count(1); + assert_eq!(metrics.get_dead_count(), 1); + + metrics.inc_dead_count(); + assert_eq!(metrics.get_dead_count(), 2); + + metrics.dec_dead_count(); + assert_eq!(metrics.get_dead_count(), 1); + } + + #[test] + fn test_event_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(®istry).unwrap(); + + // Record single events + metrics.record_event(event_source::LIVE); + metrics.record_event(event_source::STARTUP); + metrics.record_event(event_source::RECONNECT); + metrics.record_event(event_source::DAILY); + + // Record multiple events + metrics.record_events(event_source::STARTUP, 10); + + // Record gap events + metrics.record_gap_event("wss://relay1.example.com"); + metrics.record_gap_events("wss://relay2.example.com", 5); + } + + #[test] + fn test_summary_metrics() { + let registry = create_test_registry(); + let metrics = SyncMetrics::register(®istry).unwrap(); + + // Test tracked count + metrics.set_tracked_count(5); + assert_eq!(metrics.get_tracked_count(), 5); + + metrics.inc_tracked_count(); + assert_eq!(metrics.get_tracked_count(), 6); + + // Test connected count + metrics.update_connected_count(3); + assert_eq!(metrics.get_connected_count(), 3); + } + + #[test] + fn test_event_source_constants() { + // Verify constants have expected values + assert_eq!(event_source::LIVE, "live"); + assert_eq!(event_source::STARTUP, "startup"); + assert_eq!(event_source::RECONNECT, "reconnect"); + assert_eq!(event_source::DAILY, "daily"); + } + + #[test] + fn test_duplicate_registration_fails() { + let registry = create_test_registry(); + + // First registration should succeed + let metrics1 = SyncMetrics::register(®istry); + assert!(metrics1.is_ok()); + + // Second registration should fail (metrics already registered) + let metrics2 = SyncMetrics::register(®istry); + assert!(metrics2.is_err()); + } +} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1e60e4a..dd0479c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -15,12 +15,16 @@ pub mod algorithms; pub mod filters; pub mod health; +pub mod metrics; pub mod relay_connection; pub mod self_subscriber; // Re-export core algorithm types pub use algorithms::{AddFilters, RelaySyncNeeds}; +// Re-export metrics types +pub use metrics::{event_source, SyncMetrics}; + // Re-export relay connection types pub use relay_connection::{RelayConnection, RelayEvent}; @@ -35,7 +39,6 @@ use std::sync::Arc; use std::time::Duration; use nostr_sdk::prelude::*; -use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; use tokio::sync::{broadcast, Mutex, RwLock}; use crate::config::Config; @@ -156,168 +159,6 @@ pub struct PendingItems { pub root_events: HashSet, } -// ============================================================================= -// SyncMetrics - Prometheus Metrics for Sync System -// ============================================================================= - -/// Prometheus metrics for the proactive sync system. -/// -/// Tracks relay connections, sync progress, and operational statistics. -/// Following the comprehensive v3 metrics design. -#[derive(Clone)] -pub struct SyncMetrics { - // === Connection metrics === - /// Per-relay connection status (1=connected, 0=disconnected) - relay_connected: IntGaugeVec, - /// Connection attempts by relay and result (success/failure) - connection_attempts_total: IntCounterVec, - - // === Event metrics === - /// Events synced by source (live/startup/reconnect/daily) - events_total: IntCounterVec, - - // === Summary metrics === - /// Total relays discovered and tracked - relays_tracked_total: IntGauge, - /// Currently connected relay count - relays_connected_total: IntGauge, -} - -impl SyncMetrics { - /// Register sync metrics with a Prometheus registry. - /// - /// Returns an error if metrics are already registered (e.g., in tests). - pub fn register(registry: &Registry) -> Result { - // Connection metrics - let relay_connected = IntGaugeVec::new( - Opts::new( - "ngit_sync_relay_connected", - "Relay connection status (1=connected, 0=disconnected)", - ), - &["relay"], - )?; - registry.register(Box::new(relay_connected.clone()))?; - - let connection_attempts_total = IntCounterVec::new( - Opts::new( - "ngit_sync_connection_attempts_total", - "Total connection attempts by relay and result", - ), - &["relay", "result"], - )?; - registry.register(Box::new(connection_attempts_total.clone()))?; - - // Event metrics - let events_total = IntCounterVec::new( - Opts::new( - "ngit_sync_events_total", - "Total events synced by source type", - ), - &["source"], - )?; - registry.register(Box::new(events_total.clone()))?; - - // Summary metrics - let relays_tracked_total = IntGauge::with_opts(Opts::new( - "ngit_sync_relays_tracked_total", - "Total number of relays discovered and tracked", - ))?; - registry.register(Box::new(relays_tracked_total.clone()))?; - - let relays_connected_total = IntGauge::with_opts(Opts::new( - "ngit_sync_relays_connected_total", - "Number of currently connected relays", - ))?; - registry.register(Box::new(relays_connected_total.clone()))?; - - Ok(Self { - relay_connected, - connection_attempts_total, - events_total, - relays_tracked_total, - relays_connected_total, - }) - } - - // === Connection Recording Methods === - - /// Record a connection attempt (success or failure) - pub fn record_connection_attempt(&self, relay: &str, success: bool) { - let result = if success { "success" } else { "failure" }; - self.connection_attempts_total - .with_label_values(&[relay, result]) - .inc(); - } - - /// Set relay connection status - pub fn set_relay_connected(&self, relay: &str, connected: bool) { - self.relay_connected - .with_label_values(&[relay]) - .set(if connected { 1 } else { 0 }); - } - - /// Increment connected count - pub fn inc_connected_count(&self) { - self.relays_connected_total.inc(); - } - - /// Decrement connected count - pub fn dec_connected_count(&self) { - self.relays_connected_total.dec(); - } - - // === Event Recording Methods === - - /// Record a synced event by source type - /// - /// Source types: - /// - "live" - Real-time subscription events - /// - "startup" - Events from startup catchup - /// - "reconnect" - Events from reconnection catchup - pub fn record_event(&self, source: &str) { - self.events_total.with_label_values(&[source]).inc(); - } - - /// Record multiple events synced by source type - pub fn record_events(&self, source: &str, count: u64) { - self.events_total - .with_label_values(&[source]) - .inc_by(count); - } - - // === Summary Recording Methods === - - /// Set the total tracked relay count - pub fn set_tracked_count(&self, count: i64) { - self.relays_tracked_total.set(count); - } - - /// Increment tracked relay count - pub fn inc_tracked_count(&self) { - self.relays_tracked_total.inc(); - } - - /// Get current tracked relay count - pub fn get_tracked_count(&self) -> i64 { - self.relays_tracked_total.get() - } - - /// Get current connected relay count - pub fn get_connected_count(&self) -> i64 { - self.relays_connected_total.get() - } -} - -/// Event source types for metrics tracking -pub mod event_source { - /// Real-time subscription events - pub const LIVE: &str = "live"; - /// Events from startup catchup - pub const STARTUP: &str = "startup"; - /// Events from reconnection catchup - pub const RECONNECT: &str = "reconnect"; -} - // ============================================================================= // SyncManager - Main Entry Point // ============================================================================= -- cgit v1.2.3