From 6b03c60d06639d1e5eb7226b809f41fa1e27c4a7 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 09:32:23 +0000 Subject: Phase 1: Add ParsedMetrics and MetricsTestHarness infrastructure --- tests/common/sync_helpers.rs | 314 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 314 insertions(+) (limited to 'tests/common/sync_helpers.rs') diff --git a/tests/common/sync_helpers.rs b/tests/common/sync_helpers.rs index 7788783..531ebe1 100644 --- a/tests/common/sync_helpers.rs +++ b/tests/common/sync_helpers.rs @@ -10,10 +10,13 @@ //! - Use `Tag::custom(TagKind::custom("name"), vec![...])` syntax //! - Use `EventBuilder::new(kind, content).tags(tags)` syntax +use std::collections::HashMap; use std::time::Duration; use nostr_sdk::prelude::*; +use super::relay::TestRelay; + /// Kind 1618 - Issue (NIP-34 git-related event) pub const KIND_ISSUE: u16 = 1618; @@ -592,6 +595,280 @@ pub async fn fetch_metrics(relay_url: &str) -> Result { reqwest::get(&http_url).await?.text().await } +// ============================================================================ +// Prometheus Metrics Parser +// ============================================================================ + +/// A single metric value with its labels +#[derive(Debug, Clone)] +struct MetricValue { + labels: HashMap, + value: f64, +} + +/// Parsed Prometheus metrics with typed accessors. +/// +/// Parses Prometheus text format and provides strongly-typed access +/// to metric values with label filtering support. +/// +/// # Example +/// ```ignore +/// let metrics = ParsedMetrics::parse(text); +/// let events = metrics.counter("ngit_sync_events_total", &[("source", "live")]); +/// let connected = metrics.relay_connected("ws://127.0.0.1:8080"); +/// ``` +#[derive(Debug)] +pub struct ParsedMetrics { + metrics: HashMap>, +} + +impl ParsedMetrics { + /// Parse Prometheus text format into structured data + pub fn parse(text: &str) -> Self { + let mut metrics = HashMap::new(); + + for line in text.lines() { + // Skip comments and empty lines + if line.starts_with('#') || line.trim().is_empty() { + continue; + } + + // Parse metric line: metric_name{label="value"} 123.45 + // Handle both labeled and unlabeled metrics + if let Some((name_labels, value_str)) = line.rsplit_once(' ') { + if let Ok(value) = value_str.trim().parse::() { + let (name, labels) = Self::parse_name_and_labels(name_labels); + metrics + .entry(name.to_string()) + .or_insert_with(Vec::new) + .push(MetricValue { labels, value }); + } + } + } + + ParsedMetrics { metrics } + } + + fn parse_name_and_labels(name_labels: &str) -> (&str, HashMap) { + if let Some(brace_pos) = name_labels.find('{') { + let name = &name_labels[..brace_pos]; + let labels_str = &name_labels[brace_pos + 1..name_labels.len() - 1]; + let labels = Self::parse_labels(labels_str); + (name, labels) + } else { + (name_labels, HashMap::new()) + } + } + + fn parse_labels(labels_str: &str) -> HashMap { + let mut labels = HashMap::new(); + for pair in labels_str.split(',') { + if let Some((key, value)) = pair.split_once('=') { + let key = key.trim(); + let value = value.trim().trim_matches('"'); + labels.insert(key.to_string(), value.to_string()); + } + } + labels + } + + /// Get counter value with optional label matching + pub fn counter(&self, name: &str, labels: &[(&str, &str)]) -> Option { + self.get_metric(name, labels).map(|v| v as u64) + } + + /// Get gauge value with optional label matching + pub fn gauge(&self, name: &str, labels: &[(&str, &str)]) -> Option { + self.get_metric(name, labels).map(|v| v as i64) + } + + fn get_metric(&self, name: &str, labels: &[(&str, &str)]) -> Option { + let values = self.metrics.get(name)?; + + if labels.is_empty() { + // No label filtering - return first match + values.first().map(|v| v.value) + } else { + // Find matching labels + values + .iter() + .find(|v| { + labels.iter().all(|(k, expected)| { + v.labels + .get(*k) + .map(|actual| actual == expected) + .unwrap_or(false) + }) + }) + .map(|v| v.value) + } + } + + // Convenience accessors for sync metrics + + /// Get total events synced from a specific source + pub fn events_total(&self, source: &str) -> Option { + self.counter("ngit_sync_events_total", &[("source", source)]) + } + + /// Check if a specific relay is connected + pub fn relay_connected(&self, relay: &str) -> Option { + self.gauge("ngit_sync_relay_connected", &[("relay", relay)]) + .map(|v| v == 1) + } + + /// Get total number of connected relays + pub fn relays_connected_total(&self) -> Option { + self.gauge("ngit_sync_relays_connected_total", &[]) + } + + /// Get total number of tracked relays + pub fn relays_tracked_total(&self) -> Option { + self.gauge("ngit_sync_relays_tracked_total", &[]) + } +} + +// ============================================================================ +// Metrics Test Harness +// ============================================================================ + +/// Multi-relay test harness for metrics validation. +/// +/// Manages multiple source relays and a syncing relay for testing +/// sync metrics functionality. Uses random ports for all relays +/// to avoid conflicts. +/// +/// # Example +/// ```ignore +/// let mut harness = MetricsTestHarness::with_sources(2).await; +/// harness.start_syncing_relay(0).await; // Sync from source[0] +/// +/// let metrics = harness.get_metrics().await?; +/// assert_eq!(metrics.relays_connected_total(), Some(1)); +/// +/// harness.stop_all().await; +/// ``` +pub struct MetricsTestHarness { + source_relays: Vec, + syncing_relay: Option, + #[allow(dead_code)] + nowhere_url: Option, +} + +impl MetricsTestHarness { + /// Start N source relays (uses TestRelay::start with random ports) + pub async fn with_sources(count: usize) -> Self { + let mut source_relays = Vec::new(); + for _ in 0..count { + source_relays.push(TestRelay::start().await); + } + + Self { + source_relays, + syncing_relay: None, + nowhere_url: None, + } + } + + /// Get source relay URL + pub fn source_url(&self, idx: usize) -> &str { + self.source_relays[idx].url() + } + + /// Get source relay domain (for announcement tags) + pub fn source_domain(&self, idx: usize) -> String { + self.source_relays[idx].domain() + } + + /// Submit events to a specific source relay + pub async fn submit_events(&self, source_idx: usize, events: &[Event]) -> Result<(), String> { + let relay = &self.source_relays[source_idx]; + let keys = Keys::generate(); + let client = TestClient::new(relay.url(), keys).await?; + + for event in events { + client.send_event(event).await?; + } + + client.disconnect().await; + Ok(()) + } + + /// Start syncing relay pointing to source[idx] + pub async fn start_syncing_relay(&mut self, source_idx: usize) { + let source_url = self.source_relays[source_idx].url().to_string(); + self.syncing_relay = Some(TestRelay::start_with_sync(Some(source_url)).await); + } + + /// Start syncing relay pointing to random unused port (for failure tests) + pub async fn start_syncing_relay_to_nowhere(&mut self) { + let port = random_unused_port(); + let nowhere_url = format!("ws://127.0.0.1:{}", port); + self.nowhere_url = Some(nowhere_url.clone()); + self.syncing_relay = Some(TestRelay::start_with_sync(Some(nowhere_url)).await); + } + + /// Stop a source relay + pub async fn stop_source(&mut self, source_idx: usize) { + // We need to take ownership to stop, so we swap with a new relay + // that we immediately stop. This is a workaround since TestRelay::stop + // takes self by value. + let relay = std::mem::replace( + &mut self.source_relays[source_idx], + TestRelay::start().await, + ); + relay.stop().await; + // Stop the placeholder too + let placeholder = std::mem::replace( + &mut self.source_relays[source_idx], + TestRelay::start().await, + ); + placeholder.stop().await; + } + + /// Fetch and parse metrics from syncing relay + pub async fn get_metrics(&self) -> Result { + let relay = self + .syncing_relay + .as_ref() + .ok_or_else(|| "Syncing relay not started".to_string())?; + + let metrics_text = fetch_metrics(relay.url()) + .await + .map_err(|e| format!("Failed to fetch metrics: {}", e))?; + + Ok(ParsedMetrics::parse(&metrics_text)) + } + + /// Get the syncing relay URL (for metrics with relay URL labels) + pub fn syncing_relay_url(&self) -> Option<&str> { + self.syncing_relay.as_ref().map(|r| r.url()) + } + + /// Stop all relays + pub async fn stop_all(mut self) { + if let Some(relay) = self.syncing_relay.take() { + relay.stop().await; + } + for relay in self.source_relays.drain(..) { + relay.stop().await; + } + } +} + +// ============================================================================ +// Port Helpers +// ============================================================================ + +/// Get a random unused port by binding to port 0 and letting the OS assign one +pub fn random_unused_port() -> u16 { + std::net::TcpListener::bind("127.0.0.1:0") + .expect("Failed to bind to random port") + .local_addr() + .expect("Failed to get local addr") + .port() +} + #[cfg(test)] mod tests { use super::*; @@ -742,4 +1019,41 @@ mod tests { }); assert!(has_q_tag, "Should have 'q' tag"); } + + // ======================================================================== + // ParsedMetrics Tests + // ======================================================================== + + #[test] + fn test_parse_counter_with_labels() { + let text = r#"ngit_sync_events_total{source="live"} 5"#; + let metrics = ParsedMetrics::parse(text); + assert_eq!( + metrics.counter("ngit_sync_events_total", &[("source", "live")]), + Some(5) + ); + } + + #[test] + fn test_parse_gauge_without_labels() { + let text = r#"ngit_sync_relays_tracked_total 3"#; + let metrics = ParsedMetrics::parse(text); + assert_eq!(metrics.gauge("ngit_sync_relays_tracked_total", &[]), Some(3)); + } + + #[test] + fn test_parse_empty_metrics() { + let metrics = ParsedMetrics::parse(""); + assert_eq!(metrics.counter("nonexistent", &[]), None); + } + + #[test] + fn test_parse_metric_with_relay_url_label() { + let text = r#"ngit_sync_relay_connected{relay="ws://127.0.0.1:12345"} 1"#; + let metrics = ParsedMetrics::parse(text); + assert_eq!( + metrics.relay_connected("ws://127.0.0.1:12345"), + Some(true) + ); + } } \ No newline at end of file -- cgit v1.2.3