//! Live Sync Tests //! //! Tests for real-time event synchronization between relays. //! These tests verify that events published to one relay are synced //! to another relay in real-time via the discovery mechanism. //! //! # Tests //! - Test 5: `test_live_sync_layer2_events` - Layer 2 (kind 1618) events sync in real-time //! - Test 6: `test_live_sync_layer3_events` - Layer 3 (comments) sync when referencing Layer 2 //! - Test 7: `test_live_sync_event_ordering` - Events arrive in chronological order //! //! # Sync Mechanism //! These tests use the discovery-based sync pattern: //! 1. Send announcement to both relays //! 2. Each relay discovers the other from the announcement's relays tag //! 3. Events sync between relays //! //! This tests "live" sync behavior - events syncing after connection is established, //! as opposed to bootstrap sync which syncs existing events on startup. use std::time::Duration; use nostr_sdk::prelude::*; use crate::common::{sync_helpers::*, TestRelay}; /// Test 5: Live sync Layer 2 events /// /// Verifies that Layer 2 events (kind 1618 issues) published to one relay /// are synced to another relay in real-time via discovery. /// /// Flow: /// 1. Start relay_a (source) /// 2. Start relay_b (with sync enabled, no bootstrap) /// 3. Send announcement to both relays (triggers discovery) /// 4. Publish Layer 2 issue to relay_a /// 5. Verify event syncs to relay_b within 5 seconds #[tokio::test] async fn test_live_sync_layer2_events() { // 1. Start source relay (relay_a) let relay_a = TestRelay::start().await; println!( "relay_a started at {} (domain: {})", relay_a.url(), relay_a.domain() ); // 2. Start relay_b with sync enabled (no bootstrap - sync via discovery) let relay_b = TestRelay::start_with_sync(None).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), relay_b.domain() ); // 3. Create test keys let keys = Keys::generate(); // 4. Create a repository announcement that lists BOTH relays let repo_id = "test-repo-live-l2"; let announcement = create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); println!( "Created announcement {} (kind {})", announcement.id, announcement.kind.as_u16() ); // 5. Send announcement to relay_a let client_a = TestClient::new(relay_a.url(), keys.clone()) .await .expect("Failed to connect to relay_a"); client_a .send_event(&announcement) .await .expect("Failed to send announcement to relay_a"); println!("Announcement sent to relay_a"); // 6. Send announcement to relay_b (triggers discovery of relay_a) let client_b = TestClient::new(relay_b.url(), keys.clone()) .await .expect("Failed to connect to relay_b"); client_b .send_event(&announcement) .await .expect("Failed to send announcement to relay_b"); println!("Announcement sent to relay_b (triggers discovery)"); // 7. Wait for discovery to complete tokio::time::sleep(Duration::from_secs(1)).await; // 8. Create and send a Layer 2 issue event (using helper) let repo_coordinate = repo_coord(&keys, repo_id); let issue = build_layer2_issue_event(&keys, &repo_coordinate, "Test Issue for Live Sync") .expect("Failed to create issue event"); let issue_id = issue.id; println!("Created issue {} (kind {})", issue_id, issue.kind.as_u16()); for tag in issue.tags.iter() { println!(" Tag: {:?}", tag.as_slice()); } // Send issue to relay_a only client_a .send_event(&issue) .await .expect("Failed to send issue to relay_a"); println!("Issue sent to relay_a"); client_a.disconnect().await; client_b.disconnect().await; // 9. Wait and verify event syncs to relay_b let filter = Filter::new() .kind(Kind::GitIssue) .author(keys.public_key()) .id(issue_id); let synced = wait_for_event_on_relay(relay_b.url(), filter, Duration::from_secs(5)).await; println!("Issue {} synced to relay_b: {}", issue_id, synced); // 10. Cleanup relay_b.stop().await; relay_a.stop().await; assert!( synced, "Layer 2 issue {} should have synced from relay_a to relay_b in real-time", issue_id ); } /// Test 6: Live sync Layer 3 events /// /// Verifies that Layer 3 events (comments) sync when they reference Layer 2 events. /// /// Flow: /// 1. Start relay_a and relay_b (with sync enabled) /// 2. Send announcement to both relays (triggers discovery) /// 3. Publish Layer 2 issue to relay_a /// 4. Wait for Layer 2 issue to sync to relay_b /// 5. Publish Layer 3 comment (referencing the issue) to relay_a /// 6. Verify comment syncs to relay_b within 5 seconds /// 7. Verify comment has correct 'E' tag reference /// #[tokio::test] async fn test_live_sync_layer3_events() { // 1. Start relays let relay_a = TestRelay::start().await; println!( "relay_a started at {} (domain: {})", relay_a.url(), relay_a.domain() ); let relay_b = TestRelay::start_with_sync(None).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), relay_b.domain() ); let keys = Keys::generate(); // 2. Create and send repository announcement to both relays let repo_id = "test-repo-live-l3"; let announcement = create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); let client_a = TestClient::new(relay_a.url(), keys.clone()) .await .expect("Failed to connect to relay_a"); let client_b = TestClient::new(relay_b.url(), keys.clone()) .await .expect("Failed to connect to relay_b"); client_a .send_event(&announcement) .await .expect("Failed to send announcement to relay_a"); println!("Announcement sent to relay_a"); client_b .send_event(&announcement) .await .expect("Failed to send announcement to relay_b"); println!("Announcement sent to relay_b (triggers discovery)"); // 3. Wait for discovery tokio::time::sleep(Duration::from_secs(1)).await; // 4. Create and send Layer 2 issue let repo_coordinate = repo_coord(&keys, repo_id); let issue = build_layer2_issue_event(&keys, &repo_coordinate, "Parent Issue for Comment Test") .expect("Failed to create issue"); let issue_id = issue.id; client_a .send_event(&issue) .await .expect("Failed to send issue"); println!("Layer 2 issue {} sent to relay_a", issue_id); // 5. Create and send Layer 3 comment IMMEDIATELY (before waiting for sync) // This tests that subscriptions without 'since' will catch pre-existing events let comment = build_layer3_comment_with_uppercase_e_tag( &keys, &issue_id, "This is a comment on the issue", ) .expect("Failed to create comment"); let comment_id = comment.id; println!( "Created comment {} (kind {})", comment_id, comment.kind.as_u16() ); for tag in comment.tags.iter() { println!(" Tag: {:?}", tag.as_slice()); } client_a .send_event(&comment) .await .expect("Failed to send comment"); println!( "Layer 3 comment {} sent to relay_a BEFORE Layer 3 subscription established", comment_id ); // 6. Now wait for issue to sync to relay_b (this triggers Layer 3 filter creation) tokio::time::sleep(Duration::from_secs(2)).await; let issue_filter = Filter::new().kind(Kind::GitIssue).id(issue_id); let issue_synced = wait_for_event_on_relay(relay_b.url(), issue_filter, Duration::from_secs(3)).await; println!("Issue synced to relay_b: {}", issue_synced); client_a.disconnect().await; client_b.disconnect().await; // 7. Wait and verify comment syncs to relay_b let comment_filter = Filter::new() .kind(Kind::Comment) .author(keys.public_key()) .id(comment_id); let comment_synced = wait_for_event_on_relay(relay_b.url(), comment_filter, Duration::from_secs(5)).await; println!( "Comment {} synced to relay_b: {}", comment_id, comment_synced ); // 8. Verify the comment has correct 'E' tag reference let mut has_correct_ref = false; if comment_synced { let temp_keys = Keys::generate(); let client = Client::new(temp_keys); if client.add_relay(relay_b.url()).await.is_ok() { client.connect().await; tokio::time::sleep(Duration::from_millis(500)).await; let fetch_filter = Filter::new().kind(Kind::Comment).id(comment_id); if let Ok(events) = client .fetch_events(fetch_filter, Duration::from_secs(2)) .await { if let Some(event) = events.first() { // Check for 'E' tag with parent event ID for tag in event.tags.iter() { let slice = tag.as_slice(); if slice.first() == Some(&"E".to_string()) && slice.get(1) == Some(&issue_id.to_hex()) { has_correct_ref = true; println!("Found correct E tag reference to issue"); break; } } } } client.disconnect().await; } } // 9. Cleanup relay_b.stop().await; relay_a.stop().await; assert!( issue_synced, "Layer 2 issue {} should have synced first", issue_id ); assert!( comment_synced, "Layer 3 comment {} should have synced to relay_b", comment_id ); assert!( has_correct_ref, "Comment should have 'E' tag referencing issue {}", issue_id ); } /// Test 7: Live sync event ordering /// /// Verifies that events arrive in chronological order when synced. /// Note: We test ordering based on created_at timestamps, allowing for /// minor timing variations inherent in async systems. /// /// Flow: /// 1. Start relay_a and relay_b (with sync enabled) /// 2. Send announcement to both relays (triggers discovery) /// 3. Publish 3 Layer 2 events to relay_a with 100ms delays between them /// 4. Collect events from relay_b /// 5. Verify events are ordered by created_at timestamp #[tokio::test] async fn test_live_sync_event_ordering() { // 1. Start relays let relay_a = TestRelay::start().await; println!( "relay_a started at {} (domain: {})", relay_a.url(), relay_a.domain() ); let relay_b = TestRelay::start_with_sync(None).await; println!( "relay_b started at {} (domain: {})", relay_b.url(), relay_b.domain() ); let keys = Keys::generate(); // 2. Create and send repository announcement to both relays let repo_id = "test-repo-ordering"; let announcement = create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); let client_a = TestClient::new(relay_a.url(), keys.clone()) .await .expect("Failed to connect to relay_a"); let client_b = TestClient::new(relay_b.url(), keys.clone()) .await .expect("Failed to connect to relay_b"); client_a .send_event(&announcement) .await .expect("Failed to send announcement to relay_a"); client_b .send_event(&announcement) .await .expect("Failed to send announcement to relay_b"); println!("Announcements sent to both relays"); // 3. Wait for discovery tokio::time::sleep(Duration::from_secs(1)).await; // 4. Create and send 3 issues with delays between them let repo_coordinate = repo_coord(&keys, repo_id); let mut issue_ids = Vec::new(); let mut expected_order_timestamps = Vec::new(); for i in 1..=3 { let issue = build_layer2_issue_event( &keys, &repo_coordinate, &format!("Ordering Test Issue {}", i), ) .expect("Failed to create issue"); // Store the created_at timestamp for ordering verification expected_order_timestamps.push(issue.created_at); issue_ids.push(issue.id); println!( "Created issue {} at timestamp {}", issue.id, issue.created_at ); client_a .send_event(&issue) .await .unwrap_or_else(|_| panic!("Failed to send issue {}", i)); // Delay between events to ensure different timestamps tokio::time::sleep(Duration::from_millis(150)).await; } client_a.disconnect().await; client_b.disconnect().await; // 5. Wait for all events to sync tokio::time::sleep(Duration::from_secs(3)).await; // 6. Fetch all events from relay_b let temp_keys = Keys::generate(); let client = Client::new(temp_keys); let events_found: Vec; if client.add_relay(relay_b.url()).await.is_ok() { client.connect().await; tokio::time::sleep(Duration::from_millis(500)).await; let filter = Filter::new().kind(Kind::GitIssue).author(keys.public_key()); match client.fetch_events(filter, Duration::from_secs(3)).await { Ok(events) => { events_found = events.into_iter().collect(); } Err(e) => { println!("Failed to fetch events: {}", e); events_found = Vec::new(); } } client.disconnect().await; } else { events_found = Vec::new(); } // 7. Verify we got events let found_count = events_found.len(); println!("Found {} events on relay_b", found_count); // Filter to only our test events (by ID) let test_events: Vec<&Event> = events_found .iter() .filter(|e| issue_ids.contains(&e.id)) .collect(); println!( "Found {} test events (out of {} total)", test_events.len(), events_found.len() ); // 8. Check ordering by created_at timestamp let mut ordered_correctly = true; if test_events.len() >= 2 { // Sort by created_at and check order matches let mut sorted_events = test_events.clone(); sorted_events.sort_by_key(|e| e.created_at); for (i, event) in sorted_events.iter().enumerate() { println!( "Event {} sorted: {} at timestamp {}", i + 1, event.id, event.created_at ); } // Verify ascending timestamp order for window in sorted_events.windows(2) { if window[0].created_at > window[1].created_at { ordered_correctly = false; println!( "Order violation: {} ({}) > {} ({})", window[0].id, window[0].created_at, window[1].id, window[1].created_at ); } } } // 9. Cleanup relay_b.stop().await; relay_a.stop().await; // Assert based on what we found // Note: We may not get all 3 events due to timing, but what we get should be ordered assert!( test_events.len() >= 2, "Should have synced at least 2 of 3 events; found {}", test_events.len() ); assert!( ordered_correctly, "Events should be ordered by created_at timestamp" ); }