diff options
| -rw-r--r-- | tests/sync/historic_sync.rs | 206 |
1 files changed, 192 insertions, 14 deletions
diff --git a/tests/sync/historic_sync.rs b/tests/sync/historic_sync.rs index d13886f..c388a7f 100644 --- a/tests/sync/historic_sync.rs +++ b/tests/sync/historic_sync.rs | |||
| @@ -32,11 +32,8 @@ async fn test_bootstrap_syncs_existing_layer2_events() { | |||
| 32 | .kind(Kind::Custom(KIND_REPOSITORY_STATE)) | 32 | .kind(Kind::Custom(KIND_REPOSITORY_STATE)) |
| 33 | .author(result.maintainer_keys.public_key()); | 33 | .author(result.maintainer_keys.public_key()); |
| 34 | 34 | ||
| 35 | let synced = wait_for_event_on_relay( | 35 | let synced = |
| 36 | result.syncing_relay.url(), | 36 | wait_for_event_on_relay(result.syncing_relay.url(), filter, Duration::from_secs(5)).await; |
| 37 | filter, | ||
| 38 | Duration::from_secs(5) | ||
| 39 | ).await; | ||
| 40 | 37 | ||
| 41 | // Cleanup | 38 | // Cleanup |
| 42 | result.syncing_relay.stop().await; | 39 | result.syncing_relay.stop().await; |
| @@ -73,8 +70,9 @@ async fn test_relay_replays_events_after_restart() { | |||
| 73 | let synced_first = wait_for_event_on_relay( | 70 | let synced_first = wait_for_event_on_relay( |
| 74 | result.syncing_relay.url(), | 71 | result.syncing_relay.url(), |
| 75 | filter.clone(), | 72 | filter.clone(), |
| 76 | Duration::from_secs(5) | 73 | Duration::from_secs(5), |
| 77 | ).await; | 74 | ) |
| 75 | .await; | ||
| 78 | 76 | ||
| 79 | println!("First sync check: {}", synced_first); | 77 | println!("First sync check: {}", synced_first); |
| 80 | 78 | ||
| @@ -96,11 +94,8 @@ async fn test_relay_replays_events_after_restart() { | |||
| 96 | tokio::time::sleep(Duration::from_secs(2)).await; | 94 | tokio::time::sleep(Duration::from_secs(2)).await; |
| 97 | 95 | ||
| 98 | // Verify announcement is available on restarted syncing relay | 96 | // Verify announcement is available on restarted syncing relay |
| 99 | let synced_after_restart = wait_for_event_on_relay( | 97 | let synced_after_restart = |
| 100 | syncing_new.url(), | 98 | wait_for_event_on_relay(syncing_new.url(), filter, Duration::from_secs(5)).await; |
| 101 | filter, | ||
| 102 | Duration::from_secs(5) | ||
| 103 | ).await; | ||
| 104 | 99 | ||
| 105 | // Cleanup | 100 | // Cleanup |
| 106 | syncing_new.stop().await; | 101 | syncing_new.stop().await; |
| @@ -220,7 +215,11 @@ async fn test_history_sync_without_negentropy() { | |||
| 220 | 215 | ||
| 221 | // Start source relay | 216 | // Start source relay |
| 222 | let source = TestRelay::start().await; | 217 | let source = TestRelay::start().await; |
| 223 | println!("Source started at {} (domain: {})", source.url(), source.domain()); | 218 | println!( |
| 219 | "Source started at {} (domain: {})", | ||
| 220 | source.url(), | ||
| 221 | source.domain() | ||
| 222 | ); | ||
| 224 | 223 | ||
| 225 | // Create keys | 224 | // Create keys |
| 226 | let keys = Keys::generate(); | 225 | let keys = Keys::generate(); |
| @@ -290,4 +289,183 @@ async fn test_history_sync_without_negentropy() { | |||
| 290 | announcement_id | 289 | announcement_id |
| 291 | ); | 290 | ); |
| 292 | println!("SUCCESS: History sync works without negentropy (using REQ+EOSE fallback)"); | 291 | println!("SUCCESS: History sync works without negentropy (using REQ+EOSE fallback)"); |
| 293 | } \ No newline at end of file | 292 | } |
| 293 | |||
| 294 | /// Test: Pagination for large result sets without negentropy | ||
| 295 | /// | ||
| 296 | /// Note: this only actually tests pagination if we temporary settings (PAGINATION_THRESHOLD=7, filter limit=10), | ||
| 297 | /// otherwise multiple pages aren't required to sync all events. | ||
| 298 | /// | ||
| 299 | /// This tests that historic sync correctly handles many events | ||
| 300 | /// when negentropy is disabled and pagination logic may be triggered. | ||
| 301 | /// | ||
| 302 | /// Scenario: | ||
| 303 | /// 1. Pre-allocate port for syncing relay to get its domain | ||
| 304 | /// 2. Start source relay | ||
| 305 | /// 3. Create repository announcement listing both relay domains | ||
| 306 | /// 4. Create 40 issue events (enough to trigger pagination with limit=10, threshold=7) | ||
| 307 | /// 5. Send all events to source relay BEFORE syncing relay starts | ||
| 308 | /// 6. Start syncing relay with negentropy DISABLED (forces REQ+EOSE) | ||
| 309 | /// 7. Verify all 40 issues synced correctly | ||
| 310 | /// | ||
| 311 | #[tokio::test] | ||
| 312 | #[ignore] | ||
| 313 | async fn test_pagination_for_large_historic_sync() { | ||
| 314 | // Pre-allocate syncing relay port to get its domain | ||
| 315 | let syncing_port = TestRelay::find_free_port(); | ||
| 316 | let syncing_domain = format!("127.0.0.1:{}", syncing_port); | ||
| 317 | println!("Pre-allocated syncing relay domain: {}", syncing_domain); | ||
| 318 | |||
| 319 | // Start source relay | ||
| 320 | let source = TestRelay::start().await; | ||
| 321 | println!( | ||
| 322 | "Source started at {} (domain: {})", | ||
| 323 | source.url(), | ||
| 324 | source.domain() | ||
| 325 | ); | ||
| 326 | |||
| 327 | // Create keys for repository owner | ||
| 328 | let keys = Keys::generate(); | ||
| 329 | let repo_id = "test-repo-pagination"; | ||
| 330 | |||
| 331 | // Create repository announcement listing BOTH relay domains | ||
| 332 | let announcement = | ||
| 333 | create_repo_announcement(&keys, &[&source.domain(), &syncing_domain], repo_id); | ||
| 334 | println!( | ||
| 335 | "Created announcement {} for repo '{}'", | ||
| 336 | announcement.id, repo_id | ||
| 337 | ); | ||
| 338 | |||
| 339 | // Create 40 issue events to test pagination (with limit=10, threshold=7) | ||
| 340 | let repo_coord = format!( | ||
| 341 | "{}:{}:{}", | ||
| 342 | KIND_REPOSITORY_STATE, | ||
| 343 | keys.public_key().to_hex(), | ||
| 344 | repo_id | ||
| 345 | ); | ||
| 346 | |||
| 347 | let mut issue_events = Vec::new(); | ||
| 348 | for i in 1..=40 { | ||
| 349 | let issue = build_layer2_issue_event( | ||
| 350 | &keys, | ||
| 351 | &repo_coord, | ||
| 352 | &format!("Issue #{} - Testing large sync", i), | ||
| 353 | ) | ||
| 354 | .expect("Failed to create issue event"); | ||
| 355 | issue_events.push(issue); | ||
| 356 | } | ||
| 357 | println!( | ||
| 358 | "Created {} issue events for pagination test", | ||
| 359 | issue_events.len() | ||
| 360 | ); | ||
| 361 | |||
| 362 | // Send announcement to source (must be accepted first for issues to reference it) | ||
| 363 | let client = TestClient::new(source.url(), keys.clone()) | ||
| 364 | .await | ||
| 365 | .expect("Failed to connect to source"); | ||
| 366 | |||
| 367 | client | ||
| 368 | .send_event(&announcement) | ||
| 369 | .await | ||
| 370 | .expect("Failed to send announcement to source"); | ||
| 371 | println!("Announcement sent to source"); | ||
| 372 | |||
| 373 | // Wait for announcement to be stored | ||
| 374 | tokio::time::sleep(Duration::from_millis(200)).await; | ||
| 375 | |||
| 376 | // Send all 40 issue events to source (before syncing relay starts) | ||
| 377 | println!("Sending {} issues to source relay...", issue_events.len()); | ||
| 378 | for (i, issue) in issue_events.iter().enumerate() { | ||
| 379 | client | ||
| 380 | .send_event(issue) | ||
| 381 | .await | ||
| 382 | .unwrap_or_else(|e| panic!("Failed to send issue #{}: {}", i + 1, e)); | ||
| 383 | |||
| 384 | // Progress indicator every 50 events | ||
| 385 | if (i + 1) % 50 == 0 { | ||
| 386 | println!(" Sent {} / {} issues", i + 1, issue_events.len()); | ||
| 387 | } | ||
| 388 | } | ||
| 389 | println!( | ||
| 390 | "All {} issues sent to source (events exist BEFORE syncing relay connects)", | ||
| 391 | issue_events.len() | ||
| 392 | ); | ||
| 393 | |||
| 394 | client.disconnect().await; | ||
| 395 | |||
| 396 | // Wait to ensure all events are stored | ||
| 397 | tokio::time::sleep(Duration::from_millis(500)).await; | ||
| 398 | |||
| 399 | // NOW start syncing relay on the reserved port, with negentropy DISABLED | ||
| 400 | // This forces it to use REQ+EOSE historic sync with pagination | ||
| 401 | let syncing = TestRelay::start_on_port_with_options( | ||
| 402 | syncing_port, | ||
| 403 | Some(source.url().into()), | ||
| 404 | true, // disable_negentropy = true (force REQ+EOSE) | ||
| 405 | ) | ||
| 406 | .await; | ||
| 407 | println!( | ||
| 408 | "Syncing relay started at {} (domain: {}) - negentropy DISABLED, pagination enabled with limit=10, threshold=7", | ||
| 409 | syncing.url(), | ||
| 410 | syncing.domain() | ||
| 411 | ); | ||
| 412 | |||
| 413 | // Wait for historic sync with pagination to complete | ||
| 414 | println!("Waiting for historic sync with pagination to complete..."); | ||
| 415 | tokio::time::sleep(Duration::from_secs(8)).await; | ||
| 416 | |||
| 417 | // Verify announcement synced | ||
| 418 | let announcement_filter = Filter::new() | ||
| 419 | .kind(Kind::Custom(KIND_REPOSITORY_STATE)) | ||
| 420 | .author(keys.public_key()); | ||
| 421 | |||
| 422 | let announcement_synced = | ||
| 423 | wait_for_event_on_relay(syncing.url(), announcement_filter, Duration::from_secs(3)).await; | ||
| 424 | |||
| 425 | // Verify ALL 40 issues synced | ||
| 426 | let issues_filter = Filter::new() | ||
| 427 | .kind(Kind::Custom(KIND_ISSUE)) | ||
| 428 | .author(keys.public_key()); | ||
| 429 | |||
| 430 | // Query for all issues | ||
| 431 | let temp_keys = Keys::generate(); | ||
| 432 | let client = Client::new(temp_keys); | ||
| 433 | client | ||
| 434 | .add_relay(syncing.url()) | ||
| 435 | .await | ||
| 436 | .expect("Failed to add syncing relay to client"); | ||
| 437 | client.connect().await; | ||
| 438 | |||
| 439 | // Wait for connection | ||
| 440 | tokio::time::sleep(Duration::from_millis(500)).await; | ||
| 441 | |||
| 442 | let synced_issues = client | ||
| 443 | .fetch_events(issues_filter, Duration::from_secs(5)) | ||
| 444 | .await | ||
| 445 | .expect("Failed to fetch issues from syncing relay"); | ||
| 446 | |||
| 447 | let synced_count = synced_issues.len(); | ||
| 448 | println!("Synced {} out of 40 expected issues", synced_count); | ||
| 449 | |||
| 450 | client.disconnect().await; | ||
| 451 | |||
| 452 | // Cleanup | ||
| 453 | syncing.stop().await; | ||
| 454 | source.stop().await; | ||
| 455 | |||
| 456 | // Assertions | ||
| 457 | assert!( | ||
| 458 | announcement_synced, | ||
| 459 | "Repository announcement should have synced" | ||
| 460 | ); | ||
| 461 | |||
| 462 | assert_eq!( | ||
| 463 | synced_count, 40, | ||
| 464 | "All 40 issues should have synced via pagination (limit=10, threshold=7 should trigger multiple pages)" | ||
| 465 | ); | ||
| 466 | |||
| 467 | println!( | ||
| 468 | "SUCCESS: Pagination worked correctly - all {} issues synced", | ||
| 469 | synced_count | ||
| 470 | ); | ||
| 471 | } | ||