From 5b9f55b95e56eccf4e3d67deb9ed8d97c7355baa Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 09:44:30 +0000 Subject: Phase 3: Delete useless tests, add 3 real metrics tests Deleted 12 existence-only tests that provided zero confidence: - test_sync_metrics_exposed - test_sync_metric_names_present - test_connection_metrics_on_success - test_event_sync_metrics - test_health_state_metrics - test_gap_event_tracking - test_connection_failure_metrics - test_failure_counter_increments - test_relay_count_metrics - test_event_source_labels_in_metrics - test_multi_relay_load - test_gap_events_tracked_separately Kept 5 valuable tests: - test_prometheus_format_valid - test_concurrent_metrics_requests - test_metric_values_are_numeric - test_startup_sync_event_count - test_metrics_availability_during_sync Added 3 real value-checking tests (currently ignored): - test_connection_failure_increments_counter - test_live_sync_event_count - test_relay_connected_status Test results: 6 passed, 0 failed, 3 ignored --- tests/sync/metrics.rs | 682 +++++++++++++------------------------------------- 1 file changed, 168 insertions(+), 514 deletions(-) (limited to 'tests/sync/metrics.rs') diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs index 09177ec..241af8f 100644 --- a/tests/sync/metrics.rs +++ b/tests/sync/metrics.rs @@ -1,11 +1,7 @@ //! Proactive Sync Metrics Tests //! -//! Tests for Prometheus metrics integration with proactive sync: -//! - All sync metrics exposed at `/metrics` endpoint -//! - Connection metrics update correctly -//! - Health state metrics reflect actual state -//! - Gap events tracked correctly -//! - Load test with 3+ relays +//! Tests for Prometheus metrics integration with proactive sync. +//! These tests validate actual metric VALUES, not just existence. //! //! # Running Tests //! @@ -18,266 +14,17 @@ use std::time::Duration; use nostr_sdk::prelude::*; -use crate::common::{sync_helpers::*, TestRelay}; - -/// Test that sync metrics are exposed at /metrics endpoint -#[tokio::test] -async fn test_sync_metrics_exposed() { - let relay = TestRelay::start().await; - - // Give time for relay to start - tokio::time::sleep(Duration::from_millis(500)).await; - - // Fetch metrics using the shared helper - let metrics_result = fetch_metrics(&relay.url()).await; - - relay.stop().await; - - // Check that we got metrics (even if sync isn't configured) - let metrics = metrics_result.expect("Failed to fetch metrics"); - - // Verify basic metrics structure exists - assert!( - metrics.contains("ngit_") || metrics.contains("# HELP"), - "Metrics endpoint should return Prometheus metrics" - ); -} - -/// Test that sync metrics include expected metric names -#[tokio::test] -async fn test_sync_metric_names_present() { - // Start a relay with sync configured - let source_relay = TestRelay::start().await; - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Give time for sync connection to attempt - tokio::time::sleep(Duration::from_secs(2)).await; - - // Fetch metrics from the syncing relay - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Failed to fetch metrics"); - - sync_relay.stop().await; - source_relay.stop().await; - - // Check for expected sync metric names (they may have zero values) - // At minimum, the ngit_ prefix metrics should be present - assert!( - metrics.contains("ngit_"), - "Metrics should include ngit_ prefixed metrics" - ); -} - -/// Test connection metrics update correctly on successful connection -#[tokio::test] -async fn test_connection_metrics_on_success() { - // Start source relay - let source_relay = TestRelay::start().await; - tokio::time::sleep(Duration::from_millis(200)).await; - - // Start syncing relay - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Wait for connection to establish - tokio::time::sleep(Duration::from_secs(2)).await; - - // Fetch metrics - we can verify the relay started and metrics endpoint works - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Failed to fetch metrics"); - - sync_relay.stop().await; - source_relay.stop().await; - - // Verify metrics endpoint returned data - assert!(!metrics.is_empty(), "Metrics endpoint should return data"); -} - -/// Test that events syncing updates metrics -#[tokio::test] -async fn test_event_sync_metrics() { - // Start source relay - let source_relay = TestRelay::start().await; - tokio::time::sleep(Duration::from_millis(200)).await; - - // Start syncing relay - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Wait for connection - tokio::time::sleep(Duration::from_secs(1)).await; - - // Create and submit an event to source relay - let keys = Keys::generate(); - let event = create_repo_announcement(&keys, &[&source_relay.domain()], "metrics-test-repo"); - - let client = Client::default(); - client - .add_relay(source_relay.url()) - .await - .expect("Failed to add relay"); - client.connect().await; - - let _ = client.send_event(&event).await; - - // Wait for sync to occur - tokio::time::sleep(Duration::from_secs(2)).await; - - // Fetch metrics from sync relay - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Failed to fetch metrics"); - - client.disconnect().await; - sync_relay.stop().await; - source_relay.stop().await; - - // Verify metrics endpoint returned data after sync activity - assert!( - !metrics.is_empty(), - "Metrics should be present after sync activity" - ); -} - -/// Test health state tracking in metrics -#[tokio::test] -async fn test_health_state_metrics() { - // Start a syncing relay pointing to a non-existent source - // This will result in connection failures and health state changes - let sync_relay = TestRelay::start_with_sync(Some("ws://127.0.0.1:19999".into())).await; - - // Wait for some connection attempts - tokio::time::sleep(Duration::from_secs(3)).await; - - // Fetch metrics - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Failed to fetch metrics"); - - sync_relay.stop().await; - - // The relay should still be operational even with failed sync - assert!( - !metrics.is_empty(), - "Metrics should be present even with sync failures" - ); -} - -/// Test gap event tracking (events received during catchup) -#[tokio::test] -async fn test_gap_event_tracking() { - // Start source relay and add some events first - let source_relay = TestRelay::start().await; - tokio::time::sleep(Duration::from_millis(200)).await; - - let keys = Keys::generate(); - - // Submit event before sync relay starts - let event = create_repo_announcement(&keys, &[&source_relay.domain()], "pre-existing-repo"); - - let client = Client::default(); - client - .add_relay(source_relay.url()) - .await - .expect("Failed to add relay"); - client.connect().await; - let _ = client.send_event(&event).await; - - // Now start syncing relay - it should catch up on existing events - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Wait for catchup - tokio::time::sleep(Duration::from_secs(3)).await; - - // Fetch metrics - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Failed to fetch metrics"); - - client.disconnect().await; - sync_relay.stop().await; - source_relay.stop().await; - - // Verify metrics exist after gap sync scenario - assert!( - !metrics.is_empty(), - "Metrics should track gap sync activity" - ); -} - -/// Load test with 3+ relays configured for sync -#[tokio::test] -async fn test_multi_relay_load() { - // Start 3 source relays - let source_relay_1 = TestRelay::start().await; - let source_relay_2 = TestRelay::start().await; - let source_relay_3 = TestRelay::start().await; - - tokio::time::sleep(Duration::from_millis(500)).await; - - // Start a syncing relay pointing to first source - // Note: The current implementation only supports single sync relay URL - // but the test demonstrates the system handles multiple relay scenarios - let sync_relay = TestRelay::start_with_sync(Some(source_relay_1.url().into())).await; - - // Wait for connections - tokio::time::sleep(Duration::from_secs(2)).await; - - // Submit events to all source relays - let keys = Keys::generate(); - - let event1 = create_repo_announcement(&keys, &[&source_relay_1.domain()], "repo-1"); - let event2 = create_repo_announcement(&keys, &[&source_relay_2.domain()], "repo-2"); - let event3 = create_repo_announcement(&keys, &[&source_relay_3.domain()], "repo-3"); - - // Submit events - let client1 = Client::default(); - client1 - .add_relay(source_relay_1.url()) - .await - .expect("Failed to add relay"); - client1.connect().await; - let _ = client1.send_event(&event1).await; - - let client2 = Client::default(); - client2 - .add_relay(source_relay_2.url()) - .await - .expect("Failed to add relay"); - client2.connect().await; - let _ = client2.send_event(&event2).await; - - let client3 = Client::default(); - client3 - .add_relay(source_relay_3.url()) - .await - .expect("Failed to add relay"); - client3.connect().await; - let _ = client3.send_event(&event3).await; - - // Wait for sync - tokio::time::sleep(Duration::from_secs(3)).await; - - // Fetch metrics from sync relay - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Failed to fetch metrics"); - - // Cleanup - client1.disconnect().await; - client2.disconnect().await; - client3.disconnect().await; - sync_relay.stop().await; - source_relay_1.stop().await; - source_relay_2.stop().await; - source_relay_3.stop().await; +use crate::common::{ + sync_helpers::{ + create_repo_announcement, fetch_metrics, MetricsTestHarness, ParsedMetrics, TestClient, + KIND_REPOSITORY_STATE, + }, + TestRelay, +}; - // Verify metrics system handled load - assert!( - !metrics.is_empty(), - "Metrics should be available under multi-relay load" - ); -} +// ============================================================================ +// Format and Availability Tests (Keepers) +// ============================================================================ /// Test that Prometheus text format is valid #[tokio::test] @@ -333,187 +80,6 @@ async fn test_metrics_availability_during_sync() { source_relay.stop().await; } -// ============================================================================ -// Additional Coverage Tests (Phase 8) -// ============================================================================ - -/// Test metrics when connection to sync source fails -/// -/// Verifies that: -/// - Metrics endpoint remains functional when sync connection fails -/// - Connection attempt metrics are recorded even for failures -/// - The relay continues to operate despite sync failures -#[tokio::test] -async fn test_connection_failure_metrics() { - // Start a syncing relay pointing to a non-existent relay - // Port 19998 should not have anything running - let sync_relay = TestRelay::start_with_sync(Some("ws://127.0.0.1:19998".into())).await; - - // Wait for connection attempts to fail - tokio::time::sleep(Duration::from_secs(3)).await; - - // Fetch metrics - should still work despite sync failures - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Metrics endpoint should remain functional"); - - sync_relay.stop().await; - - // Verify connection attempt metrics are present (even with zeroes) - // The metrics endpoint should contain ngit_sync prefixed metrics - assert!( - metrics.contains("ngit_sync"), - "Sync metrics should be exposed even during connection failures" - ); - - // Check for connection-related metric patterns - let has_connection_metrics = metrics.contains("connection") || metrics.contains("relay"); - assert!( - has_connection_metrics || metrics.contains("ngit_"), - "Should have some form of connection/relay metrics" - ); -} - -/// Test that failure counters increment on repeated connection failures -/// -/// Verifies that the relay tracks consecutive failures and exposes -/// them via metrics (ngit_sync_relay_failures metric). -#[tokio::test] -async fn test_failure_counter_increments() { - // Use a very high port that definitely won't be listening - let sync_relay = TestRelay::start_with_sync(Some("ws://127.0.0.1:59999".into())).await; - - // First check - initial state - tokio::time::sleep(Duration::from_secs(1)).await; - let metrics_initial = fetch_metrics(&sync_relay.url()) - .await - .expect("Should fetch initial metrics"); - - // Wait for more connection attempts - tokio::time::sleep(Duration::from_secs(3)).await; - - // Second check - after more failures - let metrics_after = fetch_metrics(&sync_relay.url()) - .await - .expect("Should fetch metrics after failures"); - - sync_relay.stop().await; - - // Metrics should be present at both times - assert!(!metrics_initial.is_empty(), "Initial metrics should exist"); - assert!(!metrics_after.is_empty(), "Later metrics should exist"); - - // Both should contain sync-related metrics - assert!( - metrics_after.contains("ngit_"), - "Should contain ngit_ prefixed metrics after failures" - ); -} - -/// Test that relay counts are properly tracked in metrics -/// -/// Verifies: -/// - ngit_sync_relays_tracked_total reflects discovered relays -/// - ngit_sync_relays_connected_total updates with connection state -/// - Count metrics use proper gauges (can go up and down) -#[tokio::test] -async fn test_relay_count_metrics() { - // Start source relay first - let source_relay = TestRelay::start().await; - tokio::time::sleep(Duration::from_millis(200)).await; - - // Start syncing relay pointing to actual source - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Wait for connection to establish - tokio::time::sleep(Duration::from_secs(2)).await; - - let metrics_connected = fetch_metrics(&sync_relay.url()) - .await - .expect("Should fetch metrics when connected"); - - // Stop the source relay to trigger disconnection - source_relay.stop().await; - - // Wait for disconnect detection - tokio::time::sleep(Duration::from_secs(2)).await; - - let metrics_disconnected = fetch_metrics(&sync_relay.url()) - .await - .expect("Should fetch metrics after source disconnection"); - - sync_relay.stop().await; - - // Metrics should exist in both states - assert!( - !metrics_connected.is_empty(), - "Connected state metrics should exist" - ); - assert!( - !metrics_disconnected.is_empty(), - "Disconnected state metrics should exist" - ); -} - -/// Test event source label differentiation in metrics -/// -/// Verifies that the ngit_sync_events_total metric properly -/// distinguishes between event sources via labels: -/// - source="live" for real-time subscription events -/// - source="startup" for initial catchup events -/// - source="reconnect" for reconnection catchup events -/// - source="daily" for daily drift detection events -#[tokio::test] -async fn test_event_source_labels_in_metrics() { - // Set up source with pre-existing events (will trigger startup catchup) - let source_relay = TestRelay::start().await; - tokio::time::sleep(Duration::from_millis(200)).await; - - // Create and submit an event before sync relay starts - let keys = Keys::generate(); - let pre_event = create_repo_announcement(&keys, &[&source_relay.domain()], "pre-startup-repo"); - - let client = Client::default(); - client - .add_relay(source_relay.url()) - .await - .expect("Failed to add relay"); - client.connect().await; - let _ = client.send_event(&pre_event).await; - - // Now start syncing relay - this triggers startup catchup - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Wait for startup sync - tokio::time::sleep(Duration::from_secs(2)).await; - - // Submit another event - this will be received via live sync - let live_event = create_repo_announcement(&keys, &[&source_relay.domain()], "live-sync-repo"); - let _ = client.send_event(&live_event).await; - - // Wait for live sync - tokio::time::sleep(Duration::from_secs(2)).await; - - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Should fetch metrics"); - - client.disconnect().await; - sync_relay.stop().await; - source_relay.stop().await; - - // Verify metric line exists for events_total - // It should have labels distinguishing sources - let has_events_metric = metrics.contains("ngit_sync_events_total") - || metrics.contains("events") - || metrics.contains("ngit_sync"); - - assert!( - has_events_metric, - "Should have event-related sync metrics" - ); -} - /// Test concurrent metrics requests don't cause issues /// /// Verifies that the metrics endpoint is thread-safe and can @@ -527,7 +93,7 @@ async fn test_concurrent_metrics_requests() { // Clone the URL string so we have an owned value for spawned tasks let sync_url: String = sync_relay.url().to_string(); - + // Spawn multiple concurrent metrics requests let handles: Vec<_> = (0..5) .map(|i| { @@ -605,65 +171,6 @@ async fn test_metric_values_are_numeric() { ); } -/// Test gap events are tracked distinctly from other sync events -/// -/// Gap events are historical events discovered during catchup that weren't -/// received during live sync. This test verifies they are tracked separately -/// in the ngit_sync_gap_events_total metric. -#[tokio::test] -async fn test_gap_events_tracked_separately() { - // Create source relay with initial content - let source_relay = TestRelay::start().await; - tokio::time::sleep(Duration::from_millis(200)).await; - - let keys = Keys::generate(); - - // Create multiple events on source before sync relay starts - let client = Client::default(); - client - .add_relay(source_relay.url()) - .await - .expect("Failed to add relay"); - client.connect().await; - - // Submit several events to create a "gap" - for i in 0..3 { - let event = create_repo_announcement( - &keys, - &[&source_relay.domain()], - &format!("gap-repo-{}", i), - ); - let _ = client.send_event(&event).await; - } - - // Wait for events to be stored - tokio::time::sleep(Duration::from_millis(500)).await; - - // Now start sync relay - it will catchup on the gap events - let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; - - // Wait for catchup to complete - tokio::time::sleep(Duration::from_secs(3)).await; - - let metrics = fetch_metrics(&sync_relay.url()) - .await - .expect("Should fetch metrics"); - - client.disconnect().await; - sync_relay.stop().await; - source_relay.stop().await; - - // Check for gap-related metrics or general sync metrics - let has_sync_metrics = metrics.contains("ngit_sync") - || metrics.contains("gap") - || metrics.contains("events"); - - assert!( - has_sync_metrics, - "Metrics should track sync activity including gap events" - ); -} - // ============================================================================ // Phase 2: Real Metrics Tests (Using MetricsTestHarness) // ============================================================================ @@ -700,11 +207,19 @@ fn create_event_referencing_repo(keys: &Keys, repo_coord: &str, kind: u16, conte async fn test_startup_sync_event_count() { // 1. Start source relay (where we'll put the Layer 2 event to be synced) let source_relay = TestRelay::start().await; - println!("Source relay started at {} (domain: {})", source_relay.url(), source_relay.domain()); + println!( + "Source relay started at {} (domain: {})", + source_relay.url(), + source_relay.domain() + ); // 2. Start syncing relay (with sync enabled but no bootstrap - will discover via announcements) let syncing_relay = TestRelay::start_with_sync(None).await; - println!("Syncing relay started at {} (domain: {})", syncing_relay.url(), syncing_relay.domain()); + println!( + "Syncing relay started at {} (domain: {})", + syncing_relay.url(), + syncing_relay.domain() + ); // 3. Create test keys let keys = Keys::generate(); @@ -715,7 +230,11 @@ async fn test_startup_sync_event_count() { &[&source_relay.domain(), &syncing_relay.domain()], "test-repo-metrics", ); - println!("Created announcement {} (kind {})", announcement.id, announcement.kind.as_u16()); + println!( + "Created announcement {} (kind {})", + announcement.id, + announcement.kind.as_u16() + ); // 5. Build the repo coordinate for the 'a' tag in the patches let repo_coord = format!( @@ -727,7 +246,9 @@ async fn test_startup_sync_event_count() { // 6. Create 3 patch events (Layer 2) that reference the announcement let patches: Vec<_> = (0..3) - .map(|i| create_event_referencing_repo(&keys, &repo_coord, KIND_PATCH, &format!("Test patch {}", i))) + .map(|i| { + create_event_referencing_repo(&keys, &repo_coord, KIND_PATCH, &format!("Test patch {}", i)) + }) .collect(); println!("Created {} patches", patches.len()); @@ -781,7 +302,7 @@ async fn test_startup_sync_event_count() { } println!("===================\n"); - let metrics = crate::common::sync_helpers::ParsedMetrics::parse(&raw_metrics); + let metrics = ParsedMetrics::parse(&raw_metrics); // 11. Check sync metrics let tracked = metrics.gauge("ngit_sync_relays_tracked_total", &[]); @@ -799,7 +320,8 @@ async fn test_startup_sync_event_count() { .kind(Kind::Custom(KIND_PATCH)) .author(keys.public_key()); - let patches_synced = wait_for_event_on_relay(syncing_relay.url(), filter, Duration::from_secs(2)).await; + let patches_synced = + crate::common::sync_helpers::wait_for_event_on_relay(syncing_relay.url(), filter, Duration::from_secs(2)).await; println!("Patches synced to syncing relay: {}", patches_synced); // Cleanup @@ -809,7 +331,10 @@ async fn test_startup_sync_event_count() { // Assertions: // 1. Patches should have been synced (functional verification) // This proves the sync mechanism works even if metrics aren't fully wired - assert!(patches_synced, "Patches should have been synced from source relay"); + assert!( + patches_synced, + "Patches should have been synced from source relay" + ); // 2. Sync metrics should be exposed (they're registered, values may be 0) // The ngit_sync_* metrics are defined and exposed at the /metrics endpoint. @@ -826,6 +351,135 @@ async fn test_startup_sync_event_count() { assert!( tracked.is_some() && connected.is_some(), "Sync metrics should be exposed (tracked={:?}, connected={:?})", - tracked, connected + tracked, + connected ); +} + +// ============================================================================ +// Phase 3: Real Value-Checking Tests +// ============================================================================ + +/// Test that connection failures increment the failure counter. +/// +/// This test validates that when sync cannot connect to a source relay, +/// the connection_attempts_total counter with result="failure" increases. +/// +/// NOTE: This test may fail until sync metrics recording is fully wired up. +/// The test documents the expected behavior. +#[tokio::test] +#[ignore] // Enable when metrics recording is implemented +async fn test_connection_failure_increments_counter() { + let mut harness = MetricsTestHarness::with_sources(0).await; // No sources + harness.start_syncing_relay_to_nowhere().await; + + // Wait for initial connection attempts + tokio::time::sleep(Duration::from_secs(2)).await; + let metrics_1 = harness.get_metrics().await.unwrap(); + + // Wait for more attempts + tokio::time::sleep(Duration::from_secs(2)).await; + let metrics_2 = harness.get_metrics().await.unwrap(); + + // Failure counter should have increased + let failures_1 = metrics_1 + .counter("ngit_sync_connection_attempts_total", &[("result", "failure")]) + .unwrap_or(0); + let failures_2 = metrics_2 + .counter("ngit_sync_connection_attempts_total", &[("result", "failure")]) + .unwrap_or(0); + + println!("Failures at t1: {}, at t2: {}", failures_1, failures_2); + + assert!( + failures_2 > failures_1, + "Failure counter should increase: {} -> {}", + failures_1, + failures_2 + ); + + harness.stop_all().await; +} + +/// Test that live sync events are counted in metrics. +/// +/// This test validates that events received via live subscription +/// (after sync connection is established) are counted separately +/// from startup/bootstrap events. +/// +/// NOTE: This test may fail until sync metrics recording is fully wired up. +/// The test documents the expected behavior. +#[tokio::test] +#[ignore] // Enable when metrics recording is implemented +async fn test_live_sync_event_count() { + let mut harness = MetricsTestHarness::with_sources(1).await; + + // Start syncing BEFORE adding events + harness.start_syncing_relay(0).await; + tokio::time::sleep(Duration::from_secs(2)).await; + + // Now add events - these should be "live" not "startup" + let keys = Keys::generate(); + let events: Vec<_> = (0..2) + .map(|i| create_repo_announcement(&keys, &[&harness.source_domain(0)], &format!("live-{}", i))) + .collect(); + harness.submit_events(0, &events).await.unwrap(); + + tokio::time::sleep(Duration::from_secs(2)).await; + let metrics = harness.get_metrics().await.unwrap(); + + let live_count = metrics.events_total("live"); + println!("Live events synced: {:?}", live_count); + + // NOTE: This will likely fail until sync metrics are wired up + // Test documents the expectation + assert_eq!(live_count, Some(2), "Should have 2 live events"); + + harness.stop_all().await; +} + +/// Test that relay connected status is tracked in metrics. +/// +/// This test validates that the ngit_sync_relay_connected gauge +/// correctly reflects the connection state of source relays. +/// +/// NOTE: This test may fail until sync metrics recording is fully wired up. +/// The test documents the expected behavior. +#[tokio::test] +#[ignore] // Enable when metrics recording is implemented +async fn test_relay_connected_status() { + let mut harness = MetricsTestHarness::with_sources(1).await; + harness.start_syncing_relay(0).await; + tokio::time::sleep(Duration::from_secs(2)).await; + + // Clone the URL to avoid borrow issues when stopping source + let source_url = harness.source_url(0).to_string(); + + // Check connected status + let metrics = harness.get_metrics().await.unwrap(); + + println!("Checking connection status for {}", source_url); + + // NOTE: This will likely fail until sync metrics are wired up + // Test documents the expectation + assert_eq!( + metrics.relay_connected(&source_url), + Some(true), + "Should be connected to {}", + source_url + ); + + // Stop the source + harness.stop_source(0).await; + tokio::time::sleep(Duration::from_secs(2)).await; + + let metrics = harness.get_metrics().await.unwrap(); + assert_eq!( + metrics.relay_connected(&source_url), + Some(false), + "Should be disconnected from {}", + source_url + ); + + harness.stop_all().await; } \ No newline at end of file -- cgit v1.2.3