//! GRASP-02 Phase 1: Proactive Sync Basic Integration Tests //! //! Tests the basic proactive sync functionality using two TestRelay instances: //! - relay_a: Source relay with events //! - relay_b: Sync relay configured to sync from relay_a //! //! # Running Tests //! //! ```bash //! cargo test --test proactive_sync_basic //! cargo test --test proactive_sync_basic -- --nocapture //! ``` mod common; use std::time::Duration; use common::TestRelay; use nostr_sdk::prelude::*; /// Kind 30617 - Repository State (NIP-34) const KIND_REPOSITORY_STATE: u16 = 30617; /// Result of checking if an event syncs between relays #[derive(Debug)] struct SyncCheckResult { /// Whether the event was successfully stored on the source relay stored_on_source: bool, /// Whether the event was synced to the target relay synced_to_target: bool, } /// Helper to check if an event syncs from source relay to target relay /// /// This function: /// 1. Sends the event to the source relay /// 2. Verifies if it was stored on the source relay /// 3. Waits for potential sync /// 4. Checks if the event appears on the target relay /// /// Note: The sync subscription must already be established before calling this. async fn check_event_syncs( source_relay: &TestRelay, target_relay: &TestRelay, event: &Event, keys: &Keys, ) -> SyncCheckResult { let event_id = event.id; // Create client and connect to source relay let client_source = create_connected_client(source_relay.url(), keys.clone()) .await .expect("Failed to connect to source relay"); // Send event to source relay let send_result = send_event_reliably(&client_source, event).await; let stored_on_source = send_result.is_ok(); if stored_on_source { println!("Event {} stored on source relay", event_id); } else { println!( "Event {} NOT stored on source relay: {:?}", event_id, send_result.err() ); } // Wait for sync to occur tokio::time::sleep(Duration::from_secs(1)).await; // Check if event exists on target relay let client_target = create_connected_client(target_relay.url(), Keys::generate()) .await .expect("Failed to connect to target relay"); let filter = Filter::new().kind(event.kind).author(keys.public_key()); let events_on_target = client_target .fetch_events(filter, Duration::from_secs(3)) .await .expect("Failed to fetch from target relay"); let synced_to_target = events_on_target.iter().any(|e| e.id == event_id); if synced_to_target { println!("Event {} found on target relay (synced)", event_id); } else { println!("Event {} NOT found on target relay", event_id); } // Clean up client_source.disconnect().await; client_target.disconnect().await; SyncCheckResult { stored_on_source, synced_to_target, } } /// Create a client with keys, connect to relay, and wait for connection async fn create_connected_client(relay_url: &str, keys: Keys) -> Result { let client = Client::new(keys); client .add_relay(relay_url) .await .map_err(|e| e.to_string())?; client.connect().await; // Wait for connection to establish (with retries, matching grasp-audit pattern) for _ in 0..30 { tokio::time::sleep(Duration::from_millis(100)).await; let relays = client.relays().await; if relays.values().any(|r| r.is_connected()) { return Ok(client); } } Err("Failed to connect to relay after 3 seconds".to_string()) } /// Send an event and wait for successful delivery async fn send_event_reliably(client: &Client, event: &Event) -> Result { // Try sending the event with retries for attempt in 1..=5 { let result = client.send_event(event).await; match result { Ok(output) => { if !output.success.is_empty() { return Ok(output.val); } // Check what went wrong if !output.failed.is_empty() { println!(" Attempt {} - failures: {:?}", attempt, output.failed); // If relay not connected, try reconnecting client.connect().await; } } Err(e) => { println!(" Attempt {} - error: {}", attempt, e); } } tokio::time::sleep(Duration::from_millis(500)).await; } Err("Failed to send event after 5 attempts".to_string()) } /// Create a valid repository announcement event for testing /// /// This creates a kind 30617 event with required clone and relays tags. /// Accepts one or more domains - for sync tests, include all relay domains /// so the event will be accepted by each relay's write policy. /// Uses TagKind::custom("clone") and TagKind::custom("relays") to match grasp-audit patterns. fn create_repo_announcement(keys: &Keys, domains: &[&str], identifier: &str) -> Event { // Build clone URLs for all domains (with .git suffix) let clone_urls: Vec = domains .iter() .map(|d| format!("http://{}/{}.git", d, identifier)) .collect(); // Build relay URLs for all domains let relay_urls: Vec = domains.iter().map(|d| format!("ws://{}", d)).collect(); // Build tags for repository announcement using custom tag kinds (as grasp-audit does) let tags = vec![ Tag::identifier(identifier), Tag::custom(TagKind::custom("clone"), clone_urls), Tag::custom(TagKind::custom("relays"), relay_urls), ]; EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Repository state") .tags(tags) .sign_with_keys(keys) .expect("Failed to sign event") } /// Test that syncing relay connects to source relay #[tokio::test] async fn test_sync_relay_connects_to_source() { // Start source relay (relay_a) let relay_a = TestRelay::start().await; // Start syncing relay (relay_b) configured to sync from relay_a let relay_b = TestRelay::start_with_sync(Some(relay_a.url().into())).await; // Give some time for connection to establish tokio::time::sleep(Duration::from_millis(500)).await; // If we got here without panicking, the relays started successfully // The sync connection happens in the background relay_b.stop().await; relay_a.stop().await; } /// Test that valid events sync from source to bootstrap relay #[tokio::test] async fn announcement_listing_relay_syncs_from_bootstrap_relay() { // Start source relay (relay_a) let relay_a = TestRelay::start().await; println!( "relay_a started at {} (domain: {})", relay_a.url(), relay_a.domain() ); // Start syncing relay (relay_b) configured to sync from relay_a let relay_b = TestRelay::start_with_sync(Some(relay_a.url().into())).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), relay_b.domain() ); // Create test keys let keys = Keys::generate(); // Wait for relay_b's sync connection to establish println!("Waiting 1s for relay_b sync connection to establish..."); tokio::time::sleep(Duration::from_secs(1)).await; // Create a repository announcement that lists BOTH relays // This is required for sync - the event must reference both the source relay // and the syncing relay for the write policy to accept it on both sides let event = create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], "test-repo"); let event_id = event.id; // Print event details for debugging println!("Created event {} (kind {})", event_id, event.kind.as_u16()); for tag in event.tags.iter() { println!(" Tag: {:?}", tag.as_slice()); } // Use helper to send and check sync let result = check_event_syncs(&relay_a, &relay_b, &event, &keys).await; // Clean up relay_b.stop().await; relay_a.stop().await; assert!( result.stored_on_source, "Event {} was not stored on relay_a! This is a prerequisite for sync.", event_id ); assert!( result.synced_to_target, "Event {} was not synced to relay_b", event_id ); } /// Test that events not listing relay_b in their relays tag are NOT synced /// /// This verifies that relay_b's write policy correctly rejects events during sync /// if they don't list relay_b as one of their relays. #[tokio::test] async fn test_announcement_not_listing_relay_is_not_synced_from_boostrap_relay() { // Start source relay (relay_a) let relay_a = TestRelay::start().await; println!( "relay_a started at {} (domain: {})", relay_a.url(), relay_a.domain() ); // Start syncing relay (relay_b) configured to sync from relay_a let relay_b = TestRelay::start_with_sync(Some(relay_a.url().into())).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), relay_b.domain() ); // Create test keys let keys = Keys::generate(); // Wait for relay_b's sync connection to establish println!("Waiting 1s for relay_b sync connection to establish..."); tokio::time::sleep(Duration::from_secs(1)).await; // Create a repository announcement that lists ONLY relay_a (NOT relay_b) // This event is valid and will be accepted by relay_a, but should be // rejected by relay_b's write policy during sync let event = create_repo_announcement(&keys, &[&relay_a.domain()], "test-repo-only-a"); let event_id = event.id; // Print event details for debugging println!("Created event {} (kind {})", event_id, event.kind.as_u16()); for tag in event.tags.iter() { println!(" Tag: {:?}", tag.as_slice()); } println!("Note: This event only lists relay_a, not relay_b"); // Use helper to send and check sync let result = check_event_syncs(&relay_a, &relay_b, &event, &keys).await; // Clean up relay_b.stop().await; relay_a.stop().await; // Event should be stored on relay_a (it lists relay_a) assert!( result.stored_on_source, "Event {} should have been stored on relay_a (it lists relay_a)", event_id ); // Event should NOT be synced to relay_b (it doesn't list relay_b) assert!( !result.synced_to_target, "Event {} should NOT have been synced to relay_b (it doesn't list relay_b)", event_id ); } fn create_kind_event_referencing_repo(keys: &Keys, repo_coord: &str) -> Event { // TODO this breaks with kind 1 EventBuilder::new(Kind::Custom(1617), "Test patch proposal") .tags(vec![Tag::custom( TagKind::custom("a"), vec![repo_coord.to_string()], )]) .sign_with_keys(keys) .expect("Failed to sign event") } /// Test that when a relay is discovered (via an announcement event), events are synced from it /// /// This test verifies dynamic relay discovery from direct submissions: /// 1. relay_a has an announcement and a patch event /// 2. relay_b (sync enabled, NO bootstrap) receives the announcement directly /// 3. relay_b discovers relay_a from the announcement and connects to sync /// 4. relay_b syncs the patch event from relay_a /// /// This tests the scenario where relays discover each other through announcements /// submitted by users, not through an existing sync connection. #[tokio::test] async fn repo_events_synced_from_discovered_relay_after_announcement_received() { // relay_a: source relay with the patch event let relay_a = TestRelay::start().await; println!( "relay_a started at {} (domain: {})", relay_a.url(), relay_a.domain() ); // relay_b: sync enabled but NO bootstrap relay - will discover relay_a let relay_b = TestRelay::start_with_sync(None).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), relay_b.domain() ); // Create test keys let keys = Keys::generate(); // Create a repository announcement that lists BOTH relays let announcement = create_repo_announcement( &keys, &[&relay_a.domain(), &relay_b.domain()], "test-repo-discovery", ); let announcement_id = announcement.id; println!( "Created announcement {} (kind {})", announcement_id, announcement.kind.as_u16() ); for tag in announcement.tags.iter() { println!(" Tag: {:?}", tag.as_slice()); } // Build the repo coordinate for the 'a' tag in the patch let repo_coord = format!( "{}:{}:{}", KIND_REPOSITORY_STATE, keys.public_key().to_hex(), "test-repo-discovery" ); // Create a patch event that references the announcement let patch = create_kind_event_referencing_repo(&keys, &repo_coord); let patch_id = patch.id; println!("Created patch {} (kind {})", patch_id, patch.kind.as_u16()); for tag in patch.tags.iter() { println!(" Tag: {:?}", tag.as_slice()); } // Step 1: Send announcement to relay_a let client_a = create_connected_client(relay_a.url(), keys.clone()) .await .expect("Failed to connect to relay_a"); send_event_reliably(&client_a, &announcement) .await .expect("Failed to send announcement to relay_a"); println!("Announcement sent to relay_a"); // Step 2: Send patch to relay_a ONLY send_event_reliably(&client_a, &patch) .await .expect("Failed to send patch to relay_a"); println!("Patch sent to relay_a"); client_a.disconnect().await; // Step 3: Send announcement to relay_b directly (this should trigger discovery of relay_a) let client_b = create_connected_client(relay_b.url(), keys.clone()) .await .expect("Failed to connect to relay_b"); send_event_reliably(&client_b, &announcement) .await .expect("Failed to send announcement to relay_b"); println!("Announcement sent to relay_b (should trigger discovery of relay_a)"); client_b.disconnect().await; // Step 4: Wait for relay_b to discover relay_a and sync the patch println!("Waiting 3s for relay_b to discover relay_a and sync patch..."); tokio::time::sleep(Duration::from_secs(3)).await; // Step 5: Verify patch was synced to relay_b let client_b_check = create_connected_client(relay_b.url(), Keys::generate()) .await .expect("Failed to connect to relay_b for verification"); let filter = Filter::new() .kind(Kind::Custom(1617)) .author(keys.public_key()); let events_on_b = client_b_check .fetch_events(filter, Duration::from_secs(3)) .await .expect("Failed to fetch from relay_b"); let patch_synced = events_on_b.iter().any(|e| e.id == patch_id); if patch_synced { println!( "Patch {} found on relay_b (synced from discovered relay_a)", patch_id ); } else { println!("Patch {} NOT found on relay_b", patch_id); println!( "Events on relay_b: {:?}", events_on_b.iter().map(|e| e.id).collect::>() ); } client_b_check.disconnect().await; // Clean up relay_b.stop().await; relay_a.stop().await; assert!( patch_synced, "Patch {} should have been synced to relay_b from discovered relay_a", patch_id ); }