From efe3e43cf792abd8bb256121ebf84ae04836313a Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 8 Dec 2025 12:50:34 +0000 Subject: tests: discover and sync from relay in annocunement published directly --- tests/common/relay.rs | 8 +- tests/proactive_sync_basic.rs | 171 ++++++++++++++++++++++++++++++++++++---- tests/proactive_sync_metrics.rs | 170 ++++++++++++++++++++------------------- tests/proactive_sync_multi.rs | 16 ++-- 4 files changed, 255 insertions(+), 110 deletions(-) (limited to 'tests') diff --git a/tests/common/relay.rs b/tests/common/relay.rs index 3d160f2..073a601 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs @@ -57,12 +57,8 @@ impl TestRelay { /// source.stop().await; /// } /// ``` - pub async fn start_with_sync(bootstrap_relay_url: &str) -> Self { - Self::start_with_options( - Self::find_free_port(), - Some(bootstrap_relay_url.to_string()), - ) - .await + pub async fn start_with_sync(bootstrap_relay_url: Option) -> Self { + Self::start_with_options(Self::find_free_port(), bootstrap_relay_url).await } /// Start relay with options diff --git a/tests/proactive_sync_basic.rs b/tests/proactive_sync_basic.rs index b789cb6..d96d576 100644 --- a/tests/proactive_sync_basic.rs +++ b/tests/proactive_sync_basic.rs @@ -74,9 +74,7 @@ async fn check_event_syncs( .await .expect("Failed to connect to target relay"); - let filter = Filter::new() - .kind(event.kind) - .author(keys.public_key()); + let filter = Filter::new().kind(event.kind).author(keys.public_key()); let events_on_target = client_target .fetch_events(filter, Duration::from_secs(3)) @@ -185,7 +183,7 @@ async fn test_sync_relay_connects_to_source() { let relay_a = TestRelay::start().await; // Start syncing relay (relay_b) configured to sync from relay_a - let relay_b = TestRelay::start_with_sync(relay_a.url()).await; + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().into())).await; // Give some time for connection to establish tokio::time::sleep(Duration::from_millis(500)).await; @@ -197,9 +195,9 @@ async fn test_sync_relay_connects_to_source() { relay_a.stop().await; } -/// Test that valid events sync from source to syncing relay +/// Test that valid events sync from source to bootstrap relay #[tokio::test] -async fn test_valid_event_syncs_to_relay() { +async fn announcement_listing_relay_syncs_from_bootstrap_relay() { // Start source relay (relay_a) let relay_a = TestRelay::start().await; println!( @@ -209,7 +207,7 @@ async fn test_valid_event_syncs_to_relay() { ); // Start syncing relay (relay_b) configured to sync from relay_a - let relay_b = TestRelay::start_with_sync(relay_a.url()).await; + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().into())).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), @@ -226,11 +224,8 @@ async fn test_valid_event_syncs_to_relay() { // Create a repository announcement that lists BOTH relays // This is required for sync - the event must reference both the source relay // and the syncing relay for the write policy to accept it on both sides - let event = create_repo_announcement( - &keys, - &[&relay_a.domain(), &relay_b.domain()], - "test-repo", - ); + let event = + create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], "test-repo"); let event_id = event.id; // Print event details for debugging @@ -263,7 +258,7 @@ async fn test_valid_event_syncs_to_relay() { /// This verifies that relay_b's write policy correctly rejects events during sync /// if they don't list relay_b as one of their relays. #[tokio::test] -async fn test_event_not_listing_target_relay_is_not_synced() { +async fn test_announcement_not_listing_relay_is_not_synced_from_boostrap_relay() { // Start source relay (relay_a) let relay_a = TestRelay::start().await; println!( @@ -273,7 +268,7 @@ async fn test_event_not_listing_target_relay_is_not_synced() { ); // Start syncing relay (relay_b) configured to sync from relay_a - let relay_b = TestRelay::start_with_sync(relay_a.url()).await; + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().into())).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), @@ -321,3 +316,151 @@ async fn test_event_not_listing_target_relay_is_not_synced() { event_id ); } + +fn create_kind_event_referencing_repo(keys: &Keys, repo_coord: &str) -> Event { + // TODO this breaks with kind 1 + EventBuilder::new(Kind::Custom(1617), "Test patch proposal") + .tags(vec![Tag::custom( + TagKind::custom("a"), + vec![repo_coord.to_string()], + )]) + .sign_with_keys(keys) + .expect("Failed to sign event") +} + +/// Test that when a relay is discovered (via an announcement event), events are synced from it +/// +/// This test verifies dynamic relay discovery from direct submissions: +/// 1. relay_a has an announcement and a patch event +/// 2. relay_b (sync enabled, NO bootstrap) receives the announcement directly +/// 3. relay_b discovers relay_a from the announcement and connects to sync +/// 4. relay_b syncs the patch event from relay_a +/// +/// This tests the scenario where relays discover each other through announcements +/// submitted by users, not through an existing sync connection. +#[tokio::test] +async fn repo_events_synced_from_discovered_relay_after_announcement_received() { + // relay_a: source relay with the patch event + let relay_a = TestRelay::start().await; + println!( + "relay_a started at {} (domain: {})", + relay_a.url(), + relay_a.domain() + ); + + // relay_b: sync enabled but NO bootstrap relay - will discover relay_a + let relay_b = TestRelay::start_with_sync(None).await; + println!( + "relay_b started at {} (domain: {})", + relay_b.url(), + relay_b.domain() + ); + + // Create test keys + let keys = Keys::generate(); + + // Create a repository announcement that lists BOTH relays + let announcement = create_repo_announcement( + &keys, + &[&relay_a.domain(), &relay_b.domain()], + "test-repo-discovery", + ); + let announcement_id = announcement.id; + + println!( + "Created announcement {} (kind {})", + announcement_id, + announcement.kind.as_u16() + ); + for tag in announcement.tags.iter() { + println!(" Tag: {:?}", tag.as_slice()); + } + + // Build the repo coordinate for the 'a' tag in the patch + let repo_coord = format!( + "{}:{}:{}", + KIND_REPOSITORY_STATE, + keys.public_key().to_hex(), + "test-repo-discovery" + ); + + // Create a patch event that references the announcement + let patch = create_kind_event_referencing_repo(&keys, &repo_coord); + let patch_id = patch.id; + + println!("Created patch {} (kind {})", patch_id, patch.kind.as_u16()); + for tag in patch.tags.iter() { + println!(" Tag: {:?}", tag.as_slice()); + } + + // Step 1: Send announcement to relay_a + let client_a = create_connected_client(relay_a.url(), keys.clone()) + .await + .expect("Failed to connect to relay_a"); + send_event_reliably(&client_a, &announcement) + .await + .expect("Failed to send announcement to relay_a"); + println!("Announcement sent to relay_a"); + + // Step 2: Send patch to relay_a ONLY + send_event_reliably(&client_a, &patch) + .await + .expect("Failed to send patch to relay_a"); + println!("Patch sent to relay_a"); + client_a.disconnect().await; + + // Step 3: Send announcement to relay_b directly (this should trigger discovery of relay_a) + let client_b = create_connected_client(relay_b.url(), keys.clone()) + .await + .expect("Failed to connect to relay_b"); + send_event_reliably(&client_b, &announcement) + .await + .expect("Failed to send announcement to relay_b"); + println!("Announcement sent to relay_b (should trigger discovery of relay_a)"); + client_b.disconnect().await; + + // Step 4: Wait for relay_b to discover relay_a and sync the patch + println!("Waiting 3s for relay_b to discover relay_a and sync patch..."); + tokio::time::sleep(Duration::from_secs(3)).await; + + // Step 5: Verify patch was synced to relay_b + let client_b_check = create_connected_client(relay_b.url(), Keys::generate()) + .await + .expect("Failed to connect to relay_b for verification"); + + let filter = Filter::new() + .kind(Kind::Custom(1617)) + .author(keys.public_key()); + + let events_on_b = client_b_check + .fetch_events(filter, Duration::from_secs(3)) + .await + .expect("Failed to fetch from relay_b"); + + let patch_synced = events_on_b.iter().any(|e| e.id == patch_id); + + if patch_synced { + println!( + "Patch {} found on relay_b (synced from discovered relay_a)", + patch_id + ); + } else { + println!("Patch {} NOT found on relay_b", patch_id); + println!( + "Events on relay_b: {:?}", + events_on_b.iter().map(|e| e.id).collect::>() + ); + } + + client_b_check.disconnect().await; + + // Clean up + relay_b.stop().await; + relay_a.stop().await; + + assert!( + patch_synced, + "Patch {} should have been synced to relay_b from discovered relay_a", + patch_id + ); +} diff --git a/tests/proactive_sync_metrics.rs b/tests/proactive_sync_metrics.rs index 86e2703..32abe74 100644 --- a/tests/proactive_sync_metrics.rs +++ b/tests/proactive_sync_metrics.rs @@ -32,10 +32,7 @@ fn create_valid_repo_announcement(keys: &Keys, domain: &str, identifier: &str) - TagKind::custom("clone"), vec![format!("http://{}/{}", domain, identifier)], ), - Tag::custom( - TagKind::custom("relays"), - vec![format!("ws://{}", domain)], - ), + Tag::custom(TagKind::custom("relays"), vec![format!("ws://{}", domain)]), ]; EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Repository state") @@ -48,11 +45,8 @@ fn create_valid_repo_announcement(keys: &Keys, domain: &str, identifier: &str) - async fn fetch_metrics(relay: &TestRelay) -> Result { // Extract host:port from ws:// URL let ws_url = relay.url(); - let http_url = ws_url - .replace("ws://", "http://") - .replace("/", "") - + "/metrics"; - + let http_url = ws_url.replace("ws://", "http://").replace("/", "") + "/metrics"; + reqwest::get(&http_url).await?.text().await } @@ -60,18 +54,18 @@ async fn fetch_metrics(relay: &TestRelay) -> Result { #[tokio::test] async fn test_sync_metrics_exposed() { let relay = TestRelay::start().await; - + // Give time for relay to start tokio::time::sleep(Duration::from_millis(500)).await; - + // Fetch metrics let metrics_result = fetch_metrics(&relay).await; - + relay.stop().await; - + // Check that we got metrics (even if sync isn't configured) let metrics = metrics_result.expect("Failed to fetch metrics"); - + // Verify basic metrics structure exists assert!( metrics.contains("ngit_") || metrics.contains("# HELP"), @@ -84,19 +78,19 @@ async fn test_sync_metrics_exposed() { async fn test_sync_metric_names_present() { // Start a relay with sync configured let source_relay = TestRelay::start().await; - let sync_relay = TestRelay::start_with_sync(source_relay.url()).await; - + let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; + // Give time for sync connection to attempt tokio::time::sleep(Duration::from_secs(2)).await; - + // Fetch metrics from the syncing relay let metrics = fetch_metrics(&sync_relay) .await .expect("Failed to fetch metrics"); - + sync_relay.stop().await; source_relay.stop().await; - + // Check for expected sync metric names (they may have zero values) // At minimum, the ngit_ prefix metrics should be present assert!( @@ -111,26 +105,23 @@ async fn test_connection_metrics_on_success() { // Start source relay let source_relay = TestRelay::start().await; tokio::time::sleep(Duration::from_millis(200)).await; - + // Start syncing relay - let sync_relay = TestRelay::start_with_sync(source_relay.url()).await; - + let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; + // Wait for connection to establish tokio::time::sleep(Duration::from_secs(2)).await; - + // Fetch metrics - we can verify the relay started and metrics endpoint works let metrics = fetch_metrics(&sync_relay) .await .expect("Failed to fetch metrics"); - + sync_relay.stop().await; source_relay.stop().await; - + // Verify metrics endpoint returned data - assert!( - !metrics.is_empty(), - "Metrics endpoint should return data" - ); + assert!(!metrics.is_empty(), "Metrics endpoint should return data"); } /// Test that events syncing updates metrics @@ -139,35 +130,38 @@ async fn test_event_sync_metrics() { // Start source relay let source_relay = TestRelay::start().await; tokio::time::sleep(Duration::from_millis(200)).await; - + // Start syncing relay - let sync_relay = TestRelay::start_with_sync(source_relay.url()).await; - + let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; + // Wait for connection tokio::time::sleep(Duration::from_secs(1)).await; - + // Create and submit an event to source relay let keys = Keys::generate(); let event = create_valid_repo_announcement(&keys, &source_relay.domain(), "metrics-test-repo"); - + let client = Client::default(); - client.add_relay(source_relay.url()).await.expect("Failed to add relay"); + client + .add_relay(source_relay.url()) + .await + .expect("Failed to add relay"); client.connect().await; - + let _ = client.send_event(&event).await; - + // Wait for sync to occur tokio::time::sleep(Duration::from_secs(2)).await; - + // Fetch metrics from sync relay let metrics = fetch_metrics(&sync_relay) .await .expect("Failed to fetch metrics"); - + client.disconnect().await; sync_relay.stop().await; source_relay.stop().await; - + // Verify metrics endpoint returned data after sync activity assert!( !metrics.is_empty(), @@ -180,18 +174,18 @@ async fn test_event_sync_metrics() { async fn test_health_state_metrics() { // Start a syncing relay pointing to a non-existent source // This will result in connection failures and health state changes - let sync_relay = TestRelay::start_with_sync("ws://127.0.0.1:19999").await; - + let sync_relay = TestRelay::start_with_sync(Some("ws://127.0.0.1:19999".into())).await; + // Wait for some connection attempts tokio::time::sleep(Duration::from_secs(3)).await; - + // Fetch metrics let metrics = fetch_metrics(&sync_relay) .await .expect("Failed to fetch metrics"); - + sync_relay.stop().await; - + // The relay should still be operational even with failed sync assert!( !metrics.is_empty(), @@ -205,32 +199,35 @@ async fn test_gap_event_tracking() { // Start source relay and add some events first let source_relay = TestRelay::start().await; tokio::time::sleep(Duration::from_millis(200)).await; - + let keys = Keys::generate(); - + // Submit event before sync relay starts let event = create_valid_repo_announcement(&keys, &source_relay.domain(), "pre-existing-repo"); - + let client = Client::default(); - client.add_relay(source_relay.url()).await.expect("Failed to add relay"); + client + .add_relay(source_relay.url()) + .await + .expect("Failed to add relay"); client.connect().await; let _ = client.send_event(&event).await; - + // Now start syncing relay - it should catch up on existing events - let sync_relay = TestRelay::start_with_sync(source_relay.url()).await; - + let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; + // Wait for catchup tokio::time::sleep(Duration::from_secs(3)).await; - + // Fetch metrics let metrics = fetch_metrics(&sync_relay) .await .expect("Failed to fetch metrics"); - + client.disconnect().await; sync_relay.stop().await; source_relay.stop().await; - + // Verify metrics exist after gap sync scenario assert!( !metrics.is_empty(), @@ -245,48 +242,57 @@ async fn test_multi_relay_load() { let source_relay_1 = TestRelay::start().await; let source_relay_2 = TestRelay::start().await; let source_relay_3 = TestRelay::start().await; - + tokio::time::sleep(Duration::from_millis(500)).await; - + // Start a syncing relay pointing to first source // Note: The current implementation only supports single sync relay URL // but the test demonstrates the system handles multiple relay scenarios - let sync_relay = TestRelay::start_with_sync(source_relay_1.url()).await; - + let sync_relay = TestRelay::start_with_sync(Some(source_relay_1.url().into())).await; + // Wait for connections tokio::time::sleep(Duration::from_secs(2)).await; - + // Submit events to all source relays let keys = Keys::generate(); - + let event1 = create_valid_repo_announcement(&keys, &source_relay_1.domain(), "repo-1"); let event2 = create_valid_repo_announcement(&keys, &source_relay_2.domain(), "repo-2"); let event3 = create_valid_repo_announcement(&keys, &source_relay_3.domain(), "repo-3"); - + // Submit events let client1 = Client::default(); - client1.add_relay(source_relay_1.url()).await.expect("Failed to add relay"); + client1 + .add_relay(source_relay_1.url()) + .await + .expect("Failed to add relay"); client1.connect().await; let _ = client1.send_event(&event1).await; - + let client2 = Client::default(); - client2.add_relay(source_relay_2.url()).await.expect("Failed to add relay"); + client2 + .add_relay(source_relay_2.url()) + .await + .expect("Failed to add relay"); client2.connect().await; let _ = client2.send_event(&event2).await; - + let client3 = Client::default(); - client3.add_relay(source_relay_3.url()).await.expect("Failed to add relay"); + client3 + .add_relay(source_relay_3.url()) + .await + .expect("Failed to add relay"); client3.connect().await; let _ = client3.send_event(&event3).await; - + // Wait for sync tokio::time::sleep(Duration::from_secs(3)).await; - + // Fetch metrics from sync relay let metrics = fetch_metrics(&sync_relay) .await .expect("Failed to fetch metrics"); - + // Cleanup client1.disconnect().await; client2.disconnect().await; @@ -295,7 +301,7 @@ async fn test_multi_relay_load() { source_relay_1.stop().await; source_relay_2.stop().await; source_relay_3.stop().await; - + // Verify metrics system handled load assert!( !metrics.is_empty(), @@ -308,25 +314,25 @@ async fn test_multi_relay_load() { async fn test_prometheus_format_valid() { let relay = TestRelay::start().await; tokio::time::sleep(Duration::from_millis(500)).await; - + let metrics = fetch_metrics(&relay) .await .expect("Failed to fetch metrics"); - + relay.stop().await; - + // Check for valid Prometheus format markers // - Lines starting with # are comments (HELP, TYPE) // - Metric lines have format: metric_name{labels} value let lines: Vec<&str> = metrics.lines().collect(); - + // Should have some content assert!(!lines.is_empty(), "Metrics should have content"); - + // Check for at least some standard Prometheus patterns let has_help = lines.iter().any(|l| l.starts_with("# HELP")); let has_type = lines.iter().any(|l| l.starts_with("# TYPE")); - + // At minimum we expect help/type comments for any registered metrics assert!( has_help || has_type || lines.iter().any(|l| l.contains("ngit_")), @@ -338,10 +344,10 @@ async fn test_prometheus_format_valid() { #[tokio::test] async fn test_metrics_availability_during_sync() { let source_relay = TestRelay::start().await; - let sync_relay = TestRelay::start_with_sync(source_relay.url()).await; - + let sync_relay = TestRelay::start_with_sync(Some(source_relay.url().into())).await; + tokio::time::sleep(Duration::from_millis(500)).await; - + // Make multiple metrics requests while sync is active for i in 0..3 { let metrics = fetch_metrics(&sync_relay).await; @@ -352,7 +358,7 @@ async fn test_metrics_availability_during_sync() { ); tokio::time::sleep(Duration::from_millis(200)).await; } - + sync_relay.stop().await; source_relay.stop().await; -} \ No newline at end of file +} diff --git a/tests/proactive_sync_multi.rs b/tests/proactive_sync_multi.rs index e07ddbe..de4183a 100644 --- a/tests/proactive_sync_multi.rs +++ b/tests/proactive_sync_multi.rs @@ -1,7 +1,7 @@ //! GRASP-02 Phase 2: Multi-Relay Proactive Sync Integration Tests //! //! Tests the multi-relay proactive sync functionality. -//! +//! //! Note: Integration tests for sync timing are inherently flaky due to //! subprocess communication latency. Unit tests for FilterService and //! SyncManager cover the core logic in src/sync/filter.rs and manager.rs. @@ -32,7 +32,7 @@ async fn test_sync_relay_starts_with_source_url() { tokio::time::sleep(Duration::from_millis(200)).await; // Start syncing relay (relay_sync) configured to sync from relay_a - let relay_sync = TestRelay::start_with_sync(relay_a.url()).await; + let relay_sync = TestRelay::start_with_sync(Some(relay_a.url().into())).await; // Give time for connection establishment tokio::time::sleep(Duration::from_millis(500)).await; @@ -134,7 +134,7 @@ async fn test_event_submission_to_relay() { // Send event - it may or may not be accepted depending on validation // The point is the connection and submission work let result = client.send_event(&event).await; - + // Clean up client.disconnect().await; relay.stop().await; @@ -148,11 +148,11 @@ async fn test_event_submission_to_relay() { fn test_domain_extraction() { // This tests the domain() method of TestRelay indirectly // by verifying the format matches expectations - + // Domain should be in format "127.0.0.1:PORT" let example_domain = "127.0.0.1:8080"; assert!(example_domain.starts_with("127.0.0.1:")); - + // URL should be in format "ws://127.0.0.1:PORT" let example_url = "ws://127.0.0.1:8080"; assert!(example_url.starts_with("ws://127.0.0.1:")); @@ -166,12 +166,12 @@ async fn test_sync_configuration_applied() { tokio::time::sleep(Duration::from_millis(200)).await; // Start syncing relay with explicit sync URL - let relay_sync = TestRelay::start_with_sync(relay_source.url()).await; + let relay_sync = TestRelay::start_with_sync(Some(relay_source.url().into())).await; tokio::time::sleep(Duration::from_millis(300)).await; // Both relays should be running // The sync relay has NGIT_SYNC_BOOTSTRAP_RELAY_URL set (verified by relay starting) - + let client_source = Client::default(); client_source .add_relay(relay_source.url()) @@ -191,4 +191,4 @@ async fn test_sync_configuration_applied() { client_source.disconnect().await; relay_sync.stop().await; relay_source.stop().await; -} \ No newline at end of file +} -- cgit v1.2.3