From 64a9df3046e21d896f143c2fbbbefabdf86048f9 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 17:12:42 +0000 Subject: test: sync add test for pagination of historic that requires manually setting lower thresholds --- tests/sync/historic_sync.rs | 206 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 192 insertions(+), 14 deletions(-) (limited to 'tests/sync') diff --git a/tests/sync/historic_sync.rs b/tests/sync/historic_sync.rs index d13886f..c388a7f 100644 --- a/tests/sync/historic_sync.rs +++ b/tests/sync/historic_sync.rs @@ -32,11 +32,8 @@ async fn test_bootstrap_syncs_existing_layer2_events() { .kind(Kind::Custom(KIND_REPOSITORY_STATE)) .author(result.maintainer_keys.public_key()); - let synced = wait_for_event_on_relay( - result.syncing_relay.url(), - filter, - Duration::from_secs(5) - ).await; + let synced = + wait_for_event_on_relay(result.syncing_relay.url(), filter, Duration::from_secs(5)).await; // Cleanup result.syncing_relay.stop().await; @@ -73,8 +70,9 @@ async fn test_relay_replays_events_after_restart() { let synced_first = wait_for_event_on_relay( result.syncing_relay.url(), filter.clone(), - Duration::from_secs(5) - ).await; + Duration::from_secs(5), + ) + .await; println!("First sync check: {}", synced_first); @@ -96,11 +94,8 @@ async fn test_relay_replays_events_after_restart() { tokio::time::sleep(Duration::from_secs(2)).await; // Verify announcement is available on restarted syncing relay - let synced_after_restart = wait_for_event_on_relay( - syncing_new.url(), - filter, - Duration::from_secs(5) - ).await; + let synced_after_restart = + wait_for_event_on_relay(syncing_new.url(), filter, Duration::from_secs(5)).await; // Cleanup syncing_new.stop().await; @@ -220,7 +215,11 @@ async fn test_history_sync_without_negentropy() { // Start source relay let source = TestRelay::start().await; - println!("Source started at {} (domain: {})", source.url(), source.domain()); + println!( + "Source started at {} (domain: {})", + source.url(), + source.domain() + ); // Create keys let keys = Keys::generate(); @@ -290,4 +289,183 @@ async fn test_history_sync_without_negentropy() { announcement_id ); println!("SUCCESS: History sync works without negentropy (using REQ+EOSE fallback)"); -} \ No newline at end of file +} + +/// Test: Pagination for large result sets without negentropy +/// +/// Note: this only actually tests pagination if we temporary settings (PAGINATION_THRESHOLD=7, filter limit=10), +/// otherwise multiple pages aren't required to sync all events. +/// +/// This tests that historic sync correctly handles many events +/// when negentropy is disabled and pagination logic may be triggered. +/// +/// Scenario: +/// 1. Pre-allocate port for syncing relay to get its domain +/// 2. Start source relay +/// 3. Create repository announcement listing both relay domains +/// 4. Create 40 issue events (enough to trigger pagination with limit=10, threshold=7) +/// 5. Send all events to source relay BEFORE syncing relay starts +/// 6. Start syncing relay with negentropy DISABLED (forces REQ+EOSE) +/// 7. Verify all 40 issues synced correctly +/// +#[tokio::test] +#[ignore] +async fn test_pagination_for_large_historic_sync() { + // Pre-allocate syncing relay port to get its domain + let syncing_port = TestRelay::find_free_port(); + let syncing_domain = format!("127.0.0.1:{}", syncing_port); + println!("Pre-allocated syncing relay domain: {}", syncing_domain); + + // Start source relay + let source = TestRelay::start().await; + println!( + "Source started at {} (domain: {})", + source.url(), + source.domain() + ); + + // Create keys for repository owner + let keys = Keys::generate(); + let repo_id = "test-repo-pagination"; + + // Create repository announcement listing BOTH relay domains + let announcement = + create_repo_announcement(&keys, &[&source.domain(), &syncing_domain], repo_id); + println!( + "Created announcement {} for repo '{}'", + announcement.id, repo_id + ); + + // Create 40 issue events to test pagination (with limit=10, threshold=7) + let repo_coord = format!( + "{}:{}:{}", + KIND_REPOSITORY_STATE, + keys.public_key().to_hex(), + repo_id + ); + + let mut issue_events = Vec::new(); + for i in 1..=40 { + let issue = build_layer2_issue_event( + &keys, + &repo_coord, + &format!("Issue #{} - Testing large sync", i), + ) + .expect("Failed to create issue event"); + issue_events.push(issue); + } + println!( + "Created {} issue events for pagination test", + issue_events.len() + ); + + // Send announcement to source (must be accepted first for issues to reference it) + let client = TestClient::new(source.url(), keys.clone()) + .await + .expect("Failed to connect to source"); + + client + .send_event(&announcement) + .await + .expect("Failed to send announcement to source"); + println!("Announcement sent to source"); + + // Wait for announcement to be stored + tokio::time::sleep(Duration::from_millis(200)).await; + + // Send all 40 issue events to source (before syncing relay starts) + println!("Sending {} issues to source relay...", issue_events.len()); + for (i, issue) in issue_events.iter().enumerate() { + client + .send_event(issue) + .await + .unwrap_or_else(|e| panic!("Failed to send issue #{}: {}", i + 1, e)); + + // Progress indicator every 50 events + if (i + 1) % 50 == 0 { + println!(" Sent {} / {} issues", i + 1, issue_events.len()); + } + } + println!( + "All {} issues sent to source (events exist BEFORE syncing relay connects)", + issue_events.len() + ); + + client.disconnect().await; + + // Wait to ensure all events are stored + tokio::time::sleep(Duration::from_millis(500)).await; + + // NOW start syncing relay on the reserved port, with negentropy DISABLED + // This forces it to use REQ+EOSE historic sync with pagination + let syncing = TestRelay::start_on_port_with_options( + syncing_port, + Some(source.url().into()), + true, // disable_negentropy = true (force REQ+EOSE) + ) + .await; + println!( + "Syncing relay started at {} (domain: {}) - negentropy DISABLED, pagination enabled with limit=10, threshold=7", + syncing.url(), + syncing.domain() + ); + + // Wait for historic sync with pagination to complete + println!("Waiting for historic sync with pagination to complete..."); + tokio::time::sleep(Duration::from_secs(8)).await; + + // Verify announcement synced + let announcement_filter = Filter::new() + .kind(Kind::Custom(KIND_REPOSITORY_STATE)) + .author(keys.public_key()); + + let announcement_synced = + wait_for_event_on_relay(syncing.url(), announcement_filter, Duration::from_secs(3)).await; + + // Verify ALL 40 issues synced + let issues_filter = Filter::new() + .kind(Kind::Custom(KIND_ISSUE)) + .author(keys.public_key()); + + // Query for all issues + let temp_keys = Keys::generate(); + let client = Client::new(temp_keys); + client + .add_relay(syncing.url()) + .await + .expect("Failed to add syncing relay to client"); + client.connect().await; + + // Wait for connection + tokio::time::sleep(Duration::from_millis(500)).await; + + let synced_issues = client + .fetch_events(issues_filter, Duration::from_secs(5)) + .await + .expect("Failed to fetch issues from syncing relay"); + + let synced_count = synced_issues.len(); + println!("Synced {} out of 40 expected issues", synced_count); + + client.disconnect().await; + + // Cleanup + syncing.stop().await; + source.stop().await; + + // Assertions + assert!( + announcement_synced, + "Repository announcement should have synced" + ); + + assert_eq!( + synced_count, 40, + "All 40 issues should have synced via pagination (limit=10, threshold=7 should trigger multiple pages)" + ); + + println!( + "SUCCESS: Pagination worked correctly - all {} issues synced", + synced_count + ); +} -- cgit v1.2.3