diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 09:32:23 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 09:32:23 +0000 |
| commit | 6b03c60d06639d1e5eb7226b809f41fa1e27c4a7 (patch) | |
| tree | 7fbc21e2005c9fcf1f8740757c508763f98a0846 | |
| parent | 1bb259202fc67b4af96f470fe3769895356725c2 (diff) | |
Phase 1: Add ParsedMetrics and MetricsTestHarness infrastructure
| -rw-r--r-- | tests/common/sync_helpers.rs | 314 |
1 files changed, 314 insertions, 0 deletions
diff --git a/tests/common/sync_helpers.rs b/tests/common/sync_helpers.rs index 7788783..531ebe1 100644 --- a/tests/common/sync_helpers.rs +++ b/tests/common/sync_helpers.rs | |||
| @@ -10,10 +10,13 @@ | |||
| 10 | //! - Use `Tag::custom(TagKind::custom("name"), vec![...])` syntax | 10 | //! - Use `Tag::custom(TagKind::custom("name"), vec![...])` syntax |
| 11 | //! - Use `EventBuilder::new(kind, content).tags(tags)` syntax | 11 | //! - Use `EventBuilder::new(kind, content).tags(tags)` syntax |
| 12 | 12 | ||
| 13 | use std::collections::HashMap; | ||
| 13 | use std::time::Duration; | 14 | use std::time::Duration; |
| 14 | 15 | ||
| 15 | use nostr_sdk::prelude::*; | 16 | use nostr_sdk::prelude::*; |
| 16 | 17 | ||
| 18 | use super::relay::TestRelay; | ||
| 19 | |||
| 17 | /// Kind 1618 - Issue (NIP-34 git-related event) | 20 | /// Kind 1618 - Issue (NIP-34 git-related event) |
| 18 | pub const KIND_ISSUE: u16 = 1618; | 21 | pub const KIND_ISSUE: u16 = 1618; |
| 19 | 22 | ||
| @@ -592,6 +595,280 @@ pub async fn fetch_metrics(relay_url: &str) -> Result<String, reqwest::Error> { | |||
| 592 | reqwest::get(&http_url).await?.text().await | 595 | reqwest::get(&http_url).await?.text().await |
| 593 | } | 596 | } |
| 594 | 597 | ||
| 598 | // ============================================================================ | ||
| 599 | // Prometheus Metrics Parser | ||
| 600 | // ============================================================================ | ||
| 601 | |||
| 602 | /// A single metric value with its labels | ||
| 603 | #[derive(Debug, Clone)] | ||
| 604 | struct MetricValue { | ||
| 605 | labels: HashMap<String, String>, | ||
| 606 | value: f64, | ||
| 607 | } | ||
| 608 | |||
| 609 | /// Parsed Prometheus metrics with typed accessors. | ||
| 610 | /// | ||
| 611 | /// Parses Prometheus text format and provides strongly-typed access | ||
| 612 | /// to metric values with label filtering support. | ||
| 613 | /// | ||
| 614 | /// # Example | ||
| 615 | /// ```ignore | ||
| 616 | /// let metrics = ParsedMetrics::parse(text); | ||
| 617 | /// let events = metrics.counter("ngit_sync_events_total", &[("source", "live")]); | ||
| 618 | /// let connected = metrics.relay_connected("ws://127.0.0.1:8080"); | ||
| 619 | /// ``` | ||
| 620 | #[derive(Debug)] | ||
| 621 | pub struct ParsedMetrics { | ||
| 622 | metrics: HashMap<String, Vec<MetricValue>>, | ||
| 623 | } | ||
| 624 | |||
| 625 | impl ParsedMetrics { | ||
| 626 | /// Parse Prometheus text format into structured data | ||
| 627 | pub fn parse(text: &str) -> Self { | ||
| 628 | let mut metrics = HashMap::new(); | ||
| 629 | |||
| 630 | for line in text.lines() { | ||
| 631 | // Skip comments and empty lines | ||
| 632 | if line.starts_with('#') || line.trim().is_empty() { | ||
| 633 | continue; | ||
| 634 | } | ||
| 635 | |||
| 636 | // Parse metric line: metric_name{label="value"} 123.45 | ||
| 637 | // Handle both labeled and unlabeled metrics | ||
| 638 | if let Some((name_labels, value_str)) = line.rsplit_once(' ') { | ||
| 639 | if let Ok(value) = value_str.trim().parse::<f64>() { | ||
| 640 | let (name, labels) = Self::parse_name_and_labels(name_labels); | ||
| 641 | metrics | ||
| 642 | .entry(name.to_string()) | ||
| 643 | .or_insert_with(Vec::new) | ||
| 644 | .push(MetricValue { labels, value }); | ||
| 645 | } | ||
| 646 | } | ||
| 647 | } | ||
| 648 | |||
| 649 | ParsedMetrics { metrics } | ||
| 650 | } | ||
| 651 | |||
| 652 | fn parse_name_and_labels(name_labels: &str) -> (&str, HashMap<String, String>) { | ||
| 653 | if let Some(brace_pos) = name_labels.find('{') { | ||
| 654 | let name = &name_labels[..brace_pos]; | ||
| 655 | let labels_str = &name_labels[brace_pos + 1..name_labels.len() - 1]; | ||
| 656 | let labels = Self::parse_labels(labels_str); | ||
| 657 | (name, labels) | ||
| 658 | } else { | ||
| 659 | (name_labels, HashMap::new()) | ||
| 660 | } | ||
| 661 | } | ||
| 662 | |||
| 663 | fn parse_labels(labels_str: &str) -> HashMap<String, String> { | ||
| 664 | let mut labels = HashMap::new(); | ||
| 665 | for pair in labels_str.split(',') { | ||
| 666 | if let Some((key, value)) = pair.split_once('=') { | ||
| 667 | let key = key.trim(); | ||
| 668 | let value = value.trim().trim_matches('"'); | ||
| 669 | labels.insert(key.to_string(), value.to_string()); | ||
| 670 | } | ||
| 671 | } | ||
| 672 | labels | ||
| 673 | } | ||
| 674 | |||
| 675 | /// Get counter value with optional label matching | ||
| 676 | pub fn counter(&self, name: &str, labels: &[(&str, &str)]) -> Option<u64> { | ||
| 677 | self.get_metric(name, labels).map(|v| v as u64) | ||
| 678 | } | ||
| 679 | |||
| 680 | /// Get gauge value with optional label matching | ||
| 681 | pub fn gauge(&self, name: &str, labels: &[(&str, &str)]) -> Option<i64> { | ||
| 682 | self.get_metric(name, labels).map(|v| v as i64) | ||
| 683 | } | ||
| 684 | |||
| 685 | fn get_metric(&self, name: &str, labels: &[(&str, &str)]) -> Option<f64> { | ||
| 686 | let values = self.metrics.get(name)?; | ||
| 687 | |||
| 688 | if labels.is_empty() { | ||
| 689 | // No label filtering - return first match | ||
| 690 | values.first().map(|v| v.value) | ||
| 691 | } else { | ||
| 692 | // Find matching labels | ||
| 693 | values | ||
| 694 | .iter() | ||
| 695 | .find(|v| { | ||
| 696 | labels.iter().all(|(k, expected)| { | ||
| 697 | v.labels | ||
| 698 | .get(*k) | ||
| 699 | .map(|actual| actual == expected) | ||
| 700 | .unwrap_or(false) | ||
| 701 | }) | ||
| 702 | }) | ||
| 703 | .map(|v| v.value) | ||
| 704 | } | ||
| 705 | } | ||
| 706 | |||
| 707 | // Convenience accessors for sync metrics | ||
| 708 | |||
| 709 | /// Get total events synced from a specific source | ||
| 710 | pub fn events_total(&self, source: &str) -> Option<u64> { | ||
| 711 | self.counter("ngit_sync_events_total", &[("source", source)]) | ||
| 712 | } | ||
| 713 | |||
| 714 | /// Check if a specific relay is connected | ||
| 715 | pub fn relay_connected(&self, relay: &str) -> Option<bool> { | ||
| 716 | self.gauge("ngit_sync_relay_connected", &[("relay", relay)]) | ||
| 717 | .map(|v| v == 1) | ||
| 718 | } | ||
| 719 | |||
| 720 | /// Get total number of connected relays | ||
| 721 | pub fn relays_connected_total(&self) -> Option<i64> { | ||
| 722 | self.gauge("ngit_sync_relays_connected_total", &[]) | ||
| 723 | } | ||
| 724 | |||
| 725 | /// Get total number of tracked relays | ||
| 726 | pub fn relays_tracked_total(&self) -> Option<i64> { | ||
| 727 | self.gauge("ngit_sync_relays_tracked_total", &[]) | ||
| 728 | } | ||
| 729 | } | ||
| 730 | |||
| 731 | // ============================================================================ | ||
| 732 | // Metrics Test Harness | ||
| 733 | // ============================================================================ | ||
| 734 | |||
| 735 | /// Multi-relay test harness for metrics validation. | ||
| 736 | /// | ||
| 737 | /// Manages multiple source relays and a syncing relay for testing | ||
| 738 | /// sync metrics functionality. Uses random ports for all relays | ||
| 739 | /// to avoid conflicts. | ||
| 740 | /// | ||
| 741 | /// # Example | ||
| 742 | /// ```ignore | ||
| 743 | /// let mut harness = MetricsTestHarness::with_sources(2).await; | ||
| 744 | /// harness.start_syncing_relay(0).await; // Sync from source[0] | ||
| 745 | /// | ||
| 746 | /// let metrics = harness.get_metrics().await?; | ||
| 747 | /// assert_eq!(metrics.relays_connected_total(), Some(1)); | ||
| 748 | /// | ||
| 749 | /// harness.stop_all().await; | ||
| 750 | /// ``` | ||
| 751 | pub struct MetricsTestHarness { | ||
| 752 | source_relays: Vec<TestRelay>, | ||
| 753 | syncing_relay: Option<TestRelay>, | ||
| 754 | #[allow(dead_code)] | ||
| 755 | nowhere_url: Option<String>, | ||
| 756 | } | ||
| 757 | |||
| 758 | impl MetricsTestHarness { | ||
| 759 | /// Start N source relays (uses TestRelay::start with random ports) | ||
| 760 | pub async fn with_sources(count: usize) -> Self { | ||
| 761 | let mut source_relays = Vec::new(); | ||
| 762 | for _ in 0..count { | ||
| 763 | source_relays.push(TestRelay::start().await); | ||
| 764 | } | ||
| 765 | |||
| 766 | Self { | ||
| 767 | source_relays, | ||
| 768 | syncing_relay: None, | ||
| 769 | nowhere_url: None, | ||
| 770 | } | ||
| 771 | } | ||
| 772 | |||
| 773 | /// Get source relay URL | ||
| 774 | pub fn source_url(&self, idx: usize) -> &str { | ||
| 775 | self.source_relays[idx].url() | ||
| 776 | } | ||
| 777 | |||
| 778 | /// Get source relay domain (for announcement tags) | ||
| 779 | pub fn source_domain(&self, idx: usize) -> String { | ||
| 780 | self.source_relays[idx].domain() | ||
| 781 | } | ||
| 782 | |||
| 783 | /// Submit events to a specific source relay | ||
| 784 | pub async fn submit_events(&self, source_idx: usize, events: &[Event]) -> Result<(), String> { | ||
| 785 | let relay = &self.source_relays[source_idx]; | ||
| 786 | let keys = Keys::generate(); | ||
| 787 | let client = TestClient::new(relay.url(), keys).await?; | ||
| 788 | |||
| 789 | for event in events { | ||
| 790 | client.send_event(event).await?; | ||
| 791 | } | ||
| 792 | |||
| 793 | client.disconnect().await; | ||
| 794 | Ok(()) | ||
| 795 | } | ||
| 796 | |||
| 797 | /// Start syncing relay pointing to source[idx] | ||
| 798 | pub async fn start_syncing_relay(&mut self, source_idx: usize) { | ||
| 799 | let source_url = self.source_relays[source_idx].url().to_string(); | ||
| 800 | self.syncing_relay = Some(TestRelay::start_with_sync(Some(source_url)).await); | ||
| 801 | } | ||
| 802 | |||
| 803 | /// Start syncing relay pointing to random unused port (for failure tests) | ||
| 804 | pub async fn start_syncing_relay_to_nowhere(&mut self) { | ||
| 805 | let port = random_unused_port(); | ||
| 806 | let nowhere_url = format!("ws://127.0.0.1:{}", port); | ||
| 807 | self.nowhere_url = Some(nowhere_url.clone()); | ||
| 808 | self.syncing_relay = Some(TestRelay::start_with_sync(Some(nowhere_url)).await); | ||
| 809 | } | ||
| 810 | |||
| 811 | /// Stop a source relay | ||
| 812 | pub async fn stop_source(&mut self, source_idx: usize) { | ||
| 813 | // We need to take ownership to stop, so we swap with a new relay | ||
| 814 | // that we immediately stop. This is a workaround since TestRelay::stop | ||
| 815 | // takes self by value. | ||
| 816 | let relay = std::mem::replace( | ||
| 817 | &mut self.source_relays[source_idx], | ||
| 818 | TestRelay::start().await, | ||
| 819 | ); | ||
| 820 | relay.stop().await; | ||
| 821 | // Stop the placeholder too | ||
| 822 | let placeholder = std::mem::replace( | ||
| 823 | &mut self.source_relays[source_idx], | ||
| 824 | TestRelay::start().await, | ||
| 825 | ); | ||
| 826 | placeholder.stop().await; | ||
| 827 | } | ||
| 828 | |||
| 829 | /// Fetch and parse metrics from syncing relay | ||
| 830 | pub async fn get_metrics(&self) -> Result<ParsedMetrics, String> { | ||
| 831 | let relay = self | ||
| 832 | .syncing_relay | ||
| 833 | .as_ref() | ||
| 834 | .ok_or_else(|| "Syncing relay not started".to_string())?; | ||
| 835 | |||
| 836 | let metrics_text = fetch_metrics(relay.url()) | ||
| 837 | .await | ||
| 838 | .map_err(|e| format!("Failed to fetch metrics: {}", e))?; | ||
| 839 | |||
| 840 | Ok(ParsedMetrics::parse(&metrics_text)) | ||
| 841 | } | ||
| 842 | |||
| 843 | /// Get the syncing relay URL (for metrics with relay URL labels) | ||
| 844 | pub fn syncing_relay_url(&self) -> Option<&str> { | ||
| 845 | self.syncing_relay.as_ref().map(|r| r.url()) | ||
| 846 | } | ||
| 847 | |||
| 848 | /// Stop all relays | ||
| 849 | pub async fn stop_all(mut self) { | ||
| 850 | if let Some(relay) = self.syncing_relay.take() { | ||
| 851 | relay.stop().await; | ||
| 852 | } | ||
| 853 | for relay in self.source_relays.drain(..) { | ||
| 854 | relay.stop().await; | ||
| 855 | } | ||
| 856 | } | ||
| 857 | } | ||
| 858 | |||
| 859 | // ============================================================================ | ||
| 860 | // Port Helpers | ||
| 861 | // ============================================================================ | ||
| 862 | |||
| 863 | /// Get a random unused port by binding to port 0 and letting the OS assign one | ||
| 864 | pub fn random_unused_port() -> u16 { | ||
| 865 | std::net::TcpListener::bind("127.0.0.1:0") | ||
| 866 | .expect("Failed to bind to random port") | ||
| 867 | .local_addr() | ||
| 868 | .expect("Failed to get local addr") | ||
| 869 | .port() | ||
| 870 | } | ||
| 871 | |||
| 595 | #[cfg(test)] | 872 | #[cfg(test)] |
| 596 | mod tests { | 873 | mod tests { |
| 597 | use super::*; | 874 | use super::*; |
| @@ -742,4 +1019,41 @@ mod tests { | |||
| 742 | }); | 1019 | }); |
| 743 | assert!(has_q_tag, "Should have 'q' tag"); | 1020 | assert!(has_q_tag, "Should have 'q' tag"); |
| 744 | } | 1021 | } |
| 1022 | |||
| 1023 | // ======================================================================== | ||
| 1024 | // ParsedMetrics Tests | ||
| 1025 | // ======================================================================== | ||
| 1026 | |||
| 1027 | #[test] | ||
| 1028 | fn test_parse_counter_with_labels() { | ||
| 1029 | let text = r#"ngit_sync_events_total{source="live"} 5"#; | ||
| 1030 | let metrics = ParsedMetrics::parse(text); | ||
| 1031 | assert_eq!( | ||
| 1032 | metrics.counter("ngit_sync_events_total", &[("source", "live")]), | ||
| 1033 | Some(5) | ||
| 1034 | ); | ||
| 1035 | } | ||
| 1036 | |||
| 1037 | #[test] | ||
| 1038 | fn test_parse_gauge_without_labels() { | ||
| 1039 | let text = r#"ngit_sync_relays_tracked_total 3"#; | ||
| 1040 | let metrics = ParsedMetrics::parse(text); | ||
| 1041 | assert_eq!(metrics.gauge("ngit_sync_relays_tracked_total", &[]), Some(3)); | ||
| 1042 | } | ||
| 1043 | |||
| 1044 | #[test] | ||
| 1045 | fn test_parse_empty_metrics() { | ||
| 1046 | let metrics = ParsedMetrics::parse(""); | ||
| 1047 | assert_eq!(metrics.counter("nonexistent", &[]), None); | ||
| 1048 | } | ||
| 1049 | |||
| 1050 | #[test] | ||
| 1051 | fn test_parse_metric_with_relay_url_label() { | ||
| 1052 | let text = r#"ngit_sync_relay_connected{relay="ws://127.0.0.1:12345"} 1"#; | ||
| 1053 | let metrics = ParsedMetrics::parse(text); | ||
| 1054 | assert_eq!( | ||
| 1055 | metrics.relay_connected("ws://127.0.0.1:12345"), | ||
| 1056 | Some(true) | ||
| 1057 | ); | ||
| 1058 | } | ||
| 745 | } \ No newline at end of file | 1059 | } \ No newline at end of file |